當前位置:
首頁 > 知識 > 使用 Python 進行分散式系統協調

使用 Python 進行分散式系統協調

點擊上方「

Python開發

」,選擇「置頂公眾號」


關鍵時刻,第一時間送達!






筆者之前的博文提到過,隨著大數據時代的到來,分散式是解決大數據問題的一個主要手段,隨著越來越多的分散式的服務,如何在分散式的系統中對這些服務做協調變成了一個很棘手的問題。今天我們就來看看如何使用Python,利用開源對分散式服務做協調。



在對分散式的應用做協調的時候,主要會碰到以下的應用場景:






  • 業務發現(service discovery)找到分散式系統中存在那些可用的服務和節點



  • 名字服務 (name service)通過給定的名字知道到對應的資源



  • 配置管理 (configuration management)如何在分散式的節點中共享配置文件,保證一致性。



  • 故障發現和故障轉移 (failure detection and failover)當某一個節點出故障的時候,如何檢測到並通知其它節點, 或者把想用的服務轉移到其它的可用節點



  • 領導選舉(leader election)如何在眾多的節點中選舉一個領導者,來協調所有的節點



  • 分散式的鎖 (distributed exclusive lock)如何通過鎖在分散式的服務中進行同步



  • 消息和通知服務 (message queue and notification)如何在分散式的服務中傳遞消息,以通知的形式對事件作出主動的響應




有許多的開源軟體試圖解決以上的全部或者部分問題,例如ZooKeeper,consul,doozerd等等,我們現在就看看它們是如何做的。




ZooKeeper




ZooKeeper是使用最廣泛,也是最有名的解決分散式服務的協調問題的開源軟體了,它最早和Hadoop一起開發,後來成為了Apache的頂級項目,很多開源的項目都在使用ZooKeeper,例如大名鼎鼎的Kafka。




Zookeeper本身是一個分散式的應用,通過對共享的數據的管理來實現對分散式應用的協調。



ZooKeeper使用一個樹形目錄作為數據模型,這個目錄和文件目錄類似,目錄上的每一個節點被稱作ZNodes。







ZooKeeper提供基本的API來操縱和控制Znodes,包括對節點的創建,刪除,設置和獲取數據,獲得子節點等。




除了這些基本的操作,ZooKeeper還提供了一些配方(Recipe),其實就是一些常見的用例,例如鎖,兩階段提交,領導選舉等等。




ZooKeeper本身是用Java開發的,所以對Java的支持是最自然的。它同時還提供了C語言的綁定。



Kazoo是一個非常成熟的Zookeeper Python客戶端,我們這就看看如果使用Python來調用ZooKeeper。(注意,運行以下的例子,需要在本地啟動ZooKeeper的服務)




基本操作




以下的例子現實了對Znode的基本操作,首先要創建一個客戶端的連接,並啟動客戶端。然後我們可以利用該客戶端對Znode做增刪改,取內容的操作。最後推出客戶端。





from

 

kazoo

.

client

 

import

 

KazooClient


 


import

 

logging


logging

.

basicConfig

()


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


# Ensure a path, create if necessary


zk

.

ensure_path

(

"/test/zk1"

)


 


# Create a node with data


zk

.

create

(

"/test/zk1/node"

,

 

b

"a test value"

)


 


# Determine if a node exists


if

 

zk

.

exists

(

"/test/zk1"

)

:


    

print

 

"the node exist"


 


# Print the version of a node and its data


data

,

 

stat

 

=

 

zk

.

get

(

"/test/zk1"

)


print

(

"Version: %s, data: %s"

 

%

 

(

stat

.

version

,

 

data

.

decode

(

"utf-8"

)))


 


# List the children


children

 

=

 

zk

.

get_children

(

"/test/zk1"

)


print

(

"There are %s children with names %s"

 

%

 

(

len

(

children

),

 

children

))


 


zk

.

stop

()




通過對ZNode的操作,我們可以完成一些分散式服務協調的基本需求,包括名字服務,配置服務,分組等等。




故障檢測(Failure Detection)




在分散式系統中,一個最基本的需求就是當某一個服務出問題的時候,能夠通知其它的節點或者某個管理節點。




ZooKeeper提供ephemeral Node的概念,當創建該Node的服務退出或者異常中止的時候,該Node會被刪除,所以我們就可以利用這種行為來監控服務運行狀態。




以下是worker的代碼





from

 

kazoo

.

client

 

import

 

KazooClient


import

 

time


 


import

 

logging


logging

.

basicConfig

()


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


# Ensure a path, create if necessary


zk

.

ensure_path

(

"/test/failure_detection"

)


 


# Create a node with data


zk

.

create

(

"/test/failure_detection/worker"

,


          

value

=

b

"a test value"

,

 

ephemeral

=

True

)


 


while

 

True

:


    

print

 

"I am alive!"


    

time

.

sleep

(

3

)


 


zk

.

stop

()




以下的monitor 代碼,監控worker服務是否運行。





from

 

kazoo

.

client

 

import

 

KazooClient


 


import

 

time


 


import

 

logging


logging

.

basicConfig

()


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


# Determine if a node exists


while

 

True

:


    

if

 

zk

.

exists

(

"/test/failure_detection/worker"

)

:


        

print

 

"the worker is alive!"


    

else

:


        

print

 

"the worker is dead!"


        

break


    

time

.

sleep

(

3

)


 


zk

.

stop

()




領導選舉




Kazoo直接提供了領導選舉的API,使用起來非常方便。





from

 

kazoo

.

client

 

import

 

KazooClient


import

 

time


import

 

uuid


 


import

 

logging


logging

.

basicConfig

()


 


my

_

id 

=

 

uuid

.

uuid4

()


 


def

 

leader_func

()

:


    

print

 

"I am the leader {}"

.

format

(

str

(

my_id

))


    

while

 

True

:


        

print

 

"{} is working! "

.

format

(

str

(

my_id

))


        

time

.

sleep

(

3

)


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


election

 

=

 

zk

.

Election

(

"/electionpath"

)


 


# blocks until the election is won, then calls


# leader_func()


election

.

run

(

leader_func

)


 


zk

.

stop

()




你可以同時運行多個worker,其中一個會獲得Leader,當你殺死當前的leader後,會有一個新的leader被選出。




分散式鎖




鎖的概念大家都熟悉,當我們希望某一件事在同一時間只有一個服務在做,或者某一個資源在同一時間只有一個服務能訪問,這個時候,我們就需要用到鎖。





from

 

kazoo

.

client

 

import

 

KazooClient


import

 

time


import

 

uuid


 


import

 

logging


logging

.

basicConfig

()


 


my

_

id 

=

 

uuid

.

uuid4

()


 


def

 

work

()

:


    

print

 

"{} is working! "

.

format

(

str

(

my_id

))


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


lock

 

=

 

zk

.

Lock

(

"/lockpath"

,

 

str

(

my_id

))


 


print

 

"I am {}"

.

format

(

str

(

my_id

))


 


while

 

True

:


    

with

 

lock

:


        

work

()


    

time

.

sleep

(

3

)

    


 


zk

.

stop

()




當你運行多個worker的時候,不同的worker會試圖獲取同一個鎖,然而只有一個worker會工作,其它的worker必須等待獲得鎖後才能執行。




監視




ZooKeeper提供了監視(Watch)的功能,當節點的數據被修改的時候,監控的function會被調用。我們可以利用這一點進行配置文件的同步,發消息,或其他需要通知的功能。





from

 

kazoo

.

client

 

import

 

KazooClient


import

 

time


 


import

 

logging


logging

.

basicConfig

()


 


zk

 

=

 

KazooClient

(

hosts

=

"127.0.0.1:2181"

)


zk

.

start

()


 


@

zk

.

DataWatch

(

"/path/to/watch"

)


def

 

my_func

(

data

,

 

stat

)

:


    

if

 

data

:


        

print

 

"Data is %s"

 

%

 

data


        

print

 

"Version is %s"

 

%

 

stat

.

version

 


    

else

 

:


        

print

 

"data is not available"


 


while

 

True

:


    

time

.

sleep

(

10

)


 


zk

.

stop

()




除了我們上面列舉的內容外,Kazoo還提供了許多其他的功能,例如:計數,租約,隊列等等,大家有興趣可以參考它的文檔




Consul




Consul是用Go開發的分散式服務協調管理的工具,它提供了服務發現,健康檢查,Key/Value存儲等功能,並且支持跨數據中心的功能。




Consul提供ZooKeeper類似的功能,它的基於HTTP的API可以方便的和各種語言進行綁定。自然Python也在列。




與Zookeeper有所差異的是Consul通過基於Client/Server架構的Agent部署來支持跨Data Center的功能。







Consul在Cluster傷的每一個節點都運行一個Agent,這個Agent可以使Server或者Client模式。Client負責到Server的高效通信,相對為無狀態的。 Server負責包括選舉領導節點,維護cluster的狀態,對所有的查詢做響應,跨數據中心的通信等等。




KV基本操作




類似於Zookeeper,Consul支持對KV的增刪查改的操作。





import

 

consul


 


c

 

=

 

consul

.

Consul

()


 


# set data for key foo


c

.

kv

.

put

(

"foo"

,

 

"bar"

)


 


# poll a key for updates


index

 

=

 

None


while

 

True

:


    

index

,

 

data

 

=

 

c

.

kv

.

get

(

"foo"

,

 

index

=

index

)


    

print

 

data

[

"Value"

]


    


c

.

kv

.

delete

(

"foo"

)




這裡和ZooKeeper對Znode的操作幾乎是一樣的。




服務發現(Service Discovery)和健康檢查(Health Check)




Consul的另一個主要的功能是用於對分散式的服務做管理,用戶可以註冊一個服務,同時還提供對服務做健康檢測的功能。




首先,用戶需要定義一個服務。





{


  

"service"

:

 

{


    

"name"

:

 

"redis"

,


    

"tags"

:

 

[

"master"

],


    

"address"

:

 

"127.0.0.1"

,


    

"port"

:

 

8000

,


    

"checks"

:

 

[


      

{


        

"script"

:

 

"/usr/local/bin/check_redis.py"

,


        

"interval"

:

 

"10s"


      

}


    

]


  

}}




其中,服務的名字是必須的,其它的欄位可以自選,包括了服務的地址,埠,相應的健康檢查的腳本。當用戶註冊了一個服務後,就可以通過Consul來查詢該服務,獲得該服務的狀態。




Consul支持三種Check的模式:






  • 調用一個外部腳本(Script),在該模式下,consul定時會調用一個外部腳本,通過腳本的返回內容獲得對應服務的健康狀態。



  • 調用HTTP,在該模式下,consul定時會調用一個HTTP請求,返回2XX,則為健康;429 (Too many request)是警告。其它均為不健康



  • 主動上報,在該模式下,服務需要主動調用一個consul提供的HTTP PUT請求,上報健康狀態。




Python API提供對應的介面,大家可以參考 http://python-consul.readthedocs.org/en/latest/






  • Consul.Agent.Service



  • Consul.Agent.Check




Consul的Health Check和Zookeeper的Failure Detection略有不同,ZooKeeper可以利用ephemeral Node來檢測服務的狀態,Consul的Health Check,通過調用腳本,HTTP或者主動上報的方式檢查服務的狀態,更為靈活,可以獲得等多的信息,但是也需要做更多的工作。




故障檢測(Failure Detection)




Consul提供Session的概念,利用Session可以檢查服務是否存活。




對每一個服務我們都可以創建一個session對象,注意這裡我們設置了ttl,consul會以ttl的數值為間隔時間,持續的對session的存活做檢查。對應的在服務中,我們需要持續的renew session,保證session是合法的。





import

 

consul


import

 

time


 


c

 

=

 

consul

.

Consul

()


 


s

 

=

 

c

.

session

.

create

(

name

=

"worker"

,

behavior

=

"delete"

,

ttl

=

10

)


 


print

 

"session id is {}"

.

format

(

s

)


 


while

 

True

:


    

c

.

session

.

renew

(

s

)


    

print

 

"I am alive ..."


    

time

.

sleep

(

3

)




Moniter代碼用於監控worker相關聯的session的狀態,但發現worker session已經不存在了,就做出響應的處理。





import

 

consul


import

 

time


 


def

 

is_session_exist

(

name

,

 

sessions

)

:


    

for

 

s

 

in

 

sessions

:


        

if

 

s

[

"Name"

]

 

==

 

name

:


            

return

 

True


 


    

return

 

False


 


c

 

=

 

consul

.

Consul

()


 


while

 

True

:


    

index

,

 

sessions

 

=

 

c

.

session

.

list

()


    

if

 

is_session_exist

(

"worker"

,

 

sessions

)

:


        

print

 

"worker is alive ..."


    

else

:


        

print

 

"worker is dead!"


        

break


    

time

.

sleep

(

3

)




這裡注意,因為是基於ttl(最小10秒)的檢測,從業務中斷到被檢測到,至少有10秒的時延,對應需要實時響應的情景,並不適用。Zookeeper使用ephemeral Node的方式時延相對短一點,但也非實時。




領導選舉和分散式的鎖




無論是Consul本身還是Python客戶端,都不直接提供Leader Election的功能,但是這篇文檔(http://www.consul.io/docs/guides/leader-election.html)介紹了如何利用Consul的KV存儲來實現Leader Election,利用Consul的KV功能,可以很方便的實現領導選舉和鎖的功能。




當對某一個Key做put操作的時候,可以創建一個session對象,設置一個acquire標誌為該 session,這樣就獲得了一個鎖,獲得所得客戶則是被選舉的leader。




代碼如下:





import

 

consul


import

 

time


 


c

 

=

 

consul

.

Consul

()


 


def

 

request_lead

(

namespace

,

 

session_id

)

:


    

lock

 

=

 

c

.

kv

.

put

(

leader_namespace

,

"leader check"

,

 

acquire

=

session_id

)


    

return

 

lock


 


def

 

release_lead

(

session_id

)

:


    

c

.

session

.

destroy

(

session_id

)


 


def

 

whois_lead

(

namespace

)

:


    

index

,

value

 

=

 

c

.

kv

.

get

(

namespace

)


    

session

 

=

 

value

.

get

(

"Session"

)


    

if

 

session

 

is

 

None

:


        

print

 

"No one is leading, maybe in electing"


    

else

:


        

index

,

 

value

 

=

 

c

.

session

.

info

(

session

)


        

print

 

"{} is leading"

.

format

(

value

[

"ID"

])


 


def

 

work_non_block

()

:


    

print

 

"working"


 


def

 

work_block

()

:


    

while

 

True

:


        

print

 

"working"


        

time

.

sleep

(

3

)


 


leader

_

namespace 

=

 

"leader/test"


 


## initialize leader key/value node


leader_index

,

 

leader

_

node 

=

 

c

.

kv

.

get

(

leader_namespace

)


 


if

 

leader

_

node 

is

 

None

:


    

c

.

kv

.

put

(

leader_namespace

,

"a leader test"

)


 


while

 

True

:


    

whois_lead

(

leader_namespace

)


    

session

_

id 

=

 

c

.

session

.

create

(

ttl

=

10

)


    

if

 

request_lead

(

leader_namespace

,

session_id

)

:


        

print

 

"I am now the leader"


        

work_block

()


        

release_lead

(

session_id

)


    

else

:


        

print

 

"wait leader elected!"


    

time

.

sleep

(

3

)




利用同樣的機制,可以方便的實現鎖,信號量等分散式的同步操作。




監視




Consul的Agent提供了Watch的功能,然而Python客戶端並沒有相應的介面。




etcd




etcd是另一個用GO開發的分散式協調應用,它提供一個分散式的Key/Value存儲來進行共享的配置管理和服務發現。




同樣的etcd使用基於HTTP的API,可以靈活的進行不同語言的綁定,我們用的是這個客戶端https://github.com/jplana/python-etcd




基本操作





import

 

etcd


 


client

 

=

 

etcd

.

Client

()

 


client

.

write

(

"/nodes/n1"

,

 

1

)


print

 

client

.

read

(

"/nodes/n1"

).

value




etcd對節點的操作和ZooKeeper類似,不過etcd不支持ZooKeeper的ephemeral Node的概念,要監控服務的狀態似乎比較麻煩。




分散式鎖




etcd支持分散式鎖,以下是一個例子。





import

 

sys


 


sys

.

path

.

append

(

"../../"

)


 


import

 

etcd


 


import

 

uuid


import

 

time


 


my

_

id 

=

 

uuid

.

uuid4

()


 


def

 

work

()

:


    

print

 

"I get the lock {}"

.

format

(

str

(

my_id

))


 


client

 

=

 

etcd

.

Client

()

 


 


lock

 

=

 

etcd

.

Lock

(

client

,

 

"/customerlock"

,

 

ttl

=

60

)


 


with

 

lock

 

as

 

my_lock

:


    

work

()


    

lock

.

is_locked

()

  

# True


    

lock

.

renew

(

60

)


lock

.

is_locked

()

  

# False




老版本的etcd支持leader election,但是在最新版該功能被deprecated了,參見https://coreos.com/etcd/docs/0.4.7/etcd-modules/




其它




我們針對分散式協調的功能討論了三個不同的開源應用,其實還有許多其它的選擇,我這裡就不一一介紹,大家有興趣可以訪問以下的鏈接:






  • eureka https://github.com/Netflix/eurekaNetflix開發的定位服務,應用於fail over和load balance的功能



  • curator http://curator.apache.org/基於ZooKeeper的更高層次的封裝



  • doozerd https://github.com/ha/doozerd基於GO的高可靠,分散式的數據存儲,過去兩年已經不活躍



  • openreplica http://openreplica.org/基於Python開發的,面向對象的介面的分散式應用協調的工具



  • serf http://www.serfdom.io/serf提供輕量級的cluster成員管理,故障檢測(failure detection)和協調。開發基於GO語言。Consul使用了serf提供的功能



  • noah https://github.com/lusis/Noah基於ruby的ZooKeeper實現,過去三年不活躍



  • copy cat https://github.com/kuujo/copycat基於日誌的分散式協調的框架,使用Java開發




總結




ZooKeeper無疑是分散式協調應用的最佳選擇,功能全,社區活躍,用戶群體很大,對所有典型的用例都有很好的封裝,支持不同語言的綁定。缺點是,整個應用比較重,依賴於Java,不支持跨數據中心。




Consul作為使用Go語言開發的分散式協調,對業務發現的管理提供很好的支持,他的HTTP API也能很好的和不同的語言綁定,並支持跨數據中心的應用。缺點是相對較新,適合喜歡嘗試新事物的用戶。




etcd是一個更輕量級的分散式協調的應用,提供了基本的功能,更適合一些輕量級的應用來使用。




參考




如果大家對於分散式系統的協調想要進行更多的了解,可以閱讀一下的鏈接:






  • http://stackoverflow.com/questions/6047917/zookeeper-alternatives-cluster-coordination-service



  • http://txt.fliglio.com/2014/05/encapsulated-services-with-consul-and-confd/



  • http://txt.fliglio.com/2013/12/service-discovery-with-docker-docker-links-and-beyond/



  • http://www.serfdom.io/intro/vs-zookeeper.html



  • http://devo.ps/blog/zookeeper-vs-doozer-vs-etcd/



  • https://www.digitalocean.com/community/articles/how-to-set-up-a-serf-cluster-on-several-ubuntu-vps



  • http://www.slideshare.net/JyrkiPulliainen/taming-pythons-with-zoo-keeper-ep2013?qid=e1267f58-090d-4147-9909-ec673525e76b&v=qf1&b=&from_search=8



  • http://muratbuffalo.blogspot.com/2014/09/paper-summary-tango-distributed-data.html



  • https://developer.yahoo.com/blogs/hadoop/apache-zookeeper-making-417.html



  • http://www.knewton.com/tech/blog/2014/12/eureka-shouldnt-use-zookeeper-service-discovery/



  • http://codahale.com/you-cant-sacrifice-partition-tolerance/






  • 來源:naughty




  • my.oschina.net/taogang/blog/410864



  • Python開發整理髮布,轉載請聯繫作者獲得授


【點擊成為Java大神】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

python 黑魔法 ---上下文管理器(contextor)
用Python從零開始創建區塊鏈

TAG:Python開發 |