如何利用並發性加速你的python程序(二):I/O 綁定程序加速
雷鋒網 AI 科技評論按,本文是工程師 Jim Anderson 分享的關於「通過並發性加快 python 程序的速度」的文章的第二部分,主要內容是 I/O 綁定程序加速相關。
在上一篇中,我們已經講過了相關的概念:什麼是並發?什麼是並行? I/O 綁定和 CPU 綁定等。在這裡,我們將對一些 python 並發方法進行比較,包括線程、非同步和多進程,在程序中何時使用並發性以及使用哪個模塊。
當然,本文假設讀者對 python 有一個基本的了解,並且使用 python3.6 及以上版來運行示例。你可以從 Real python GitHub repo下載示例。
如何加速 I/O 綁定程序
讓我們從關注 I/O 綁定程序和一個常見問題開始:通過網路下載內容。在我們的例子中,你將從一些站點下載網頁,但這個過程可能會產生任何故障。它只是更容易可視化。
同步版本
我們將從這個任務的非並發版本開始。注意,這個程序需要請求模塊。在運行這個程序之前,你需要運行 pip 安裝請求,這可能需要使用 virtualenv 命令。此版本根本不使用並發:
import requests
import time
def download_site(url, session):
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")
def download_all_sites(sites):
with requests.Session as session:
for url in sites:
download_site(url, session)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time
download_all_sites(sites)
duration = time.time - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
如你所見,這是一個相當短的程序。download_site可以從 URL 下載內容並列印它的大小。要指出的一個小問題是,我們正在使用來自 Session 的會話對象。
直接從 requests 中使用 get,但創建一個 Session 對象允許 requests 執行一些花哨的網路技巧從而真正加快速度是可能的。
download_all_sites創建 Session,然後瀏覽站點列表,依次下載每個站點。最後,它列印出這個過程花費了多長時間,這樣你就可以滿意地看到在下面的示例中並發性對我們有多大幫助。
這個程序的處理圖看起來很像上一節中的 I/O 綁定圖。
注意:網路流量取決於許多因素,這些因素可能在每秒都在變化。我已經看到由於網路問題,這些測試案例從一次運行跳轉到另一次的時間加倍了。
為什麼同步版本很重要
這個版本的代碼最棒的特點是,它很簡單,編寫和調試相對容易。代碼的思路更加直接,所以你可以預測它將如何運作。
同步版本的問題
和我們提供的其他解決方案相比,同步版本最大的問題是,它的速度相對較慢。以下是我的機器上的最終輸出示例:
注意:你得到的結果可能會和上面有很大差異。運行這個腳本時,需要的時間從 14.2 秒到 21.9 秒不等。在本文中,時間取三次運行中最快的一次所花的時間,在這種情況下,兩種方法之間的差異仍然很明顯。
然而,運行速度變慢並不總是一個大問題。如果你正在運行的程序使用同步版本運行只需要 2 秒,並且很少運行,那麼可能不需要添加並發性。
如果你的程序經常運行怎麼辦?如果運行程序需要幾個小時怎麼辦?讓我們繼續使用線程重寫這個程序以實現並發性。
線程版本
正如你可能猜測的那樣,編寫線程程序需要付出更多的努力。然而,對於簡單的案例,你可能會驚訝於它所花費的額外努力是如此之少。下面是同一個程序的線程版本:
import concurrent.futures
import requests
import threading
import time
thread_local = threading.local
def get_session:
if not getattr(thread_local, "session", None):
thread_local.session = requests.Session
return thread_local.session
def download_site(url):
session = get_session
with session.get(url) as response:
print(f"Read {len(response.content)} from {url}")
def download_all_sites(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
executor.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time
download_all_sites(sites)
duration = time.time - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
當你添加線程時,整體結構是相同的,因此你只需要做一些更改。download_all_sites從在每個站點調用一次函數改為更複雜的結構。
在這個版本中,你正在創建一個 ThreadPoolExecutor,這看起來很複雜。我們可以把它分解為:ThreadPoolExecutor=thread+pool+executor。
這個對象將創建一個線程池,每個線程都可以並發運行。最後,執行器會控制池中每個線程的運行方式和運行時間。請求將在池中執行。
標準庫將 ThreadPoolExecutor 實現為上下文管理器,這樣你就可以使用 with 語法來管理線程池的創建和釋放。
一旦有了 ThreadPoolExecutor,就可以很方便地使用它的.map方法。此方法在列表中的每個站點上運行傳入函數。最重要的是,它使用所管理的線程池自動並發地運行它們。
那些學習其他語言,甚至是 python 2 的用戶可能想知道,在處理線程時,通常用來管理細節的對象和函數在哪裡,比如 thread.start、thread.join和 queue。
這些仍然存在,你可以使用它們來實現對線程運行方式的細粒度控制。但是,從 python3.2 開始,標準庫添加了一個執行器,如果不需要細粒度的控制,它可以為你管理許多細節。
我們的示例中另一個有趣的變化是,每個線程都需要創建自己的 requests.session對象。當你查看請求文檔時,不一定很容易分辨出來,但是讀到這個問題時,你似乎很清楚每個線程需要單獨的 Session。
這是線程處理的一個有趣又困難的問題之一。因為操作系統可以控制一個任務何時被中斷及另一個任務何時開始,所以在線程之間共享的任何數據都需要受到保護,保證線程安全。很遺憾,requests.session不是線程安全的。
根據數據是什麼以及如何使用它,有幾種策略可以使數據訪問線程安全。其中之一是使用線程安全的數據結構,如 python 隊列模塊中的 queue。
另一種策略是線程本地存儲。Threading.local 創建一個看起來像全局的對象,但它對於每個線程來說是不一樣的。在你的示例中,這是通過 threadLocal 和 get_session完成的:
threadLocal = threading.localdef get_session:
if getattr(threadLocal, "session", None) is None:
threadLocal.session = requests.Session
return threadLocal.session
ThreadLocal 是在線程模塊中專門解決這個問題的。看起來有點奇怪,但你只想創建這些對象中的一個,而不是為每個線程創建一個對象。對象本身負責分離不同線程對不同數據的訪問過程。
當調用 get_session時,它查找的 session 和它運行的特定線程是對應的。因此,每個線程在第一次調用 get_session時將創建一個會話,然後後續在其整個生命周期內簡單地調用該會話。
最後,一個關於選擇線程數的簡短說明。你可以看到示例代碼使用了 5 個線程。你可以隨意調整這個數字的大小,看看總的時間是如何變化的。你可能認為每次下載只有一個線程是最快的,但實際上不是這樣,至少在我的系統中不是這樣。我發現,線程數目在 5 到 10 個之間時,速度是最快的。如果超過這個值,那麼創建和銷毀線程所產生的額外開銷將抵消任何節省時間所帶來的好處。
這裡的難點在於,正確的線程數不是從一個任務到另一個任務中的常量。需要進行一些實驗才能得到結果。
為什麼線程版本很重要
它很快!這裡是我測試中最快的一次。記住,非並發版本需要 14 秒以上的時間:
它的執行時序圖如下所示:
它使用多個線程同時向網站發出多個打開的請求,允許你的程序重疊等待時間並更快地獲得最終結果!
線程版本的問題
正如你從示例中看到的,要實現這一點需要更多的代碼,而且你真的需要考慮在線程之間需要共享哪些數據。
線程可以以巧妙且難以檢測的方式進行交互。這些交互可能導致隨機的、間歇性的錯誤,且這些錯誤很難找到。
非同步(asyncio)版本
在你開始檢查非同步版本示例代碼之前,讓我們詳細討論一下非同步的工作原理。
非同步基礎
這將是 asycio 的簡化版本。這裡有許多細節被掩蓋了,但它仍然說明了它是如何工作的。
asyncio 的一般概念是,一個被稱為事件循環的 python 對象控制每個任務的運行方式和時間。這個對象清楚地知道每個任務處於什麼狀態。實際上,任務可以處於許多狀態,但現在讓我們設想一個簡化的事件循環,它只有兩個狀態。
就緒狀態指的是任務有工作要做並且準備運行,而等待狀態意味著任務正在等待一些外部事情完成,例如網路操作。簡化的事件循環維護兩個任務列表,分別對應這兩個狀態。它選擇一個已經就緒的任務,然後重新開始運行。該任務處於完全控制狀態,直到它將控制項送回事件循環。
當正在運行的任務將控制權交還給事件循環時,事件循環將該任務放入就緒或等待列表,然後遍歷等待列表中的每個任務,以查看完成 I/O 操作後該任務是否已就緒。它知道就緒列表中的任務仍然是就緒狀態,因為它們尚未運行。
一旦所有的任務都被重新排序到正確的列表中,事件循環就會選擇下一個要運行的任務。簡化的事件循環選擇等待時間最長的任務並運行該任務。此過程重複,直到事件循環完成。
asyncio 的一個重要點是,如果不是有意為之,任務永遠不會放棄控制。任務在執行的過程中從不會被打斷。這使得我們在非同步中比在線程中更容易進行資源共享。你不需要擔心線程安全問題。
async 和 await
現在讓我們來談談添加到 python 中的兩個新關鍵字:async 和 await。根據上面的討論,你可以將 await 視為允許任務將控制權交回事件循環的一種魔力。當你的代碼等待函數調用時,await 是一個信號,表明調用可能需要花費一段時間,並且任務應該放棄控制。
最簡單的方法是將 async 看作是 python 的標誌,告訴它將使用 await 定義函數。在有些情況下,這不是完全正確的,比如非同步生成器,但它適用於許多情況,並在開始時為你提供一個簡單的模型。
你將在下一個代碼中看到的一個例外是 async with 語句,它通常從你的等待的對象創建一個上下文管理器。雖然語義有點不同,但其思想是相同的:將這個上下文管理器標記為可以替換的東西。
我確信你可以想像到,在管理事件循環和任務之間的交互時有一些複雜性。對於以 asyncio 開始的開發人員來說,這些細節並不重要,但是你需要記住,任何調用 await 的函數都需要標記為 async。否則將出現語法錯誤。雷鋒網
回到代碼
既然你已經基本了解了什麼是 asyncio,那麼讓我們瀏覽一下示例代碼的 asyncio 版本,並了解它是如何工作的。請注意,此版本添加了 aiohtp。在運行它之前,應該先運行 pip install aiohtp:
import asyncio
import time
import aiohttp
async def download_site(session, url):
async with session.get(url) as response:
print("Read {0} from {1}".format(response.content_length, url))
async def download_all_sites(sites):
async with aiohttp.ClientSession as session:
tasks =
for url in sites:
task = asyncio.ensure_future(download_site(session, url))
tasks.append(task)
await asyncio.gather(*tasks, return_exceptions=True)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time
asyncio.get_event_loop.run_until_complete(download_all_sites(sites))
duration = time.time - start_time
print(f"Downloaded {len(sites)} sites in {duration} seconds")
這個版本比前兩個版本要複雜一些。它有一個類似的結構,但是啟動任務的工作量比創建線程池執行器的工作量要多一些。讓我們從示例的頂部開始。
download_site
頂部的 download_site與線程版本幾乎相同,但函數定義行上的 async 關鍵字和實際調用 session.get時的 async with 關鍵字除外。稍後你將看到為什麼可以在這裡傳遞 session,而不是使用線程本地存儲。
download_all_sites
download_all_sites 中可以看到線程示例中最大的變化。
你可以在所有任務之間共享會話,因此該會話在此處創建為上下文管理器。任務可以共享會話,因為它們都在同一線程上運行。會話處於錯誤狀態時,一個任務無法中斷另一個任務。
在該上下文管理器中,它使用 asyncio.secure_future創建一個任務列表,該列表還負責啟動它們。創建所有任務後,此函數使用 asyncio.gather完成會話內容的變動,直到所有任務完成。
線程代碼的作用與此類似,但在 ThreadPoolExecutor 中可以方便地處理細節。當前沒有 asyncioPoolExecutor 類。
然而,這裡的細節中隱藏著一個小而重要的變化。還記得之前我們討論過要創建的線程數嗎?在線程示例中,線程的最佳數量並不明顯。
asyncio 的一個很酷的優點是它的規模遠遠優於線程。與線程相比,每項任務創建所需的資源和時間要少得多,因此創建和運行更多的資源和時間能很好地工作。這個例子只是為每個要下載的站點創建一個單獨的任務,這個任務運行得很好。雷鋒網
__main__
最後,非同步的本質意味著你必須啟動事件循環,並告訴它要運行哪些任務。文件底部的__main__部分包含 get_event_loop 的代碼,然後運行 run_until_complete。如果沒有別的,他們在命名這些函數方面做得很好。
如果你已經更新到 python 3.7,那麼 python 核心開發人員會為你簡化這種語法。不需要分辨那種情況下使用 asyncio.get_event_loop,那種情況下使用 run_until_complete,你只需使用 asyncio.run。
為什麼 asyncio 版本很重要
它真的很快!在我的機器上進行的所有測試中,這是代碼運行最快的版本:
執行時序圖與線程示例中所發生的情況非常相似。只是 I/O 請求都是由同一線程完成的:
缺少線程池執行器,使得這段代碼比線程示例要複雜一些。在這種情況下,你需要做一些額外的工作來獲得更好的性能。
還有一個常見的論點是,在合適的位置添加 async 和 await 是一個複雜的問題。在某種程度上,這是事實。這個論點的另一個方面是,它迫使你思考何時交換給定的任務,這可以幫助你設計出一份更好、更快的代碼。
規模問題在這裡也很突出。為每個站點運行上面的線程示例明顯比用少量線程運行它慢。運行帶有數百個任務的 asyncio 示例並沒有減慢速度。
asyncio 版本的問題
現在 asyncio 有幾個問題。為了充分利用 asyncio,你需要特殊的 asyncio 版本的庫。如果你只是使用下載站點的請求,那麼速度會慢得多,因為請求不是用來通知事件循環它被阻塞了。隨著時間的推移,這個問題越來越少,因為越來越多的庫採用 asyncio。
另一個更微妙的問題是,如果其中一個任務不合作,那麼協作多任務的所有優勢都會消失。代碼中的一個小錯誤會導致一個任務運行,並長時間佔用處理器,從而使其他需要運行的任務處於等待狀態。如果任務沒有將控制權交還給事件循環,則無法中斷事件循環。考慮到這一點,讓我們來看看一種完全不同的並發、多處理方法。
多處理版本
與前面的方法不同,多處理版本的代碼充分利用了新計算機的多個 CPU。讓我們從代碼開始:
import requests
import multiprocessing
import time
session = None
def set_global_session:
global session
if not session:
session = requests.Session
def download_site(url):
with session.get(url) as response:
name = multiprocessing.current_process.name
print(f"{name}:Read {len(response.content)} from {url}")
def download_all_sites(sites):
with multiprocessing.Pool(initializer=set_global_session) as pool:
pool.map(download_site, sites)
if __name__ == "__main__":
sites = [
"http://www.jython.org",
"http://olympus.realpython.org/dice",
] * 80
start_time = time.time
download_all_sites(sites)
duration = time.time - start_time
print(f"Downloaded {len(sites)} in {duration} seconds")
這比 asyncio 示例短得多,實際上,它看起來與線程示例非常相似,但是在我們深入研究代碼之前,讓我們快速了解一下多處理對你會有什麼幫助。
簡述多處理
到目前為止,本文中的所有並發示例都只在計算機的單個 CPU 或核上運行。其原因與當前的 cpython 的設計以及所謂的全局解釋器鎖(globalinterpretorlock,簡稱 gil)有關。
標準庫中的多處理設計正是為了改變這種狀態而設計的,它使你能在多個 CPU 上運行代碼。在高層,它是通過創建一個新的 python 解釋器實例在每個 CPU 上運行,然後釋放出程序的一部分來實現的。
在當前的 python 解釋器中啟動一個新線程的速度不如單獨啟動一個 python 解釋器的速度快。這是一個重要的操作,存在一些限制和困難,但對某些問題來說,它可以產生巨大的差異。
多處理代碼
代碼與我們的同步版本相比有一些小的變化。第一個區別位於 download_all_sites中。它不是簡單地重複調用 download_site,而是創建一個 multiprocessing.pool 對象,並讓它將 download_site 映射到不可訪問的站點。和線程示例相比,這點比較相似。
這裡所發生的是,池(pool)創建了許多單獨的 python 解釋器進程,並讓每個進程在某些項上運行指定的函數,在我們的例子中是在站點列表上運行指定的函數。主進程和其他進程之間的通信由多處理模塊為你處理。
創造池的那條線值得你注意。首先,它不指定要在池中創建多少進程,儘管這是一個可選參數。默認情況下,multiprocessing.pool將確定計算機中的 CPU 數量並與之匹配。這通常是最好的答案,在我們的例子中也是如此。
對於這個問題,增加進程的數量並不能提高速度。相反,它實際上會降低速度,因為啟動和刪除所有這些進程的成本大於並行執行 I/O 請求的好處。
接下來,我們得到該調用的 initializer=set_global_session 部分。請記住,池中的每個進程都有自己的內存空間,這意味著它們不能共享會話對象之類的東西。你不會希望每次調用函數時都創建新會話,而是希望為每個進程創建一個會話。
初始化功能參數就是為這種情況而生成的。無法將返回值從初始值設定項傳遞迴由進程 download_site調用的函數,但可以初始化全局會話變數以保存每個進程的單個會話。因為每個進程都有自己的內存空間,所以每個進程的全局空間都不同。
這就是所有要說的啦,其餘的代碼與你以前看到的非常相似。
為什麼多處理版本很重要
這個例子的多處理版本非常好,因為它相對容易啟動,並且只需要很少的額外代碼。它還充分利用了計算機中的 CPU 資源。此代碼的執行時序圖如下所示:
多處理版本的問題
這個版本的示例確實需要一些額外的設置,而且全局會話對象很奇怪。你必須花費一些時間來考慮在每個流程中訪問哪些變數。
最後,它明顯比本例中的非同步和線程版本慢:
這並不奇怪,因為 I/O 綁定問題並不是多處理存在的真正原因。在進入下一節並查看 CPU 綁定示例時,你將看到更多內容。
本文之前還有相關概念介紹:如何利用並發性加速你的python程序(一):相關概念
以及接下來的一篇是:如何利用並發性加速你的python程序(三):CPU 綁定程序加速
via:https://www.leiphone.com/news/201901/JfoLltRClm3bZzuB.html?type=preview
※美國將尋求引渡孟晚舟:會向加拿大政府提出正式請求
※20秒視頻曝光!小米摺疊屏手機來了?
TAG:雷鋒網 |