當前位置:
首頁 > 知識 > 標籤傳播演算法(Label Propagation)及 Python 實現

標籤傳播演算法(Label Propagation)及 Python 實現

(點擊

上方藍字

,快速關注我們)




來源:zouxy09


blog.csdn.net/zouxy09/article/details/49105265


如有好文章投稿,請點擊 → 這裡了解詳情




眾所周知,機器學習可以大體分為三大類:監督學習、非監督學習和半監督學習。監督學習可以認為是我們有非常多的labeled標註數據來train一個模型,期待這個模型能學習到數據的分布,以期對未來沒有見到的樣本做預測。那這個性能的源頭–訓練數據,就顯得非常感覺。你必須有足夠的訓練數據,以覆蓋真正現實數據中的樣本分布才可以,這樣學習到的模型才有意義。那非監督學習就是沒有任何的labeled數據,就是平時所說的聚類了,利用他們本身的數據分布,給他們劃分類別。而半監督學習,顧名思義就是處於兩者之間的,只有少量的labeled數據,我們試圖從這少量的labeled數據和大量的unlabeled數據中學習到有用的信息。



一、半監督學習




半監督學習(Semi-supervised learning)發揮作用的場合是:你的數據有一些有label,一些沒有。而且一般是絕大部分都沒有,只有少許幾個有label。半監督學習演算法會充分的利用unlabeled數據來捕捉我們整個數據的潛在分布。它基於三大假設:






  1. Smoothness平滑假設:相似的數據具有相同的label。



  2. Cluster聚類假設:處於同一個聚類下的數據具有相同label。



  3. Manifold流形假設:處於同一流形結構下的數據具有相同label。




例如下圖,只有兩個labeled數據,如果直接用他們來訓練一個分類器,例如LR或者SVM,那麼學出來的分類面就是左圖那樣的。如果現實中,這個數據是右圖那邊分布的話,豬都看得出來,左圖訓練的這個分類器爛的一塌糊塗、慘不忍睹。因為我們的labeled訓練數據太少了,都沒辦法覆蓋我們未來可能遇到的情況。但是,如果右圖那樣,把大量的unlabeled數據(黑色的)都考慮進來,有個全局觀念,牛逼的演算法會發現,哎喲,原來是兩個圈圈(分別處於兩個圓形的流形之上)!那演算法就很聰明,把大圈的數據都歸類為紅色類別,把內圈的數據都歸類為藍色類別。因為,實踐中,labeled數據是昂貴,很難獲得的,但unlabeled數據就不是了,寫個腳本在網上爬就可以了,因此如果能充分利用大量的unlabeled數據來輔助提升我們的模型學習,這個價值就非常大。






半監督學習演算法有很多,下面我們介紹最簡單的標籤傳播演算法(label propagation),最喜歡簡單了,哈哈。




二、標籤傳播演算法




標籤傳播演算法(label propagation)的核心思想非常簡單:相似的數據應該具有相同的label。LP演算法包括兩大步驟:1)構造相似矩陣;2)勇敢的傳播吧。



2.1、相似矩陣構建




LP演算法是基於Graph的,因此我們需要先構建一個圖。我們為所有的數據構建一個圖,圖的節點就是一個數據點,包含labeled和unlabeled的數據。節點i和節點j的邊表示他們的相似度。這個圖的構建方法有很多,這裡我們假設這個圖是全連接的,節點i和節點j的邊權重為:







這裡,α是超參。




還有個非常常用的圖構建方法是knn圖,也就是只保留每個節點的k近鄰權重,其他的為0,也就是不存在邊,因此是稀疏的相似矩陣。



2.2、LP演算法




標籤傳播演算法非常簡單:通過節點之間的邊傳播label。邊的權重越大,表示兩個節點越相似,那麼label越容易傳播過去。我們定義一個NxN的概率轉移矩陣P:







Pij表示從節點i轉移到節點j的概率。假設有C個類和L個labeled樣本,我們定義一個LxC的label矩陣YL,第i行表示第i個樣本的標籤指示向量,即如果第i個樣本的類別是j,那麼該行的第j個元素為1,其他為0。同樣,我們也給U個unlabeled樣本一個UxC的label矩陣YU。把他們合併,我們得到一個NxC的soft label矩陣F=[YL;YU]。soft label的意思是,我們保留樣本i屬於每個類別的概率,而不是互斥性的,這個樣本以概率1隻屬於一個類。當然了,最後確定這個樣本i的類別的時候,是取max也就是概率最大的那個類作為它的類別的。那F裡面有個YU,它一開始是不知道的,那最開始的值是多少?無所謂,隨便設置一個值就可以了。



千呼萬喚始出來,簡單的LP演算法如下:






  1. 執行傳播:F=PF



  2. 重置F中labeled樣本的標籤:FL=YL



  3. 重複步驟1)和2)直到F收斂。




步驟1)就是將矩陣P和矩陣F相乘,這一步,每個節點都將自己的label以P確定的概率傳播給其他節點。如果兩個節點越相似(在歐式空間中距離越近),那麼對方的label就越容易被自己的label賦予,就是更容易拉幫結派。步驟2)非常關鍵,因為labeled數據的label是事先確定的,它不能被帶跑,所以每次傳播完,它都得回歸它本來的label。隨著labeled數據不斷的將自己的label傳播出去,最後的類邊界會穿越高密度區域,而停留在低密度的間隔中。相當於每個不同類別的labeled樣本劃分了勢力範圍。




2.3、變身的LP演算法



我們知道,我們每次迭代都是計算一個soft label矩陣F=[YL;YU],但是YL是已知的,計算它沒有什麼用,在步驟2)的時候,還得把它弄回來。我們關心的只是YU,那我們能不能只計算YU呢?Yes。我們將矩陣P做以下劃分:







這時候,我們的演算法就一個運算:







迭代上面這個步驟直到收斂就ok了,是不是很cool。可以看到FU不但取決於labeled數據的標籤及其轉移概率,還取決了unlabeled數據的當前label和轉移概率。因此LP演算法能額外運用unlabeled數據的分布特點。



這個演算法的收斂性也非常容易證明,具體見參考文獻[1]。實際上,它是可以收斂到一個凸解的:







所以我們也可以直接這樣求解,以獲得最終的YU。但是在實際的應用過程中,由於矩陣求逆需要O(n3)的複雜度,所以如果unlabeled數據非常多,那麼I – PUU矩陣的求逆將會非常耗時,因此這時候一般選擇迭代演算法來實現。




三、LP演算法的Python實現



Python環境的搭建就不啰嗦了,可以參考前面的博客。需要額外依賴的庫是經典的numpy和matplotlib。代碼中包含了兩種圖的構建方法:RBF和KNN指定。同時,自己生成了兩個toy資料庫:兩條長形形狀和兩個圈圈的數據。第四部分我們用大點的資料庫來做實驗,先簡單的可視化驗證代碼的正確性,再前線。




演算法代碼:





import time  


import numpy

as

np

  


  


# return k neighbors index  


def navie_knn

(

dataSet

,

query

,

k

)

:  


    

numSamples

=

dataSet

.

shape

[

0

]

  


  


    

## step 1: calculate Euclidean distance  


    

diff

=

np

.

tile

(

query

,

(

numSamples

,

1

))

-

dataSet  


    

squaredDiff

=

diff *

*

2

  


    

squaredDist

=

np

.

sum

(

squaredDiff

,

axis

=

1

)

# sum is performed by row  


  


    

## step 2: sort the distance  


    

sortedDistIndices

=

np

.

argsort

(

squaredDist

)

  


    

if

k

>

len

(

sortedDistIndices

)

:  


        

k

=

len

(

sortedDistIndices

)

  


  


    

return

sortedDistIndices

[

0

:

k

]

  


  


  


# build a big graph (normalized weight matrix)  


def buildGraph

(

MatX

,

kernel_type

,

rbf_sigma

=

None

,

knn_num_neighbors

=

None

)

:  


    

num_samples

=

MatX

.

shape

[

0

]

  


    

affinity_matrix

=

np

.

zeros

((

num_samples

,

num_samples

),

np

.

float32

)

  


    

if

kernel_type

==

"rbf"

:  


        

if

rbf_sigma

==

None

:  


            

raise ValueError

(

"You should input a sigma of rbf kernel!"

)

  


        

for

i

in

xrange

(

num_samples

)

:  


            

row_sum

=

0.0

  


            

for

j

in

xrange

(

num_samples

)

:  


                

diff

=

MatX

[

i

,

:

]

-

MatX

[

j

,

:

]

  


                

affinity_matrix

[

i

][

j

]

=

np

.

exp

(

sum

(

diff*

*

2

)

/

(

-

2.0

*

rbf_sigma*

*

2

))

  


                

row_sum

+=

affinity_matrix

[

i

][

j

]

  


            

affinity_matrix

[

i

][

:

]

/=

row_sum  


    elif

kernel_type

==

"knn"

:  


        

if

knn_num_neighbors

==

None

:  


            

raise ValueError

(

"You should input a k of knn kernel!"

)

  


        

for

i

in

xrange

(

num_samples

)

:  


            

k_neighbors

=

navie_knn

(

MatX

,

MatX

[

i

,

:

],

knn_num_neighbors

)

  


            

affinity_matrix

[

i

][

k_neighbors

]

=

1.0

/

knn_num_neighbors  


    

else

:  


        

raise NameError

(

"Not support kernel type! You can use knn or rbf!"

)

  


      


    

return

affinity_matrix

  


  


  


# label propagation  


def labelPropagation

(

Mat_Label

,

Mat_Unlabel

,

labels

,

kernel_type

=

"rbf"

,

rbf_sigma

=

1.5

,

  


                    

knn_num_neighbors

=

10

,

max_iter

=

500

,

tol

=

1e

-

3

)

:  


    

# initialize  


    

num_label_samples

=

Mat_Label

.

shape

[

0

]

  


    

num_unlabel_samples

=

Mat_Unlabel

.

shape

[

0

]

  


    

num_samples

=

num_label_samples

+

num_unlabel_samples  


    

labels_list

=

np

.

unique

(

labels

)

  


    

num_classes

=

len

(

labels_list

)

  


      


    

MatX

=

np

.

vstack

((

Mat_Label

,

Mat_Unlabel

))

  


    

clamp_data_label

=

np

.

zeros

((

num_label_samples

,

num_classes

),

np

.

float32

)

  


    

for

i

in

xrange

(

num_label_samples

)

:  


        

clamp_data_label

[

i

][

labels

[

i

]]

=

1.0

  


      


    

label_function

=

np

.

zeros

((

num_samples

,

num_classes

),

np

.

float32

)

  


    

label_function

[

0

:

num_label_samples

]

=

clamp_data_label  


    

label_function

[

num_label_samples

:

num_samples

]

= -

1

  


      


    

# graph construction  


    

affinity_matrix

=

buildGraph

(

MatX

,

kernel_type

,

rbf_sigma

,

knn_num_neighbors

)

  


      


    

# start to propagation  


    

iter

=

0

;

pre_label_function

=

np

.

zeros

((

num_samples

,

num_classes

),

np

.

float32

)

  


    

changed

=

np

.

abs

(

pre_label_function

-

label_function

).

sum

()

  


    

while

iter

<

max_iter

and

changed

>

tol

:  


        

if

iter

%

1

==

0

:  


            

print

"---> Iteration %d/%d, changed: %f"

%

(

iter

,

max_iter

,

changed

)

  


        

pre_label_function

=

label_function  


        

iter

+=

1

  


          


        

# propagation  


        

label_function

=

np

.

dot

(

affinity_matrix

,

label_function

)

  


          


        

# clamp  


        

label_function

[

0

:

num_label_samples

]

=

clamp_data_label

  


          


        

# check converge  


        

changed

=

np

.

abs

(

pre_label_function

-

label_function

).

sum

()

  


      


    

# get terminate label of unlabeled data  


    

unlabel_data_labels

=

np

.

zeros

(

num_unlabel_samples

)

  


    

for

i

in

xrange

(

num_unlabel_samples

)

:  


        

unlabel_data_labels

[

i

]

=

np

.

argmax

(

label_function

[

i

+

num_label_samples

])

  


      


    

return

unlabel_data_labels




測試代碼:





import time  


import math  


import numpy

as

np  


from label_propagation import

labelPropagation

  


  


# show  


def show

(

Mat_Label

,

labels

,

Mat_Unlabel

,

unlabel_data_labels

)

:  


    

import

matplotlib

.

pyplot

as

plt  


      


    

for

i

in

range

(

Mat_Label

.

shape

[

0

])

:  


        

if

int

(

labels

[

i

])

==

0

:    


            

plt

.

plot

(

Mat_Label

[

i

,

0

],

Mat_Label

[

i

,

1

],

"Dr"

)

    


        

elif

int

(

labels

[

i

])

==

1

:    


            

plt

.

plot

(

Mat_Label

[

i

,

0

],

Mat_Label

[

i

,

1

],

"Db"

)

  


        

else

:  


            

plt

.

plot

(

Mat_Label

[

i

,

0

],

Mat_Label

[

i

,

1

],

"Dy"

)

  


      


    

for

i

in

range

(

Mat_Unlabel

.

shape

[

0

])

:  


        

if

int

(

unlabel_data_labels

[

i

])

==

0

:    


            

plt

.

plot

(

Mat_Unlabel

[

i

,

0

],

Mat_Unlabel

[

i

,

1

],

"or"

)

    


        

elif

int

(

unlabel_data_labels

[

i

])

==

1

:    


            

plt

.

plot

(

Mat_Unlabel

[

i

,

0

],

Mat_Unlabel

[

i

,

1

],

"ob"

)

  


        

else

:  


            

plt

.

plot

(

Mat_Unlabel

[

i

,

0

],

Mat_Unlabel

[

i

,

1

],

"oy"

)

  


      


    

plt

.

xlabel

(

"X1"

);

plt

.

ylabel

(

"X2"

)

  


    

plt

.

xlim

(

0.0

,

12.

)

  


    

plt

.

ylim

(

0.0

,

12.

)

  


    

plt

.

show

()

    


  


  


def loadCircleData

(

num_data

)

:  


    

center

=

np

.

array

([

5.0

,

5.0

])

  


    

radiu_inner

=

2

  


    

radiu_outer

=

4

  


    

num_inner

=

num_data

/

3

  


    

num_outer

=

num_data

-

num_inner  


      


    

data

=

[]

  


    

theta

=

0.0

  


    

for

i

in

range

(

num_inner

)

:  


        

pho

=

(

theta

%

360

)

*

math

.

pi

/

180

  


        

tmp

=

np

.

zeros

(

2

,

np

.

float32

)

  


        

tmp

[

0

]

=

radiu_inner *

math

.

cos

(

pho

)

+

np

.

random

.

rand

(

1

)

+

center

[

0

]

  


        

tmp

[

1

]

=

radiu_inner *

math

.

sin

(

pho

)

+

np

.

random

.

rand

(

1

)

+

center

[

1

]

  


        

data

.

append

(

tmp

)

  


        

theta

+=

2

  


      


    

theta

=

0.0

  


    

for

i

in

range

(

num_outer

)

:  


        

pho

=

(

theta

%

360

)

*

math

.

pi

/

180

  


        

tmp

=

np

.

zeros

(

2

,

np

.

float32

)

  


        

tmp

[

0

]

=

radiu_outer *

math

.

cos

(

pho

)

+

np

.

random

.

rand

(

1

)

+

center

[

0

]

  


        

tmp

[

1

]

=

radiu_outer *

math

.

sin

(

pho

)

+

np

.

random

.

rand

(

1

)

+

center

[

1

]

  


        

data

.

append

(

tmp

)

  


        

theta

+=

1

  


      


    

Mat_Label

=

np

.

zeros

((

2

,

2

),

np

.

float32

)

  


    

Mat_Label

[

0

]

=

center

+

np

.

array

([

-

radiu_inner

+

0.5

,

0

])

  


    

Mat_Label

[

1

]

=

center

+

np

.

array

([

-

radiu_outer

+

0.5

,

0

])

  


    

labels

=

[

0

,

1

]

  


    

Mat_Unlabel

=

np

.

vstack

(

data

)

  


    

return

Mat_Label

,

labels

,

Mat_Unlabel  


  


  


def loadBandData

(

num_unlabel_samples

)

:  


    

#Mat_Label = np.array([[5.0, 2.], [5.0, 8.0]])  


    

#labels = [0, 1]  


    

#Mat_Unlabel = np.array([[5.1, 2.], [5.0, 8.1]])  


      


    

Mat_Label

=

np

.

array

([[

5.0

,

2.

],

[

5.0

,

8.0

]])

  


    

labels

=

[

0

,

1

]

  


    

num_dim

=

Mat_Label

.

shape

[

1

]

  


    

Mat_Unlabel

=

np

.

zeros

((

num_unlabel_samples

,

num_dim

),

np

.

float32

)

  


    

Mat_Unlabel

[

:

num_unlabel_samples

/

2

,

:

]

=

(

np

.

random

.

rand

(

num_unlabel_samples

/

2

,

num_dim

)

-

0.5

)

*

np

.

array

([

3

,

1

])

+

Mat_Label

[

0

]

  


    

Mat_Unlabel

[

num_unlabel_samples

/

2

:

num_unlabel_samples

,

:

]

=

(

np

.

random

.

rand

(

num_unlabel_samples

/

2

,

num_dim

)

-

0.5

)

*

np

.

array

([

3

,

1

])

+

Mat_Label

[

1

]

  


    

return

Mat_Label

,

labels

,

Mat_Unlabel

  


  


  


# main function  


if

__name__

==

"__main__"

:  


    

num_unlabel_samples

=

800

  


    

#Mat_Label, labels, Mat_Unlabel = loadBandData(num_unlabel_samples)  


    

Mat_Label

,

labels

,

Mat_Unlabel

=

loadCircleData

(

num_unlabel_samples

)

  


      


    

## Notice: when use "rbf" as our kernel, the choice of hyper parameter "sigma" is very import! It should be  


    

## chose according to your dataset, specific the distance of two data points. I think it should ensure that  


    

## each point has about 10 knn or w_i,j is large enough. It also influence the speed of converge. So, may be  


    

## "knn" kernel is better!  


    

#unlabel_data_labels = labelPropagation(Mat_Label, Mat_Unlabel, labels, kernel_type = "rbf", rbf_sigma = 0.2)  


    

unlabel_data_labels

=

labelPropagation

(

Mat_Label

,

Mat_Unlabel

,

labels

,

kernel_type

=

"knn"

,

knn_num_neighbors

=

10

,

max_iter

=

400

)

  


    

show

(

Mat_Label

,

labels

,

Mat_Unlabel

,

unlabel_data_labels

)

  




該注釋的,代碼都注釋的,有看不明白的,歡迎交流。不同迭代次數時候的結果如下:







是不是很漂亮的傳播過程?!在數值上也是可以看到隨著迭代的進行逐漸收斂的,迭代的數值變化過程如下:





四、LP演算法MPI並行實現




這裡,我們測試的是LP的變身版本。從公式,我們可以看到,第二項PULYL迭代過程並沒有發生變化,所以這部分實際上從迭代開始就可以計算好,從而避免重複計算。不過,不管怎樣,LP演算法都要計算一個UxU的矩陣PUU和一個UxC矩陣FU的乘積。當我們的unlabeled數據非常多,而且類別也很多的時候,計算是很慢的,同時佔用的內存量也非常大。另外,構造Graph需要計算兩兩的相似度,也是O(n2)的複雜度,當我們數據的特徵維度很大的時候,這個計算量也是非常客觀的。所以我們就得考慮並行處理了。而且最好是能放到集群上並行。那如何並行呢?




對演算法的並行化,一般分為兩種:數據並行和模型並行。




數據並行很好理解,就是將數據劃分,每個節點只處理一部分數據,例如我們構造圖的時候,計算每個數據的k近鄰。例如我們有1000個樣本和20個CPU節點,那麼就平均分發,讓每個CPU節點計算50個樣本的k近鄰,然後最後再合併大家的結果。可見這個加速比也是非常可觀的。




模型並行一般發生在模型很大,無法放到單機的內存裡面的時候。例如龐大的深度神經網路訓練的時候,就需要把這個網路切開,然後分別求解梯度,最後有個leader的節點來收集大家的梯度,再反饋給大家去更新。當然了,其中存在更細緻和高效的工程處理方法。在我們的LP演算法中,也是可以做模型並行的。假如我們的類別數C很大,把類別數切開,讓不同的CPU節點處理,實際上就相當於模型並行了。




那為啥不切大矩陣PUU,而是切小點的矩陣FU,因為大矩陣PUU沒法獨立分塊,並行的一個原則是處理必須是獨立的。 矩陣FU依賴的是所有的U,而把PUU切開分發到其他節點的時候,每次FU的更新都需要和其他的節點通信,這個通信的代價是很大的(實際上,很多並行系統沒法達到線性的加速度的瓶頸是通信!線性加速比是,我增加了n台機器,速度就提升了n倍)。但是對類別C也就是矩陣FU切分,就不會有這個問題,因為他們的計算是獨立的。只是決定樣本的最終類別的時候,將所有的FU收集回來求max就可以了。




所以,在下面的代碼中,是同時包含了數據並行和模型並行的雛形的。另外,還值得一提的是,我們是迭代演算法,那決定什麼時候迭代演算法停止?除了判斷收斂外,我們還可以讓每迭代幾步,就用測試label測試一次結果,看模型的整體訓練性能如何。特別是判斷訓練是否過擬合的時候非常有效。因此,代碼中包含了這部分內容。




好了,代碼終於來了。大家可以搞點大資料庫來測試,如果有MPI集群條件的話就更好了。




下面的代碼依賴numpy、scipy(用其稀疏矩陣加速計算)和mpi4py。其中mpi4py需要依賴openmpi和Cpython,可以參考我之前的博客進行安裝。




由於微信字數限制,此部分的代碼通過點擊閱讀原文查看





五、參考資料




[1]Semi-SupervisedLearning with Graphs.pdf




看完本文有收穫?請轉發分享給更多人


關注「大數據與機器學習文摘」,成為Top 1%


喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發者 的精彩文章:

用 Python 3 的 async / await 做非同步編程
為什麼 Python 增長如此之快?
讓你的 Python 代碼優雅又地道
一個 Reentrant Error 引發的對 Python 信號機制的探索和思考
初探 Python 3 的非同步 IO 編程

TAG:Python開發者 |