當前位置:
首頁 > 知識 > 分散式隊列神器 Celery

分散式隊列神器 Celery

點擊上方「

Python開發

」,選擇「置頂公眾號」


關鍵時刻,第一時間送達!






Celery 是什麼?




Celery 是一個由 Python 編寫的簡單、靈活、可靠的用來處理大量信息的分散式系統,它同時提供操作和維護分散式系統所需的工具。




Celery 專註於實時任務處理,支持任務調度。




說白了,它是一個分散式隊列的管理工具,我們可以用 Celery 提供的介面快速實現並管理一個分散式的任務隊列。




1.快速入門



(本文以 Celery4.0 為基礎進行書寫)




首先,我們要理解 Celery 本身不是任務隊列,它是管理分散式任務隊列的工具,或者換一種說法,它封裝好了操作常見任務隊列的各種操作,我們用它可以快速進行任務隊列的使用與管理,當然你也可以自己看 rabbitmq 等隊列的文檔然後自己實現相關操作都是沒有問題的。




Celery 是語言無關的,雖然它是用 Python 實現的,但他提供了其他常見語言的介面支持。只是如果你恰好使用 Python 進行開發那麼使用 Celery 就自然而然了。




想讓 Celery 運行起來我們要明白幾個概念:




1.1 Brokers




brokers 中文意思為中間人,在這裡就是指任務隊列本身,Celery 扮演生產者和消費者的角色,brokers 就是生產者和消費者存放/拿取產品的地方(隊列)




常見的 brokers 有 rabbitmq、redis、Zookeeper 等




1.2 Result Stores / backend




顧名思義就是結果儲存的地方,隊列中的任務運行完後的結果或者狀態需要被任務發送者知道,那麼就需要一個地方儲存這些結果,就是 Result Stores 了



常見的 backend 有 redis、Memcached 甚至常用的數據都可以。




1.3 Workers




就是 Celery 中的工作者,類似與生產/消費模型中的消費者,其從隊列中取出任務並執行




1.4 Tasks



就是我們想在隊列中進行的任務咯,一般由用戶、觸發器或其他操作將任務入隊,然後交由 workers 進行處理。




理解以上概念後我們就可以快速實現一個隊列的操作:




這裡我們用 redis 當做 celery 的 broker 和 backend。




(其他 brokers 與 backend 支持看這裡(http://docs.celeryproject.org/en/latest/getting-started/brokers/index.html))




安裝 Celery 和 redis 以及 python 的 redis 支持:




apt

-

get install

redis

-

server


pip install redis


pip install

celery




這裡需要注意如果你的 celery 是 4.0 及以上版本請確保 python 的 redis 庫版本在 2.10.4 及以上,否則會出現 redis 連接 timeout 的錯誤,具體參考(https://github.com/celery/celery/issues/3580)




然後,我們需要寫一個task:




#tasks.py


from celery import Celery


 


app

=

Celery

(

"tasks"

,

  

backend

=

"redis://localhost:6379/0"

,

broker

=

"redis://localhost:6379/0"

)

#配置好celery的backend和broker


 


@

app

.

task

  

#普通函數裝飾為 celery task


def add

(

x

,

y

)

:


    

return

x

+

y




OK,到這裡,broker 我們有了,backend 我們有了,task 我們也有了,現在就該運行 worker 進行工作了,在 tasks.py 所在目錄下運行:





celery -A tasks worker --loglevel=info




意思就是運行 tasks 這個任務集合的 worker 進行工作(當然此時broker中還沒有任務,worker此時相當於待命狀態)




最後一步,就是觸發任務啦,最簡單方式就是再寫一個腳本然後調用那個被裝飾成 task 的函數:





#trigger.py


from tasks import add


result

=

add

.

delay

(

4

,

4

)

#不要直接 add(4, 4),這裡需要用 celery 提供的介面 delay 進行調用


while

not

result

.

ready

()

:


    

time

.

sleep

(

1

)


print

"task done: {0}"

.

format

(

result

.

get

())




運行此腳本




delay 返回的是一個 AsyncResult 對象,裡面存的就是一個非同步的結果,當任務完成時result.ready() 為 true,然後用 result.get() 取結果即可。




到此,一個簡單的 celery 應用就完成啦。




2. 進階用法




經過快速入門的學習後,我們已經能夠使用 Celery 管理普通任務,但對於實際使用場景來說這是遠遠不夠的,所以我們需要更深入的去了解 Celery 更多的使用方式。




首先來看之前的task:





@

app

.

task

  

#普通函數裝飾為 celery task


def add

(

x

,

y

)

:


    

return

x

+

y




這裡的裝飾器app.task實際上是將一個正常的函數修飾成了一個 celery task 對象,所以這裡我們可以給修飾器加上參數來決定修飾後的 task 對象的一些屬性。




首先,我們可以讓被修飾的函數成為 task 對象的綁定方法,這樣就相當於被修飾的函數 add 成了 task 的實例方法,可以調用 self 獲取當前 task 實例的很多狀態及屬性。




其次,我們也可以自己複寫 task 類然後讓這個自定義 task 修飾函數 add ,來做一些自定義操作。




2.1 根據任務狀態執行不同操作




任務執行後,根據任務狀態執行不同操作需要我們複寫 task 的 on_failure、on_success 等方法:





# tasks.py


class

MyTask

(

Task

)

:


    

def on_success

(

self

,

retval

,

task_id

,

args

,

kwargs

)

:


        

print

"task done: {0}"

.

format

(

retval

)


        

return

super

(

MyTask

,

self

).

on_success

(

retval

,

task_id

,

args

,

kwargs

)


    


    

def on_failure

(

self

,

exc

,

task_id

,

args

,

kwargs

,

einfo

)

:


        

print

"task fail, reason: {0}"

.

format

(

exc

)


        

return

super

(

MyTask

,

self

).

on_failure

(

exc

,

task_id

,

args

,

kwargs

,

einfo

)


 


@

app

.

task

(

base

=

MyTask

)


def add

(

x

,

y

)

:


    

return

x

+

y




嗯, 然後繼續運行 worker:





celery -A tasks worker --loglevel=info




運行腳本,得到:







再修改下tasks:





@

app

.

task

  

#普通函數裝飾為 celery task


def add

(

x

,

y

)

:


    

raise KeyError


    

return

x

+

y




重新運行 worker,再運行 trigger.py:







可以看到,任務執行成功或失敗後分別執行了我們自定義的 on_failure、on_success




2.2 綁定任務為實例方法





# tasks.py


from

celery

.

utils

.

log import get_task_logger


 


logger

=

get_task_logger

(

__name__

)


@

app

.

task

(

bind

=

True

)


def add

(

self

,

x

,

y

)

:


    

logger

.

info

(

self

.

request

.

__dict__

)


    

return

x

+

y




然後重新運行:







執行中的任務獲取到了自己執行任務的各種信息,可以根據這些信息做很多其他操作,例如判斷鏈式任務是否到結尾等等。




關於 celery.task.request 對象的詳細數據可以看這裡(http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-request-info)




2.3 任務狀態回調




實際場景中得知任務狀態是很常見的需求,對於 Celery 其內建任務狀態有如下幾種:







當我們有個耗時時間較長的任務進行時一般我們想得知它的實時進度,這裡就需要我們自定義一個任務狀態用來說明進度並手動更新狀態,從而告訴回調當前任務的進度,具體實現:





# tasks.py


from celery import Celery


import

time


 


@

app

.

task

(

bind

=

True

)


def test_mes

(

self

)

:


    

for

i

in

xrange

(

1

,

11

)

:


        

time

.

sleep

(

0.1

)


        

self

.

update_state

(

state

=

"PROGRESS"

,

meta

=

{

"p"

:

i*

10

})


    

return

"finish"




然後在 trigger.py 中增加:





# trigger.py


from task import

add

,

test_mes


import sys


 


def pm

(

body

)

:


    

res

=

body

.

get

(

"result"

)


    

if

body

.

get

(

"status"

)

==

"PROGRESS"

:


        

sys

.

stdout

.

write

(

" 任務進度: {0}%"

.

format

(

res

.

get

(

"p"

)))


        

sys

.

stdout

.

flush

()


    

else

:


        

print

" "


        

print

res


r

=

test_mes

.

delay

()


print

r

.

get

(

on_message

=

pm

,

propagate

=

False

)




然後運行任務:







2.4 定時/周期任務




Celery 進行周期任務也很簡單,只需要在配置中配置好周期任務,然後在運行一個周期任務觸發器( beat )即可:




新建 Celery 配置文件 celery_config.py:





# celery_config.py


from datetime import timedelta


from

celery

.

schedules import crontab


 


CELERYBEAT_SCHEDULE

=

{


    

"ptask"

:

{


        

"task"

:

"tasks.period_task"

,


        

"schedule"

:

timedelta

(

seconds

=

5

),


    

},


}


 


CELERY_RESULT_BACKEND

=

"redis://localhost:6379/0"




配置中 schedule 就是間隔執行的時間,這裡可以用 datetime.timedelta 或者 crontab 甚至太陽系經緯度坐標進行間隔時間配置,具體可以參考這裡(http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#crontab-schedules)




如果定時任務涉及到 datetime 需要在配置中加入時區信息,否則默認是以 utc 為準。例如中國可以加上:





CELERY_TIMEZONE = "Asia/Shanghai"




然後在 tasks.py 中增加要被周期執行的任務:





# tasks.py


app

=

Celery

(

"tasks"

,

backend

=

"redis://localhost:6379/0"

,

broker

=

"redis://localhost:6379/0"

)


app

.

config_from_object

(

"celery_config"

)


 


@

app

.

task

(

bind

=

True

)


def period_task

(

self

)

:


    

print

"period task done: {0}"

.

format

(

self

.

request

.

id

)




然後重新運行 worker,接著再運行 beat:





celery -A task beat







可以看到周期任務運行正常~




2.5 鏈式任務




有些任務可能需由幾個子任務組成,此時調用各個子任務的方式就變的很重要,盡量不要以同步阻塞的方式調用子任務,而是用非同步回調的方式進行鏈式任務的調用:




錯誤示範





@

app

.

task


def update_page_info

(

url

)

:


    

page

=

fetch_page

.

delay

(

url

).

get

()


    

info

=

parse_page

.

delay

(

url

,

page

).

get

()


    

store_page_info

.

delay

(

url

,

info

)


 


@

app

.

task


def fetch_page

(

url

)

:


    

return

myhttplib

.

get

(

url

)


 


@

app

.

task


def parse_page

(

url

,

page

)

:


    

return

myparser

.

parse_document

(

page

)


 


@

app

.

task


def store_page_info

(

url

,

info

)

:


    

return

PageInfo

.

objects

.

create

(

url

,

info

)




正確示範1





def update_page_info

(

url

)

:


    

# fetch_page -> parse_page -> store_page


    

chain

=

fetch_page

.

s

(

url

)

|

parse_page

.

s

()

|

store_page_info

.

s

(

url

)


    

chain

()


 


@

app

.

task

()


def fetch_page

(

url

)

:


    

return

myhttplib

.

get

(

url

)


 


@

app

.

task

()


def parse_page

(

page

)

:


    

return

myparser

.

parse_document

(

page

)


 


@

app

.

task

(

ignore_result

=

True

)


def store_page_info

(

info

,

url

)

:


    

PageInfo

.

objects

.

create

(

url

=

url

,

info

=

info

)




正確示範2





fetch_page.apply_async((url), link=[parse_page.s(), store_page_info.s(url)])




鏈式任務中前一個任務的返回值默認是下一個任務的輸入值之一 ( 不想讓返回值做默認參數可以用 si() 或者 s(immutable=True) 的方式調用 )。




這裡的 s() 是方法 celery.signature() 的快捷調用方式,signature 具體作用就是生成一個包含調用任務及其調用參數與其他信息的對象,個人感覺有點類似偏函數的概念:先不執行任務,而是把任務與任務參數存起來以供其他地方調用。




2.6 調用任務




前面講了調用任務不能直接使用普通的調用方式,而是要用類似 add.delay(2, 2) 的方式調用,而鏈式任務中又用到了 apply_async 方法進行調用,實際上 delay 只是 apply_async 的快捷方式,二者作用相同,只是 apply_async 可以進行更多的任務屬性設置,比如 callbacks/errbacks 正常回調與錯誤回調、執行超時、重試、重試時間等等,具體參數可以參考這裡(http://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async)




2.7 關於 AsyncResult




AsyncResult 主要用來儲存任務執行信息與執行結果,有點類似 tornado 中的 Future 對象,都有儲存非同步結果與任務執行狀態的功能,對於寫 js 的朋友,它有點類似 Promise 對象,當然在 Celery 4.0 中已經支持了 promise 協議,只需要配合 gevent 一起使用就可以像寫 js promise 一樣寫回調:





import

gevent

.

monkey


monkey

.

patch_all

()


 


import time


from celery import Celery


 


app

=

Celery

(

broker

=

"amqp://"

,

backend

=

"rpc"

)


 


@

app

.

task


def add

(

x

,

y

)

:


    

return

x

+

y


 


def on_result_ready

(

result

)

:


    

print

(

"Received result for id %r: %r"

%

(

result

.

id

,

result

.

result

,))


 


add

.

delay

(

2

,

2

).

then

(

on_result_ready

)




要注意的是這種 promise 寫法現在只能用在 backend 是 RPC (amqp) 或 Redis 時。 並且獨立使用時需要引入 gevent 的猴子補丁,可能會影響其他代碼。 官方文檔給的建議是這個特性結合非同步框架使用更合適,例如 tornado、 twisted 等。




delay 與 apply_async 生成的都是 AsyncResult 對象,此外我們還可以根據 task id 直接獲取相關 task 的 AsyncResult: AsyncResult(task_id=xxx)




關於 AsyncResult 更詳細的內容,可以參考這裡(http://docs.celeryproject.org/en/latest/reference/celery.result.html?highlight=result#celery.result.AsyncResult)




利用 Celery 進行分散式隊列管理、開發將會大幅提升開發效率,關於 Celery 更詳細的使用大家可以去參考詳細的官方文檔(http://docs.celeryproject.org/en/latest/index.html)





  • 來源:rapospectre



  • segmentfault.com/a/1190000008022050



  • Python開發整理髮布,轉載請聯繫作者獲得授


【點擊成為Java大神】

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python開發 的精彩文章:

使用 Python 進行分散式系統協調

TAG:Python開發 |