標籤傳播演算法(Label Propagation)及 Python 實現
(點擊
上方藍字
,快速關注我們)
來源:zouxy09
blog.csdn.net/zouxy09/article/details/49105265
如有好文章投稿,請點擊 → 這裡了解詳情
眾所周知,機器學習可以大體分為三大類:監督學習、非監督學習和半監督學習。監督學習可以認為是我們有非常多的labeled標註數據來train一個模型,期待這個模型能學習到數據的分布,以期對未來沒有見到的樣本做預測。那這個性能的源頭–訓練數據,就顯得非常感覺。你必須有足夠的訓練數據,以覆蓋真正現實數據中的樣本分布才可以,這樣學習到的模型才有意義。那非監督學習就是沒有任何的labeled數據,就是平時所說的聚類了,利用他們本身的數據分布,給他們劃分類別。而半監督學習,顧名思義就是處於兩者之間的,只有少量的labeled數據,我們試圖從這少量的labeled數據和大量的unlabeled數據中學習到有用的信息。
一、半監督學習
半監督學習(Semi-supervised learning)發揮作用的場合是:你的數據有一些有label,一些沒有。而且一般是絕大部分都沒有,只有少許幾個有label。半監督學習演算法會充分的利用unlabeled數據來捕捉我們整個數據的潛在分布。它基於三大假設:
Smoothness平滑假設:相似的數據具有相同的label。
Cluster聚類假設:處於同一個聚類下的數據具有相同label。
Manifold流形假設:處於同一流形結構下的數據具有相同label。
例如下圖,只有兩個labeled數據,如果直接用他們來訓練一個分類器,例如LR或者SVM,那麼學出來的分類面就是左圖那樣的。如果現實中,這個數據是右圖那邊分布的話,豬都看得出來,左圖訓練的這個分類器爛的一塌糊塗、慘不忍睹。因為我們的labeled訓練數據太少了,都沒辦法覆蓋我們未來可能遇到的情況。但是,如果右圖那樣,把大量的unlabeled數據(黑色的)都考慮進來,有個全局觀念,牛逼的演算法會發現,哎喲,原來是兩個圈圈(分別處於兩個圓形的流形之上)!那演算法就很聰明,把大圈的數據都歸類為紅色類別,把內圈的數據都歸類為藍色類別。因為,實踐中,labeled數據是昂貴,很難獲得的,但unlabeled數據就不是了,寫個腳本在網上爬就可以了,因此如果能充分利用大量的unlabeled數據來輔助提升我們的模型學習,這個價值就非常大。
半監督學習演算法有很多,下面我們介紹最簡單的標籤傳播演算法(label propagation),最喜歡簡單了,哈哈。
二、標籤傳播演算法
標籤傳播演算法(label propagation)的核心思想非常簡單:相似的數據應該具有相同的label。LP演算法包括兩大步驟:1)構造相似矩陣;2)勇敢的傳播吧。
2.1、相似矩陣構建
LP演算法是基於Graph的,因此我們需要先構建一個圖。我們為所有的數據構建一個圖,圖的節點就是一個數據點,包含labeled和unlabeled的數據。節點i和節點j的邊表示他們的相似度。這個圖的構建方法有很多,這裡我們假設這個圖是全連接的,節點i和節點j的邊權重為:
這裡,α是超參。
還有個非常常用的圖構建方法是knn圖,也就是只保留每個節點的k近鄰權重,其他的為0,也就是不存在邊,因此是稀疏的相似矩陣。
2.2、LP演算法
標籤傳播演算法非常簡單:通過節點之間的邊傳播label。邊的權重越大,表示兩個節點越相似,那麼label越容易傳播過去。我們定義一個NxN的概率轉移矩陣P:
Pij表示從節點i轉移到節點j的概率。假設有C個類和L個labeled樣本,我們定義一個LxC的label矩陣YL,第i行表示第i個樣本的標籤指示向量,即如果第i個樣本的類別是j,那麼該行的第j個元素為1,其他為0。同樣,我們也給U個unlabeled樣本一個UxC的label矩陣YU。把他們合併,我們得到一個NxC的soft label矩陣F=[YL;YU]。soft label的意思是,我們保留樣本i屬於每個類別的概率,而不是互斥性的,這個樣本以概率1隻屬於一個類。當然了,最後確定這個樣本i的類別的時候,是取max也就是概率最大的那個類作為它的類別的。那F裡面有個YU,它一開始是不知道的,那最開始的值是多少?無所謂,隨便設置一個值就可以了。
千呼萬喚始出來,簡單的LP演算法如下:
執行傳播:F=PF
重置F中labeled樣本的標籤:FL=YL
重複步驟1)和2)直到F收斂。
步驟1)就是將矩陣P和矩陣F相乘,這一步,每個節點都將自己的label以P確定的概率傳播給其他節點。如果兩個節點越相似(在歐式空間中距離越近),那麼對方的label就越容易被自己的label賦予,就是更容易拉幫結派。步驟2)非常關鍵,因為labeled數據的label是事先確定的,它不能被帶跑,所以每次傳播完,它都得回歸它本來的label。隨著labeled數據不斷的將自己的label傳播出去,最後的類邊界會穿越高密度區域,而停留在低密度的間隔中。相當於每個不同類別的labeled樣本劃分了勢力範圍。
2.3、變身的LP演算法
我們知道,我們每次迭代都是計算一個soft label矩陣F=[YL;YU],但是YL是已知的,計算它沒有什麼用,在步驟2)的時候,還得把它弄回來。我們關心的只是YU,那我們能不能只計算YU呢?Yes。我們將矩陣P做以下劃分:
這時候,我們的演算法就一個運算:
迭代上面這個步驟直到收斂就ok了,是不是很cool。可以看到FU不但取決於labeled數據的標籤及其轉移概率,還取決了unlabeled數據的當前label和轉移概率。因此LP演算法能額外運用unlabeled數據的分布特點。
這個演算法的收斂性也非常容易證明,具體見參考文獻[1]。實際上,它是可以收斂到一個凸解的:
所以我們也可以直接這樣求解,以獲得最終的YU。但是在實際的應用過程中,由於矩陣求逆需要O(n3)的複雜度,所以如果unlabeled數據非常多,那麼I – PUU矩陣的求逆將會非常耗時,因此這時候一般選擇迭代演算法來實現。
三、LP演算法的Python實現
Python環境的搭建就不啰嗦了,可以參考前面的博客。需要額外依賴的庫是經典的numpy和matplotlib。代碼中包含了兩種圖的構建方法:RBF和KNN指定。同時,自己生成了兩個toy資料庫:兩條長形形狀和兩個圈圈的數據。第四部分我們用大點的資料庫來做實驗,先簡單的可視化驗證代碼的正確性,再前線。
演算法代碼:
import time
import numpy
as
np
# return k neighbors index
def navie_knn
(
dataSet
,
query
,
k
)
:
numSamples
=
dataSet
.
shape
[
0
]
## step 1: calculate Euclidean distance
diff
=
np
.
tile
(
query
,
(
numSamples
,
1
))
-
dataSet
squaredDiff
=
diff *
*
2
squaredDist
=
np
.
sum
(
squaredDiff
,
axis
=
1
)
# sum is performed by row
## step 2: sort the distance
sortedDistIndices
=
np
.
argsort
(
squaredDist
)
if
k
>
len
(
sortedDistIndices
)
:
k
=
len
(
sortedDistIndices
)
return
sortedDistIndices
[
0
:
k
]
# build a big graph (normalized weight matrix)
def buildGraph
(
MatX
,
kernel_type
,
rbf_sigma
=
None
,
knn_num_neighbors
=
None
)
:
num_samples
=
MatX
.
shape
[
0
]
affinity_matrix
=
np
.
zeros
((
num_samples
,
num_samples
),
np
.
float32
)
if
kernel_type
==
"rbf"
:
if
rbf_sigma
==
None
:
raise ValueError
(
"You should input a sigma of rbf kernel!"
)
for
i
in
xrange
(
num_samples
)
:
row_sum
=
0.0
for
j
in
xrange
(
num_samples
)
:
diff
=
MatX
[
i
,
:
]
-
MatX
[
j
,
:
]
affinity_matrix
[
i
][
j
]
=
np
.
exp
(
sum
(
diff*
*
2
)
/
(
-
2.0
*
rbf_sigma*
*
2
))
row_sum
+=
affinity_matrix
[
i
][
j
]
affinity_matrix
[
i
][
:
]
/=
row_sum
elif
kernel_type
==
"knn"
:
if
knn_num_neighbors
==
None
:
raise ValueError
(
"You should input a k of knn kernel!"
)
for
i
in
xrange
(
num_samples
)
:
k_neighbors
=
navie_knn
(
MatX
,
MatX
[
i
,
:
],
knn_num_neighbors
)
affinity_matrix
[
i
][
k_neighbors
]
=
1.0
/
knn_num_neighbors
else
:
raise NameError
(
"Not support kernel type! You can use knn or rbf!"
)
return
affinity_matrix
# label propagation
def labelPropagation
(
Mat_Label
,
Mat_Unlabel
,
labels
,
kernel_type
=
"rbf"
,
rbf_sigma
=
1.5
,
knn_num_neighbors
=
10
,
max_iter
=
500
,
tol
=
1e
-
3
)
:
# initialize
num_label_samples
=
Mat_Label
.
shape
[
0
]
num_unlabel_samples
=
Mat_Unlabel
.
shape
[
0
]
num_samples
=
num_label_samples
+
num_unlabel_samples
labels_list
=
np
.
unique
(
labels
)
num_classes
=
len
(
labels_list
)
MatX
=
np
.
vstack
((
Mat_Label
,
Mat_Unlabel
))
clamp_data_label
=
np
.
zeros
((
num_label_samples
,
num_classes
),
np
.
float32
)
for
i
in
xrange
(
num_label_samples
)
:
clamp_data_label
[
i
][
labels
[
i
]]
=
1.0
label_function
=
np
.
zeros
((
num_samples
,
num_classes
),
np
.
float32
)
label_function
[
0
:
num_label_samples
]
=
clamp_data_label
label_function
[
num_label_samples
:
num_samples
]
= -
1
# graph construction
affinity_matrix
=
buildGraph
(
MatX
,
kernel_type
,
rbf_sigma
,
knn_num_neighbors
)
# start to propagation
iter
=
0
;
pre_label_function
=
np
.
zeros
((
num_samples
,
num_classes
),
np
.
float32
)
changed
=
np
.
abs
(
pre_label_function
-
label_function
).
sum
()
while
iter
<
max_iter
and
changed
>
tol
:
if
iter
%
1
==
0
:
"---> Iteration %d/%d, changed: %f"
%
(
iter
,
max_iter
,
changed
)
pre_label_function
=
label_function
iter
+=
1
# propagation
label_function
=
np
.
dot
(
affinity_matrix
,
label_function
)
# clamp
label_function
[
0
:
num_label_samples
]
=
clamp_data_label
# check converge
changed
=
np
.
abs
(
pre_label_function
-
label_function
).
sum
()
# get terminate label of unlabeled data
unlabel_data_labels
=
np
.
zeros
(
num_unlabel_samples
)
for
i
in
xrange
(
num_unlabel_samples
)
:
unlabel_data_labels
[
i
]
=
np
.
argmax
(
label_function
[
i
+
num_label_samples
])
return
unlabel_data_labels
測試代碼:
import time
import math
import numpy
as
np
from label_propagation import
labelPropagation
# show
def show
(
Mat_Label
,
labels
,
Mat_Unlabel
,
unlabel_data_labels
)
:
import
matplotlib
.
pyplot
as
plt
for
i
in
range
(
Mat_Label
.
shape
[
0
])
:
if
int
(
labels
[
i
])
==
0
:
plt
.
plot
(
Mat_Label
[
i
,
0
],
Mat_Label
[
i
,
1
],
"Dr"
)
elif
int
(
labels
[
i
])
==
1
:
plt
.
plot
(
Mat_Label
[
i
,
0
],
Mat_Label
[
i
,
1
],
"Db"
)
else
:
plt
.
plot
(
Mat_Label
[
i
,
0
],
Mat_Label
[
i
,
1
],
"Dy"
)
for
i
in
range
(
Mat_Unlabel
.
shape
[
0
])
:
if
int
(
unlabel_data_labels
[
i
])
==
0
:
plt
.
plot
(
Mat_Unlabel
[
i
,
0
],
Mat_Unlabel
[
i
,
1
],
"or"
)
elif
int
(
unlabel_data_labels
[
i
])
==
1
:
plt
.
plot
(
Mat_Unlabel
[
i
,
0
],
Mat_Unlabel
[
i
,
1
],
"ob"
)
else
:
plt
.
plot
(
Mat_Unlabel
[
i
,
0
],
Mat_Unlabel
[
i
,
1
],
"oy"
)
plt
.
xlabel
(
"X1"
);
plt
.
ylabel
(
"X2"
)
plt
.
xlim
(
0.0
,
12.
)
plt
.
ylim
(
0.0
,
12.
)
plt
.
show
()
def loadCircleData
(
num_data
)
:
center
=
np
.
array
([
5.0
,
5.0
])
radiu_inner
=
2
radiu_outer
=
4
num_inner
=
num_data
/
3
num_outer
=
num_data
-
num_inner
data
=
[]
theta
=
0.0
for
i
in
range
(
num_inner
)
:
pho
=
(
theta
%
360
)
*
math
.
pi
/
180
tmp
=
np
.
zeros
(
2
,
np
.
float32
)
tmp
[
0
]
=
radiu_inner *
math
.
cos
(
pho
)
+
np
.
random
.
rand
(
1
)
+
center
[
0
]
tmp
[
1
]
=
radiu_inner *
math
.
sin
(
pho
)
+
np
.
random
.
rand
(
1
)
+
center
[
1
]
data
.
append
(
tmp
)
theta
+=
2
theta
=
0.0
for
i
in
range
(
num_outer
)
:
pho
=
(
theta
%
360
)
*
math
.
pi
/
180
tmp
=
np
.
zeros
(
2
,
np
.
float32
)
tmp
[
0
]
=
radiu_outer *
math
.
cos
(
pho
)
+
np
.
random
.
rand
(
1
)
+
center
[
0
]
tmp
[
1
]
=
radiu_outer *
math
.
sin
(
pho
)
+
np
.
random
.
rand
(
1
)
+
center
[
1
]
data
.
append
(
tmp
)
theta
+=
1
Mat_Label
=
np
.
zeros
((
2
,
2
),
np
.
float32
)
Mat_Label
[
0
]
=
center
+
np
.
array
([
-
radiu_inner
+
0.5
,
0
])
Mat_Label
[
1
]
=
center
+
np
.
array
([
-
radiu_outer
+
0.5
,
0
])
labels
=
[
0
,
1
]
Mat_Unlabel
=
np
.
vstack
(
data
)
return
Mat_Label
,
labels
,
Mat_Unlabel
def loadBandData
(
num_unlabel_samples
)
:
#Mat_Label = np.array([[5.0, 2.], [5.0, 8.0]])
#labels = [0, 1]
#Mat_Unlabel = np.array([[5.1, 2.], [5.0, 8.1]])
Mat_Label
=
np
.
array
([[
5.0
,
2.
],
[
5.0
,
8.0
]])
labels
=
[
0
,
1
]
num_dim
=
Mat_Label
.
shape
[
1
]
Mat_Unlabel
=
np
.
zeros
((
num_unlabel_samples
,
num_dim
),
np
.
float32
)
Mat_Unlabel
[
:
num_unlabel_samples
/
2
,
:
]
=
(
np
.
random
.
rand
(
num_unlabel_samples
/
2
,
num_dim
)
-
0.5
)
*
np
.
array
([
3
,
1
])
+
Mat_Label
[
0
]
Mat_Unlabel
[
num_unlabel_samples
/
2
:
num_unlabel_samples
,
:
]
=
(
np
.
random
.
rand
(
num_unlabel_samples
/
2
,
num_dim
)
-
0.5
)
*
np
.
array
([
3
,
1
])
+
Mat_Label
[
1
]
return
Mat_Label
,
labels
,
Mat_Unlabel
# main function
if
__name__
==
"__main__"
:
num_unlabel_samples
=
800
#Mat_Label, labels, Mat_Unlabel = loadBandData(num_unlabel_samples)
Mat_Label
,
labels
,
Mat_Unlabel
=
loadCircleData
(
num_unlabel_samples
)
## Notice: when use "rbf" as our kernel, the choice of hyper parameter "sigma" is very import! It should be
## chose according to your dataset, specific the distance of two data points. I think it should ensure that
## each point has about 10 knn or w_i,j is large enough. It also influence the speed of converge. So, may be
## "knn" kernel is better!
#unlabel_data_labels = labelPropagation(Mat_Label, Mat_Unlabel, labels, kernel_type = "rbf", rbf_sigma = 0.2)
unlabel_data_labels
=
labelPropagation
(
Mat_Label
,
Mat_Unlabel
,
labels
,
kernel_type
=
"knn"
,
knn_num_neighbors
=
10
,
max_iter
=
400
)
show
(
Mat_Label
,
labels
,
Mat_Unlabel
,
unlabel_data_labels
)
該注釋的,代碼都注釋的,有看不明白的,歡迎交流。不同迭代次數時候的結果如下:
是不是很漂亮的傳播過程?!在數值上也是可以看到隨著迭代的進行逐漸收斂的,迭代的數值變化過程如下:
四、LP演算法MPI並行實現
這裡,我們測試的是LP的變身版本。從公式,我們可以看到,第二項PULYL迭代過程並沒有發生變化,所以這部分實際上從迭代開始就可以計算好,從而避免重複計算。不過,不管怎樣,LP演算法都要計算一個UxU的矩陣PUU和一個UxC矩陣FU的乘積。當我們的unlabeled數據非常多,而且類別也很多的時候,計算是很慢的,同時佔用的內存量也非常大。另外,構造Graph需要計算兩兩的相似度,也是O(n2)的複雜度,當我們數據的特徵維度很大的時候,這個計算量也是非常客觀的。所以我們就得考慮並行處理了。而且最好是能放到集群上並行。那如何並行呢?
對演算法的並行化,一般分為兩種:數據並行和模型並行。
數據並行很好理解,就是將數據劃分,每個節點只處理一部分數據,例如我們構造圖的時候,計算每個數據的k近鄰。例如我們有1000個樣本和20個CPU節點,那麼就平均分發,讓每個CPU節點計算50個樣本的k近鄰,然後最後再合併大家的結果。可見這個加速比也是非常可觀的。
模型並行一般發生在模型很大,無法放到單機的內存裡面的時候。例如龐大的深度神經網路訓練的時候,就需要把這個網路切開,然後分別求解梯度,最後有個leader的節點來收集大家的梯度,再反饋給大家去更新。當然了,其中存在更細緻和高效的工程處理方法。在我們的LP演算法中,也是可以做模型並行的。假如我們的類別數C很大,把類別數切開,讓不同的CPU節點處理,實際上就相當於模型並行了。
那為啥不切大矩陣PUU,而是切小點的矩陣FU,因為大矩陣PUU沒法獨立分塊,並行的一個原則是處理必須是獨立的。 矩陣FU依賴的是所有的U,而把PUU切開分發到其他節點的時候,每次FU的更新都需要和其他的節點通信,這個通信的代價是很大的(實際上,很多並行系統沒法達到線性的加速度的瓶頸是通信!線性加速比是,我增加了n台機器,速度就提升了n倍)。但是對類別C也就是矩陣FU切分,就不會有這個問題,因為他們的計算是獨立的。只是決定樣本的最終類別的時候,將所有的FU收集回來求max就可以了。
所以,在下面的代碼中,是同時包含了數據並行和模型並行的雛形的。另外,還值得一提的是,我們是迭代演算法,那決定什麼時候迭代演算法停止?除了判斷收斂外,我們還可以讓每迭代幾步,就用測試label測試一次結果,看模型的整體訓練性能如何。特別是判斷訓練是否過擬合的時候非常有效。因此,代碼中包含了這部分內容。
好了,代碼終於來了。大家可以搞點大資料庫來測試,如果有MPI集群條件的話就更好了。
下面的代碼依賴numpy、scipy(用其稀疏矩陣加速計算)和mpi4py。其中mpi4py需要依賴openmpi和Cpython,可以參考我之前的博客進行安裝。
由於微信字數限制,此部分的代碼通過點擊閱讀原文查看
五、參考資料
[1]Semi-SupervisedLearning with Graphs.pdf
看完本文有收穫?請轉發分享給更多人
關注「大數據與機器學習文摘」,成為Top 1%
※用 Python 3 的 async / await 做非同步編程
※為什麼 Python 增長如此之快?
※讓你的 Python 代碼優雅又地道
※一個 Reentrant Error 引發的對 Python 信號機制的探索和思考
※初探 Python 3 的非同步 IO 編程
TAG:Python開發者 |