使用 Python 進行分散式系統協調
點擊上方「
Python開發
」,選擇「置頂公眾號」
關鍵時刻,第一時間送達!
筆者之前的博文提到過,隨著大數據時代的到來,分散式是解決大數據問題的一個主要手段,隨著越來越多的分散式的服務,如何在分散式的系統中對這些服務做協調變成了一個很棘手的問題。今天我們就來看看如何使用Python,利用開源對分散式服務做協調。
在對分散式的應用做協調的時候,主要會碰到以下的應用場景:
業務發現(service discovery)找到分散式系統中存在那些可用的服務和節點
名字服務 (name service)通過給定的名字知道到對應的資源
配置管理 (configuration management)如何在分散式的節點中共享配置文件,保證一致性。
故障發現和故障轉移 (failure detection and failover)當某一個節點出故障的時候,如何檢測到並通知其它節點, 或者把想用的服務轉移到其它的可用節點
領導選舉(leader election)如何在眾多的節點中選舉一個領導者,來協調所有的節點
分散式的鎖 (distributed exclusive lock)如何通過鎖在分散式的服務中進行同步
消息和通知服務 (message queue and notification)如何在分散式的服務中傳遞消息,以通知的形式對事件作出主動的響應
有許多的開源軟體試圖解決以上的全部或者部分問題,例如ZooKeeper,consul,doozerd等等,我們現在就看看它們是如何做的。
ZooKeeper
ZooKeeper是使用最廣泛,也是最有名的解決分散式服務的協調問題的開源軟體了,它最早和Hadoop一起開發,後來成為了Apache的頂級項目,很多開源的項目都在使用ZooKeeper,例如大名鼎鼎的Kafka。
Zookeeper本身是一個分散式的應用,通過對共享的數據的管理來實現對分散式應用的協調。
ZooKeeper使用一個樹形目錄作為數據模型,這個目錄和文件目錄類似,目錄上的每一個節點被稱作ZNodes。
ZooKeeper提供基本的API來操縱和控制Znodes,包括對節點的創建,刪除,設置和獲取數據,獲得子節點等。
除了這些基本的操作,ZooKeeper還提供了一些配方(Recipe),其實就是一些常見的用例,例如鎖,兩階段提交,領導選舉等等。
ZooKeeper本身是用Java開發的,所以對Java的支持是最自然的。它同時還提供了C語言的綁定。
Kazoo是一個非常成熟的Zookeeper Python客戶端,我們這就看看如果使用Python來調用ZooKeeper。(注意,運行以下的例子,需要在本地啟動ZooKeeper的服務)
基本操作
以下的例子現實了對Znode的基本操作,首先要創建一個客戶端的連接,並啟動客戶端。然後我們可以利用該客戶端對Znode做增刪改,取內容的操作。最後推出客戶端。
from
kazoo
.
client
import
KazooClient
import
logging
logging
.
basicConfig
()
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
# Ensure a path, create if necessary
zk
.
ensure_path
(
"/test/zk1"
)
# Create a node with data
zk
.
create
(
"/test/zk1/node"
,
b
"a test value"
)
# Determine if a node exists
if
zk
.
exists
(
"/test/zk1"
)
:
"the node exist"
# Print the version of a node and its data
data
,
stat
=
zk
.
get
(
"/test/zk1"
)
(
"Version: %s, data: %s"
%
(
stat
.
version
,
data
.
decode
(
"utf-8"
)))
# List the children
children
=
zk
.
get_children
(
"/test/zk1"
)
(
"There are %s children with names %s"
%
(
len
(
children
),
children
))
zk
.
stop
()
通過對ZNode的操作,我們可以完成一些分散式服務協調的基本需求,包括名字服務,配置服務,分組等等。
故障檢測(Failure Detection)
在分散式系統中,一個最基本的需求就是當某一個服務出問題的時候,能夠通知其它的節點或者某個管理節點。
ZooKeeper提供ephemeral Node的概念,當創建該Node的服務退出或者異常中止的時候,該Node會被刪除,所以我們就可以利用這種行為來監控服務運行狀態。
以下是worker的代碼
from
kazoo
.
client
import
KazooClient
import
time
import
logging
logging
.
basicConfig
()
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
# Ensure a path, create if necessary
zk
.
ensure_path
(
"/test/failure_detection"
)
# Create a node with data
zk
.
create
(
"/test/failure_detection/worker"
,
value
=
b
"a test value"
,
ephemeral
=
True
)
while
True
:
"I am alive!"
time
.
sleep
(
3
)
zk
.
stop
()
以下的monitor 代碼,監控worker服務是否運行。
from
kazoo
.
client
import
KazooClient
import
time
import
logging
logging
.
basicConfig
()
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
# Determine if a node exists
while
True
:
if
zk
.
exists
(
"/test/failure_detection/worker"
)
:
"the worker is alive!"
else
:
"the worker is dead!"
break
time
.
sleep
(
3
)
zk
.
stop
()
領導選舉
Kazoo直接提供了領導選舉的API,使用起來非常方便。
from
kazoo
.
client
import
KazooClient
import
time
import
uuid
import
logging
logging
.
basicConfig
()
my
_
id=
uuid
.
uuid4
()
def
leader_func
()
:
"I am the leader {}"
.
format
(
str
(
my_id
))
while
True
:
"{} is working! "
.
format
(
str
(
my_id
))
time
.
sleep
(
3
)
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
election
=
zk
.
Election
(
"/electionpath"
)
# blocks until the election is won, then calls
# leader_func()
election
.
run
(
leader_func
)
zk
.
stop
()
你可以同時運行多個worker,其中一個會獲得Leader,當你殺死當前的leader後,會有一個新的leader被選出。
分散式鎖
鎖的概念大家都熟悉,當我們希望某一件事在同一時間只有一個服務在做,或者某一個資源在同一時間只有一個服務能訪問,這個時候,我們就需要用到鎖。
from
kazoo
.
client
import
KazooClient
import
time
import
uuid
import
logging
logging
.
basicConfig
()
my
_
id=
uuid
.
uuid4
()
def
work
()
:
"{} is working! "
.
format
(
str
(
my_id
))
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
lock
=
zk
.
Lock
(
"/lockpath"
,
str
(
my_id
))
"I am {}"
.
format
(
str
(
my_id
))
while
True
:
with
lock
:
work
()
time
.
sleep
(
3
)
zk
.
stop
()
當你運行多個worker的時候,不同的worker會試圖獲取同一個鎖,然而只有一個worker會工作,其它的worker必須等待獲得鎖後才能執行。
監視
ZooKeeper提供了監視(Watch)的功能,當節點的數據被修改的時候,監控的function會被調用。我們可以利用這一點進行配置文件的同步,發消息,或其他需要通知的功能。
from
kazoo
.
client
import
KazooClient
import
time
import
logging
logging
.
basicConfig
()
zk
=
KazooClient
(
hosts
=
"127.0.0.1:2181"
)
zk
.
start
()
@
zk
.
DataWatch
(
"/path/to/watch"
)
def
my_func
(
data
,
stat
)
:
if
data
:
"Data is %s"
%
data
"Version is %s"
%
stat
.
version
else
:
"data is not available"
while
True
:
time
.
sleep
(
10
)
zk
.
stop
()
除了我們上面列舉的內容外,Kazoo還提供了許多其他的功能,例如:計數,租約,隊列等等,大家有興趣可以參考它的文檔
Consul
Consul是用Go開發的分散式服務協調管理的工具,它提供了服務發現,健康檢查,Key/Value存儲等功能,並且支持跨數據中心的功能。
Consul提供ZooKeeper類似的功能,它的基於HTTP的API可以方便的和各種語言進行綁定。自然Python也在列。
與Zookeeper有所差異的是Consul通過基於Client/Server架構的Agent部署來支持跨Data Center的功能。
Consul在Cluster傷的每一個節點都運行一個Agent,這個Agent可以使Server或者Client模式。Client負責到Server的高效通信,相對為無狀態的。 Server負責包括選舉領導節點,維護cluster的狀態,對所有的查詢做響應,跨數據中心的通信等等。
KV基本操作
類似於Zookeeper,Consul支持對KV的增刪查改的操作。
import
consul
c
=
consul
.
Consul
()
# set data for key foo
c
.
kv
.
put
(
"foo"
,
"bar"
)
# poll a key for updates
index
=
None
while
True
:
index
,
data
=
c
.
kv
.
get
(
"foo"
,
index
=
index
)
data
[
"Value"
]
c
.
kv
.
delete
(
"foo"
)
這裡和ZooKeeper對Znode的操作幾乎是一樣的。
服務發現(Service Discovery)和健康檢查(Health Check)
Consul的另一個主要的功能是用於對分散式的服務做管理,用戶可以註冊一個服務,同時還提供對服務做健康檢測的功能。
首先,用戶需要定義一個服務。
{
"service"
:
{
"name"
:
"redis"
,
"tags"
:
[
"master"
],
"address"
:
"127.0.0.1"
,
"port"
:
8000
,
"checks"
:
[
{
"script"
:
"/usr/local/bin/check_redis.py"
,
"interval"
:
"10s"
}
]
}}
其中,服務的名字是必須的,其它的欄位可以自選,包括了服務的地址,埠,相應的健康檢查的腳本。當用戶註冊了一個服務後,就可以通過Consul來查詢該服務,獲得該服務的狀態。
Consul支持三種Check的模式:
調用一個外部腳本(Script),在該模式下,consul定時會調用一個外部腳本,通過腳本的返回內容獲得對應服務的健康狀態。
調用HTTP,在該模式下,consul定時會調用一個HTTP請求,返回2XX,則為健康;429 (Too many request)是警告。其它均為不健康
主動上報,在該模式下,服務需要主動調用一個consul提供的HTTP PUT請求,上報健康狀態。
Python API提供對應的介面,大家可以參考 http://python-consul.readthedocs.org/en/latest/
Consul.Agent.Service
Consul.Agent.Check
Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node來檢測服務的狀態,Consul的Health Check,通過調用腳本,HTTP或者主動上報的方式檢查服務的狀態,更為靈活,可以獲得等多的信息,但是也需要做更多的工作。
故障檢測(Failure Detection)
Consul提供Session的概念,利用Session可以檢查服務是否存活。
對每一個服務我們都可以創建一個session對象,注意這裡我們設置了ttl,consul會以ttl的數值為間隔時間,持續的對session的存活做檢查。對應的在服務中,我們需要持續的renew session,保證session是合法的。
import
consul
import
time
c
=
consul
.
Consul
()
s
=
c
.
session
.
create
(
name
=
"worker"
,
behavior
=
"delete"
,
ttl
=
10
)
"session id is {}"
.
format
(
s
)
while
True
:
c
.
session
.
renew
(
s
)
"I am alive ..."
time
.
sleep
(
3
)
Moniter代碼用於監控worker相關聯的session的狀態,但發現worker session已經不存在了,就做出響應的處理。
import
consul
import
time
def
is_session_exist
(
name
,
sessions
)
:
for
s
in
sessions
:
if
s
[
"Name"
]
==
name
:
return
True
return
False
c
=
consul
.
Consul
()
while
True
:
index
,
sessions
=
c
.
session
.
list
()
if
is_session_exist
(
"worker"
,
sessions
)
:
"worker is alive ..."
else
:
"worker is dead!"
break
time
.
sleep
(
3
)
這裡注意,因為是基於ttl(最小10秒)的檢測,從業務中斷到被檢測到,至少有10秒的時延,對應需要實時響應的情景,並不適用。Zookeeper使用ephemeral Node的方式時延相對短一點,但也非實時。
領導選舉和分散式的鎖
無論是Consul本身還是Python客戶端,都不直接提供Leader Election的功能,但是這篇文檔(http://www.consul.io/docs/guides/leader-election.html)介紹了如何利用Consul的KV存儲來實現Leader Election,利用Consul的KV功能,可以很方便的實現領導選舉和鎖的功能。
當對某一個Key做put操作的時候,可以創建一個session對象,設置一個acquire標誌為該 session,這樣就獲得了一個鎖,獲得所得客戶則是被選舉的leader。
代碼如下:
import
consul
import
time
c
=
consul
.
Consul
()
def
request_lead
(
namespace
,
session_id
)
:
lock
=
c
.
kv
.
put
(
leader_namespace
,
"leader check"
,
acquire
=
session_id
)
return
lock
def
release_lead
(
session_id
)
:
c
.
session
.
destroy
(
session_id
)
def
whois_lead
(
namespace
)
:
index
,
value
=
c
.
kv
.
get
(
namespace
)
session
=
value
.
get
(
"Session"
)
if
session
is
None
:
"No one is leading, maybe in electing"
else
:
index
,
value
=
c
.
session
.
info
(
session
)
"{} is leading"
.
format
(
value
[
"ID"
])
def
work_non_block
()
:
"working"
def
work_block
()
:
while
True
:
"working"
time
.
sleep
(
3
)
leader
_
namespace=
"leader/test"
## initialize leader key/value node
leader_index
,
leader
_
node=
c
.
kv
.
get
(
leader_namespace
)
if
leader
_
nodeis
None
:
c
.
kv
.
put
(
leader_namespace
,
"a leader test"
)
while
True
:
whois_lead
(
leader_namespace
)
session
_
id=
c
.
session
.
create
(
ttl
=
10
)
if
request_lead
(
leader_namespace
,
session_id
)
:
"I am now the leader"
work_block
()
release_lead
(
session_id
)
else
:
"wait leader elected!"
time
.
sleep
(
3
)
利用同樣的機制,可以方便的實現鎖,信號量等分散式的同步操作。
監視
Consul的Agent提供了Watch的功能,然而Python客戶端並沒有相應的介面。
etcd
etcd是另一個用GO開發的分散式協調應用,它提供一個分散式的Key/Value存儲來進行共享的配置管理和服務發現。
同樣的etcd使用基於HTTP的API,可以靈活的進行不同語言的綁定,我們用的是這個客戶端https://github.com/jplana/python-etcd
基本操作
import
etcd
client
=
etcd
.
Client
()
client
.
write
(
"/nodes/n1"
,
1
)
client
.
read
(
"/nodes/n1"
).
value
etcd對節點的操作和ZooKeeper類似,不過etcd不支持ZooKeeper的ephemeral Node的概念,要監控服務的狀態似乎比較麻煩。
分散式鎖
etcd支持分散式鎖,以下是一個例子。
import
sys
sys
.
path
.
append
(
"../../"
)
import
etcd
import
uuid
import
time
my
_
id=
uuid
.
uuid4
()
def
work
()
:
"I get the lock {}"
.
format
(
str
(
my_id
))
client
=
etcd
.
Client
()
lock
=
etcd
.
Lock
(
client
,
"/customerlock"
,
ttl
=
60
)
with
lock
as
my_lock
:
work
()
lock
.
is_locked
()
# True
lock
.
renew
(
60
)
lock
.
is_locked
()
# False
老版本的etcd支持leader election,但是在最新版該功能被deprecated了,參見https://coreos.com/etcd/docs/0.4.7/etcd-modules/
其它
我們針對分散式協調的功能討論了三個不同的開源應用,其實還有許多其它的選擇,我這裡就不一一介紹,大家有興趣可以訪問以下的鏈接:
eureka https://github.com/Netflix/eurekaNetflix開發的定位服務,應用於fail over和load balance的功能
curator http://curator.apache.org/基於ZooKeeper的更高層次的封裝
doozerd https://github.com/ha/doozerd基於GO的高可靠,分散式的數據存儲,過去兩年已經不活躍
openreplica http://openreplica.org/基於Python開發的,面向對象的介面的分散式應用協調的工具
serf http://www.serfdom.io/serf提供輕量級的cluster成員管理,故障檢測(failure detection)和協調。開發基於GO語言。Consul使用了serf提供的功能
noah https://github.com/lusis/Noah基於ruby的ZooKeeper實現,過去三年不活躍
copy cat https://github.com/kuujo/copycat基於日誌的分散式協調的框架,使用Java開發
總結
ZooKeeper無疑是分散式協調應用的最佳選擇,功能全,社區活躍,用戶群體很大,對所有典型的用例都有很好的封裝,支持不同語言的綁定。缺點是,整個應用比較重,依賴於Java,不支持跨數據中心。
Consul作為使用Go語言開發的分散式協調,對業務發現的管理提供很好的支持,他的HTTP API也能很好的和不同的語言綁定,並支持跨數據中心的應用。缺點是相對較新,適合喜歡嘗試新事物的用戶。
etcd是一個更輕量級的分散式協調的應用,提供了基本的功能,更適合一些輕量級的應用來使用。
參考
如果大家對於分散式系統的協調想要進行更多的了解,可以閱讀一下的鏈接:
http://stackoverflow.com/questions/6047917/zookeeper-alternatives-cluster-coordination-service
http://txt.fliglio.com/2014/05/encapsulated-services-with-consul-and-confd/
http://txt.fliglio.com/2013/12/service-discovery-with-docker-docker-links-and-beyond/
http://www.serfdom.io/intro/vs-zookeeper.html
http://devo.ps/blog/zookeeper-vs-doozer-vs-etcd/
https://www.digitalocean.com/community/articles/how-to-set-up-a-serf-cluster-on-several-ubuntu-vps
http://www.slideshare.net/JyrkiPulliainen/taming-pythons-with-zoo-keeper-ep2013?qid=e1267f58-090d-4147-9909-ec673525e76b&v=qf1&b=&from_search=8
http://muratbuffalo.blogspot.com/2014/09/paper-summary-tango-distributed-data.html
https://developer.yahoo.com/blogs/hadoop/apache-zookeeper-making-417.html
http://www.knewton.com/tech/blog/2014/12/eureka-shouldnt-use-zookeeper-service-discovery/
http://codahale.com/you-cant-sacrifice-partition-tolerance/
來源:naughty
my.oschina.net/taogang/blog/410864
Python開發整理髮布,轉載請聯繫作者獲得授
權
【點擊成為Java大神】
![](https://pic.pimg.tw/zzuyanan/1488615166-1259157397.png)
![](https://pic.pimg.tw/zzuyanan/1482887990-2595557020.jpg)
※python 黑魔法 ---上下文管理器(contextor)
※用Python從零開始創建區塊鏈
TAG:Python開發 |