當前位置:
首頁 > 知識 > 使用並發加速你的python程序(2)

使用並發加速你的python程序(2)

asyncio 版本

在開始研究asyncio示例代碼之前,我們來更多地討論一下asyncio是如何工作的。

asyncio基礎知識

這將是asycio的一個簡化版本。這裡有很多細節被忽略了,但它仍然解釋了它是如何工作的。

asyncio的一般概念是一個單個的Python對象,稱為事件循環,它控制每個任務如何以及何時運行。事件循環會關注每個任務並知道它處於什麼狀態。在實際中,任務可以處於許多狀態,但現在我們假設一個簡化的只有兩種狀態的事件循環。

就緒狀態將表明一個任務有工作要做,並且已經準備好運行,而等待狀態意味著該任務正在等待一些外部工作完成,例如網路操作。

我們簡化的事件循環維護兩個任務列表,每一個對應這些狀態。它會選擇一個就緒的任務,然後重新啟動它。該任務處於完全控制之中,直到它配合地將控制權交還給事件循環為止。

當正在運行的任務將控制權交還給事件循環時,事件循環將該任務放入就緒或等待列表中,然後遍歷等待列表中的每個任務,以查看I/O操作完成後某個任務是否已經就緒。時間循環知道就緒列表中的任務仍然是就緒的,因為它知道它們還沒有運行。

一旦所有的任務都重新排序到正確的列表中,事件循環將選擇下一個要運行的任務,然後重複這個過程。我們簡化的事件循環會選擇等待時間最長的任務並運行該任務。此過程會一直重複,直到事件循環結束。

asyncio的一個重要之處在於,如果沒有刻意去釋放控制權,任務是永遠不會放棄控制權的。它們在操作過程中從不會被打斷。這使得我們在asyncio中比在threading中能更容易地共享資源。你不必擔心代碼是否是線程安全的。

這是從高級層面來看待asyncio的運行過程的。如果你想更深入地了解更多細節,這個StackOverflow的答案(https://stackoverflow.com/a/51116910/6843734 )提供了一些很好的細節。

async 和 await

現在我們來討論添加到Python中的兩個新關鍵字: async和await。根據上面的討論,你可以將await看作是允許任務將控制權交還給事件循環的魔法。當你的代碼在等待函數調用時,這是一個信號,表明調用可能需要一段時間,並且任務應該放棄控制。

最簡單的方式是將async看作Python的一個標誌,它會告訴Python將要定義的函數會使用await。在某些情況下,這並不完全正確,比如非同步生成器(https://www.python.org/dev/peps/pep-0525/ ),但是它適用於多數情況,並在你開始時為你提供一個簡單的模型。

在下一段代碼中,你將看到一個例外,那就是async with語句,它會從你通常需要等待的對象創建一個上下文管理器。雖然語義略有不同,但思想是相同的: 將這個上下文管理器標記為可以交換出去的東西。

我相信你可以想像,在管理事件循環和任務之間的交互時存在一些複雜性。對於剛開始使用asyncio的開發人員來說,這些細節並不重要,但是你需要記住,任何調用await的函數都需要使用async進行標記。否則你會得到一個語法錯誤。

返回到代碼

現在,你已經基本了解了什麼是asyncio,我們來瀏覽一下示例代碼的asyncio版本,並找出它是如何工作的。注意,這個版本添加了aiohttp。在運行示例代碼之前,你應該先運行pip install aiohttp:

使用並發加速你的python程序(2)

打開今日頭條,查看更多圖片

這個版本比前兩個版本稍微複雜一些。它具有類似的結構,但是設置任務的工作量要比創建ThreadPoolExecutor多一些。讓我們從示例的頂部開始。

download_site ()

除了函數定義行上的async關鍵字和實際調用session.get()時的async with關鍵字之外,download_site()的頂部幾乎與threading版本相同。稍後你將看到為什麼Session可以在這裡傳遞,而不是使用線程本地存儲。

download_all_sites()

download_all_sites()與threading示例中的相比變化很大。

你可以在所有任務之間共享會話,因此會話在這裡作為上下文管理器創建。這些任務可以共享會話,因為它們都在同一個線程上運行。當會話處於糟糕(bad)狀態時,一個任務不可能中斷另一個任務。

在該上下文管理器中,它使用asyncio.ensure_future()創建一個任務列表,該列表還負責啟動這些任務。所有任務創建之後,這個函數會使用asyncio.gather()保持會話上下文處於活動狀態,直到所有任務都完成為止。

threading代碼也執行類似的操作,但是細節都是在ThreadPoolExecutor中進行處理。目前還沒有AsyncioPoolExecutor類。

然而,這裡的細節中隱藏著一個小而重要的變化。還記得我們是怎樣討論要創建線程的數量的嗎?在threading示例中,最優線程數是多少並不明顯。

asyncio的一個很酷的優點是它的伸縮性比threading好得多。與一個線程相比,創建一個任務需要更少的資源和更少的時間,因此創建和運行更多的任務也會運行良好。這個例子只是為每個要下載的站點創建了一個單獨的任務,運行效果非常好。

__main__

最後,asyncio的本質意味著你必須啟動事件循環並告訴它運行哪些任務。文件底部的__main__部分包含get_event_loop()和run_until_complete()函數的代碼。如果沒有其它東西的話,這兩個函數在命名那些調用函數方面做得很好。

如果你已經更新到Python 3.7, Python核心開發人員已經為你簡化了此語法。你只需要使用asyncio.run()代替拗口的asyncio.get_event_loop().run_until_complete()。

為什麼asyncio版本很棒

因為它真的很快! 在我的機器上進行的測試中,這是速度最快的代碼版本:

使用並發加速你的python程序(2)

執行時序圖看起來與threading示例中的情況非常相似。只是I/O請求都是由同一個線程完成的:.

使用並發加速你的python程序(2)

缺少像ThreadPoolExecutor這樣漂亮的包裝器使得這段代碼比threading示例稍微複雜一些。在這種情況下,你必須做一些額外的工作才能獲得更好的性能。

還有一個常見的爭論是,必須在適當的位置添加async和await是一個額外的複雜性。在某種程度上,這是正確的。這個爭論的另一個方面是,它迫使你考慮何時將給定的任務交換出去,這可以幫助你創建一個更好、更快的設計。

伸縮性問題在這裡也很突出。為每個站點使用一個線程運行上面的threading示例明顯比使用幾個線程運行的慢一些。但是,運行包含數百個任務的asyncio示例根本不會降低速度。

asyncio版本的問題

此時asyncio有兩個問題。你需要特殊的非同步版本的庫來充分利用asycio。如果你只是使用requests下載站點,那麼速度會慢得多,因為requests的設計目的不是通知事件循環它被阻塞了。隨著時間的推移,這個問題變得微不足道,因為越來越多的庫包含了asyncio。

另一個更微妙的問題是,如果其中一個任務不合作,那麼協作多任務處理的所有優勢都將不存在。代碼中的一個小錯誤可能會導致任務運行超時並長時間佔用處理器,使需要運行的其他任務無法運行。如果一個任務沒有將控制權交還給事件循環,則事件循環無法中斷它。

考慮到這一點,我們來開始討論一種完全不同的並發性——multiprocessing。

multiprocessing 版本

與前面的方法不同,代碼的multiprocessing版本充分利用了你的很酷的新計算機具有的多個CPU。或者,對我來說,就是我那台笨重的舊筆記本電腦。話不多說,我們先從代碼開始:

使用並發加速你的python程序(2)

這比asyncio示例要短得多,而且實際上看起來與threading示例非常類似,但是在深入研究代碼之前,讓我們快速瀏覽一下multiprocessing為你做了什麼。

簡單介紹multiprocessing

到目前為止,本文中的所有並發示例都只在計算機中的單個CPU或核心上運行。這樣做的原因與當前CPython的設計和全局解釋器鎖(或者GIL)有關。

本文不會深入探討GIL(https://realpython.com/python-gil/ )的原理和原因。你現在只需要知道這個示例的同步、threading和asyncio版本都運行在一個CPU上就足夠了。

標準庫中的multiprocessing旨在打破這種障礙,並使你的代碼在多個CPU上運行。在高級層面來看,它通過創建一個新的Python解釋器實例並在每個CPU上運行,然後將程序的一部分外包給每個CPU來實現這一點。

你可以想像,在當前的Python解釋器中啟動一個單獨的Python解釋器並不像啟動一個新的線程那麼快。這是一個重量級的操作,會伴隨著一些限制和困難,但是對於正確的問題,它可以產生巨大的影響。

multiprocessing 代碼

與我們的同步版本相比,這裡的代碼有一些小的更改。第一個更改位於download_all_sites()中。代替簡單地重複調用download_site(),它創建了一個multiprocessing.Pool對象,並讓它將download_site映射到可迭代的sites上。這在threading示例中應該很熟悉。

在這裡,Pool創建了許多單獨的Python解釋器進程,並讓每個進程在可迭代對象中的某些項上運行指定的函數,在我們的例子中,這個可迭代對象是站點列表。主進程與其他進程之間的通信由multiprocessing模塊為你處理。

你需要注意創建Pool的行。首先,它沒有指定要在Pool中創建多少個進程,儘管這是一個可選參數。默認情況下,multiprocessing.Pool()將確定你計算機中的CPU數量並自動與之匹配。這通常是最好的答案,在我們的例子中也是如此。

對於這個問題,增加進程的數量並沒有使程序變得更快。它實際上降低了速度,因為設置和銷毀所有這些進程的成本要大於並行處理I/O請求的好處。

接下來是該調用的initializer=set_global_session部分。請記住,Pool中的每個進程都有自己的內存空間。這意味著它們不能共享諸如Session對象之類的東西。你不希望每次調用函數時都創建一個新Session,而是希望為每個進程都創建一個。

initializer函數參數就是為這種情況而構建的。沒有辦法將返回值從initializer傳遞迴進程download_site()調用的函數,但是可以初始化一個全局session變數來保存每個進程的單個會話。因為每個進程都有自己的內存空間,所以每個進程的全局內存空間是不同的。

代碼的更改基本就是這些。代碼的其餘部分與你之前看到的非常相似。

為什麼 multiprocessing版本很棒

這個示例的multiprocessing版本非常棒,因為它相對容易設置,並且只需要很少的額外代碼。它還充分利用了計算機的CPU處理能力。這段代碼的執行時序圖如下:

使用並發加速你的python程序(2)

multiprocessing 版本的問題

這個版本的示例確實需要一些額外的設置,而且全局session對象很奇怪。你必須花一些時間考慮在每個進程中訪問哪些變數。

最後,它明顯比本例中的asyncio和threading版本要慢:

使用並發加速你的python程序(2)

這並不奇怪,因為I/O限制問題並不是multiprocessing存在的真正原因。在進入下一節並查看CPU限制的示例時,你將看到更多原因。

如何加速CPU密集型程序

我們來換個話題。到目前為止的所有示例都處理了I/ O密集型問題。現在,你將研究一個CPU密集型問題。正如你所看到的,一個I/ O密集型問題將花費大部分時間來等待外部操作(如網路調用)完成。另一方面,CPU密集型問題很少執行I/O操作,它的總體執行時間是影響它處理所需數據的速度的一個因素。

對於我們的示例,我們將使用一個傻瓜函數來創建一些需要很長時間在CPU上運行的東西。這個函數計算從0到傳入值的每個數字的平方和:

使用並發加速你的python程序(2)

如果你傳入了一個很大的數字,這可能需要一段時間。記住,這只是代碼的佔位符,它實際上做了一些有用的事情,並且需要大量的處理時間,比如計算方程的根或對大型數據結構進行排序。

CPU密集型的同步版本

現在我們來看看這個例子的非並發版本:

使用並發加速你的python程序(2)

這段代碼調用cpu_bound() 20次,每次調用都使用一個不同的較大的數值。它在一個CPU上的一個進程中的一個線程上完成所有這些。執行時序圖如下:

使用並發加速你的python程序(2)

與I/ O密集型的示例不同,CPU密集型的示例在運行時通常相當一致。在我的機器上,這個過程大約需要7.8秒:

使用並發加速你的python程序(2)

顯然我們可以做得更好。這都是在沒有並發的單個CPU上運行的。我們來看看能不能做得更好。

threading 和 asyncio 版本

你認為使用 threading 或 asyncio重寫這段代碼會加速多少?

如果你回答「一點也不會加速」,給自己一塊餅乾。如果你回答「它會讓速度慢下來」,那就給自己兩塊餅乾。

原因如下: 在上面的I/ O密集型示例中,大部分執行時間都花在等待緩慢的操作完成上。threading 和 asyncio允許你重疊等待的時間,而不是按順序執行,從而加快了速度。

然而,對於CPU密集型問題,是沒有等待的。CPU正在以最快的速度運行以完成這個問題。在Python中,線程和任務在同一個CPU上的同一個進程中運行。這意味著這一個CPU要執行所有非並發代碼的工作,以及設置線程或任務的額外工作。這個過程需要超過10秒:

使用並發加速你的python程序(2)

我已經編寫了這段代碼的一個threading版本,並將它與其他示例代碼放在GitHub 倉庫(https://github.com/realpython/materials/tree/master/concurrency-overview)中,這樣你就可以自己進行測試了。不過,我們現在先不看這個版本。

CPU密集型的multiprocessing 版本

現在你終於到達了 multiprocessing真正閃耀的地方。與其他並發庫不同, multiprocessing被顯式地設計為通過跨多個CPU來共享繁重的CPU工作負載。下面是它的執行時序圖:

使用並發加速你的python程序(2)

代碼如下:

使用並發加速你的python程序(2)

與非並發版本相比,這段代碼只需要更改一點點。你必須import multiprocessing,然後只需要更改從「循環遍曆數值」到「創建一個multiprocessing.Pool對象,並使用其.map()方法向空閑的工作進程發送單獨的編號」的部分。

這正是你為I/O密集型的multiprocessing代碼所做的,但是在這裡你不需要擔心Session對象。

如上所述,你應該注意 multiprocessing.Pool()構造函數的processes可選參數。你可以指定想要在Pool.中創建和管理多少Process對象。默認情況下,它將確定你的計算機中有多少個CPU,並為每個CPU創建一個進程。雖然這對於我們的簡單示例非常有用,但是在生產環境中你可能想要有更多的控制權。

另外,正如我們在關於threading的第一節中提到的,multiprocessing.Pool代碼是構建在Queue 和 Semaphore等構建塊之上的,對於使用其他語言編寫多線程和多進程代碼的人來說,這些構建塊是很熟悉的。

為什multiprocessing版本很棒

這個示例的multiprocessing版本非常棒,因為它相對容易設置,並且只需要很少的額外代碼。它還充分利用了計算機的CPU處理能力。

這就是上次我們查看multiprocessing時我所說的。最大的不同在於,這一次它顯然是最佳選擇。它在我的機器上運行只需要2.5秒:

使用並發加速你的python程序(2)

這比我們看到的其他選項要好得多。

multiprocessing 版本的問題

使用multiprocessing也有一些缺點。在這個簡單的例子中,它們並沒有真正顯示出來,但是將你的問題拆分,以便每個處理器能夠獨立工作,這有時是很困難的。

此外,許多解決方案都需要流程之間進行更多的通信。這可能會給你的解決方案增加一些複雜性,而非並發程序則不需要處理這些複雜性。

什麼時候使用並發

這裡已經介紹了很多基礎知識,所以我們來回顧一些關鍵思想,然後再討論一些決策點,這些決策點將幫助你確定要在項目中使用哪個並發模塊(如果可用的話)。

這個過程的第一步是決定你是否應該使用並發模塊。雖然這裡的示例使每個庫看起來都非常簡單,但是並發性總是伴隨著額外的複雜性,並且常常會導致難以發現的bug。

不要添加並發,直到出現已知的性能問題,然後確定需要哪種類型的並發。正如Donald Knuth所說,「不成熟的優化是編程中所有罪惡的根源(或者至少是大部分罪惡的根源)。」

一旦你決定要優化程序,下一步就是確定你的程序是CPU密集型還是I/O密集型。請記住,I/ O密集型程序是那些將大部分時間花在等待某些事情發生上的程序,而CPU密集型程序則將時間花在儘可能快地處理數據或運算數字上。

正如你所看到的,CPU密集型問題只會從使用 multiprocessing中獲益。而使用threading和syncio在解決這類問題上一點幫助也沒有。

對於I/ O密集型問題,Python社區中有一條通用的經驗法則:「可以使用時使用asyncio,必須使用時使用threading。」asyncio可以為這類程序提供最佳的速度提升,但是有時你需要使用一些沒有被移植的關鍵庫來利用asyncio。請記住,任何不將控制權交給事件循環的任務都會阻塞所有其他任務。

結論

現在你已經看過了Python中可用的基本並發類型:

  • threading
  • asyncio
  • multiprocessing

你已經了解了應該使用哪種並發方法來解決給定的問題,或者是否應該使用任何並發方法! 此外,你也對在使用並發時可能出現的一些問題有了很好的理解。

我希望你已經從本文中學到了很多,並且發現了並發性在你的項目中的巨大用處!


英文原文:https://realpython.com/python-concurrency/ 譯者:Nothing

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python部落 的精彩文章:

OpenCV-Python速查表:從導入圖像到人臉檢測
導致機器學習項目失敗的7個原因

TAG:Python部落 |