當前位置:
首頁 > 知識 > Python流處理

Python流處理


      Faust是一個流處理庫,將kafka流中的思想移植到Python中。


它被用於

Robinhood

去構建高性能的分散式系統和實時數據通道,每天處理數十億的數據。


Faust同時提供流處理和事件處理,同類型的工具分享例如:

Kafka Streams, Apache Spark/Storm/Samza/Flink


它不需要使用一個DSL,僅需要用到Python!這意味著你在做流處理的時候可以使用所有你喜歡的Python庫:

NumPy, PyTorch, Pandas, NLTK, Django, Flask, SQLAlchemy

等等。


 


由於需要使用新的async/await語法和變數類型注釋方法,Faust需要使用Python3.6以上的版本。


這裡有一個處理輸入命令流的示例:



這個agent裝飾器定義了一個「流處理器」,它本質上是一個Kafka topic,並且可以對接收到的每個事件做一些處理。

這個agent是一個async def的函數,因此它還可以非同步執行其他操作,如web請求。



這個系統可以持久化狀態,執行方式類似於資料庫。表被命名成分散式的key/value儲存,你可以使用常規的Python字典來做這件事。


     

在每台機器上的本地用c++編寫的超快嵌入式資料庫(被稱為RocksDB)存儲表。


     表還可以存儲可選的「窗口」聚合計數,以便跟蹤「前一天的單擊次數」或「前一個小時的單擊次數」。與Kafka流一樣,我們支持滾動、跳躍和滑動時間窗口,舊窗口可以過期以阻止數據填充。


為了提高可靠性,我們使用Kafka topic作為「預寫日誌」。當一個密鑰被更改時,我們將其發布到更新的日誌上。備用節點使用這個更新日誌來保存數據的精確副本,並在任何節點發生故障時支持立即恢復。


       

對於用戶來說,表只是一個字典,但是數據在重新啟動和跨節點複製之間存在,所以在故障發生時其他節點可以自動接管。



您可以通過URL統計頁面瀏覽數量:



發送到Kafka topic的數據是分區的,這意味著點擊數將用URL的這種方式進行分片。因此,同一個URL的每個計數都會立刻被傳遞給同一個Faust worker實例。


Faust支持任何類型的流數據:位元組、Unicode和序列化結構,同時也支持使用現代Python語法的「模型」來描述流中的keys和value是如何被序列化的。



Faust是靜態類型的,使用mypy類型檢查器,所以您在編寫應用程序時可以充分利用靜態類型的優勢。


Faust源代碼很小,組織良好,是學習Kafka流實現的好資源。



在引言頁學習更多關於Faust的知識


去閱讀更多關於Faust,系統請求,安裝指導,論壇資源等等,或者直接訪問快速開始的教程。在一個編寫流處理的應用中去查看關於Faust應用,然後通過使用者手冊深入探討。深層

次的信息都根據不同主題在這個手冊中進行說明


 


Faust是…



簡介


Faust非常容易使用。在學習其他的流處理方法時,你總是需要從一個複雜的hello-world工程和相應的基礎要求開始學習。Faust僅僅需要Kafka,剩下的就是只需要Python,如果你知道Python的話你就可以直接使用Faust去做流處理的工作了,並且它可以整合和他相關的一切。


      這兒有一個簡單的應用程序你可以做:源代碼是Python的



您可能會被async和await這兩個關鍵字嚇到,但是您在使用Faust時不需要知道asyncio是如何工作的:只要模仿這些例子就可以得到您想要的結果。


      


示例應用程序啟動兩個任務:一個是處理流,另一個是向流發送事件的後台線程。在實際的應用程序中,您的系統將向Kafka topic發布事件,您的處理器可以從Kafka topic獲取事件信息,並且只需要後台線程將數據輸入到我們的示例中。


 

高可用性


Faust是高度可用的,並且可以在網路問題和伺服器崩潰中生存下來。在節點失敗的情況下,它可以自動恢復,並且表將接管備用節點。


 


分散式的


根據您的應用程序的需要啟動更多實例。


 


快速

一個單內核的Faust worker實例已經可以每秒處理數萬個事件,我們有理由相信,一旦我們能夠支持一個更優化的Kafka客戶端,吞吐量就會增加。


 


靈活性


Faust就是Python,而流是一個無限的非同步迭代器。如果您知道如何使用Python,那麼您已經知道如何使用Faust,它可以與您喜歡的Python庫一起使用,比如Django、Flask、SQLAlchemy、NTLK、NumPy、Scikit、TensorFlow等等。


 


安裝


您可以通過Python包或從源文件中安裝Faust


使用pip安裝它:




綁定


Faust還定義了一組setuptools擴展,可以用來安裝Faust,並且有一個給定特性的依賴關係。


您可以在您的需求中或在pip命令行中使用方括弧來指定它們。使用逗號分隔多個包:



 


以下的綁定均是有效的:

商店



 


最優化



 


感測器



 


事件循環



 


調試





下載並從源文件中安裝




下載最新的Faust版本的網址是:http: //pypi.python.org/pypi/faust


您可以這樣安裝它:



如果當前沒有使用virtualenv,則必須以特權用戶的身份執行最後一個命令。


 


使用開發版本


您可以使用以下pip命令安裝Faust的最新版本:



 


常見問題


Faust可以在Django/Flask/etc上使用嗎?


使用gevent


這種方法適用於任何可以與gevent一起工作的阻塞Python庫。


使用gevent需要您安裝aiogevent模塊,您可以將其作為Faust的包進行安裝:



然後要真正的使用gevent作為事件循環,您要麼在faust程序中使用-L <faust --loop>


命令:



要麼在你腳本的前面加入

import mode.loop.gevent



記住:非常重要的一點是,它位於模塊的最頂端,並且在導入庫之前執行。


 


使用eventlet


這種方法適用於任何可以使用eventlet的阻塞Python庫。


使用eventlet需要您安裝aioeventlet模塊,您可以將其安裝為與Faust一起的捆綁包。



然後要實際使用eventlet作為事件循環,您要麼

在faust程序中

使用-L <faust --loop>


命令:



要麼在你腳本的前面加入

import mode.loop.gevent



警告


非常重要的是,它位於模塊的最頂端,並且在導入庫之前執行。


 


Faust可以在Tornado上使用嗎?


可以!使用tornado.platform.asyncio鏈接:

http://www.tornadoweb.org/en/stable/asyncio.html


 


Faust可以在Twisted上使用嗎?


可以!使用asyncio反應器實現:

https://twistedmatrix.com/documents/17.1.0/api/twisted.internet.asyncioreactor.html


 


是否支持Python3.5或者更早的版本?


目前還沒有支持Python 3.5的計劃,但是歡迎您為這個項目做出貢獻。


以下是實現這一目標所需的一些步驟




  • 源代碼轉換以重寫變數注釋到注釋


示例,代碼:





  • 重寫非同步函數的源代碼轉換


示例,代碼:



必須重寫:





你將支持Python2嗎?


目前還沒有支持Python 2的計劃,但是歡迎您為項目做貢獻(上面問題中的細節也與Python 2相關)。


 


在本地運行Faust應用程序時,我得到的打開文件的最大數量超過了RocksDB的錯誤。我該怎麼解決這個問題呢


您可能需要增加打開文件的最大數量的限制。下面的文章解釋了如何在OS X上這麼做:

https://blog.dekstroza.io/ulimit-shenanigans-on-osx-el-capitan




資源




問題跟蹤


如果你有任何意見,問題,或者煩惱,請記錄下來作為我們的問題追蹤報告:

https://github.com/robinhood/faust/issues/


 


Wiki


https://wiki.github.com/robinhood/faust/


 


許可證


該軟體在新的BSD許可下獲得許可。有關完整的許可文本,請參閱頂部分發目錄中的許可文件。


 


貢獻


Faust的發展發生在GitHub:

 

https://github.com/robinhood/faust


我們非常鼓勵您參與Faust的發展。


請務必也閱讀文件中對Faust的貢獻部分。


 


編碼規範


在項目代碼庫、問題跟蹤器、聊天室和郵件列表中進行交互的每個人都應該遵循《Faust行為準則》。


 


作為這些項目的貢獻者和維護者,為了培養開放和受歡迎的社區,我們承諾尊重所有通過報告問題、發布特性請求、更新文檔、提交合併請求或補丁和其他活動的人。


 


我們致力於使參與這些項目的每個人都無騷擾體驗,不論其經驗水平、性別、性別認同和表現、性取向、殘疾、個人外貌、體型、種族、種族、年齡、宗教或國籍。


 


參與者不良行為包括:




  • 性化的語言或意象的使用



  • 個人人身攻擊



  • 惡意破壞或侮辱/侮辱性的評論



  • 公共或者私人的騷擾



  • 未經明確許可,發布他人的私人信息,如住址或電子地址



  • 其他不道德或不專業的行為。


 


項目維護人員有權利和責任刪除、編輯或拒絕評論、提交、代碼、wiki編輯、問題和其他與行為準則不一致的貢獻。通過採用這一行為準則,項目維護者承諾在管理這個項目的每個方面都公平、一致地應用這些原則。不遵守或執行行為準則的項目維護者可能被永久地從項目團隊中刪除。


 


當個人代表項目或社區時,此行為準則適用於項目空間和公共空間。


 


可以通過創建一個問題或聯繫一個或多個項目負責人來舉報虐待、騷擾或其他不可接受行為。


本行為守則改編自Contributor Covenant 1.2.0版本,可在

http://contributor-covenant.org/version/1/2/0/

查閱

.





英文原文:https://github.com/robinhood/faust


譯者:不倒翁

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 Python程序員 的精彩文章:

Redis 深度歷險:核心原理與應用實踐
Python模塊和包初探

TAG:Python程序員 |