洋碼頭全非同步服務框架
作者介紹
羅時遷, 洋碼頭架構師
負責洋碼頭消息匯流排、門神、Falcon等基礎設施的設計與研發。
塗文傑, 洋碼頭高級架構師
負責洋碼頭基礎設施/交易/物流線的研發及管理工作。
本文約3500字,可參閱下面的大綱閱讀。
1. 全非同步的必要性
2. 開源Http非同步服務框架選型
2.1 Vert.x的線程模型
2.2 Vert.x開發的痛點
2.3 如何解決Vert.x痛點
3. 洋碼頭全非同步框架實現
3.1 設計介面AsyncCall
3.2 實現DubboAsyncCall
3.3 實現HttpAsyncCall
3.4 實現ProxyAsyncCall
3.5 關係型資料庫非同步調用
3.6 其他非關係型資料庫非同步調用
3.7 AsyncCall調用轉為同步獲取
3.8 AsyncCall使用示例
3.9 整合dubbo、vert.x、resteasy 實現以同步編碼方式編寫非同步Http Restful服務
4. 壓測結果
5. 應用情況
1. 全非同步的必要性
無論是傳統的tomcat/jetty等web容器還是常用的RPC框架(例如dubbo),其編程模型都是使用線程池同步處理業務邏輯。假設服務A 調用服務B,服務B響應時間為1秒,那服務A的單機吞吐量完全取決於服務A的線程池大小(設定服務B的吞吐量足夠大)。如果線程池為500,那A的tps則為500,在必須達到單機10000tps的要求下,我們怎麼處理呢?將線程池大小設為10000嗎?顯然不可能:jvm的一個線程對應os內核的一個真實線程,且jvm為每個線程設置了默認1M的棧空間,線程越多,內存佔用越大,cpu上下文切換越頻繁;在某個臨界點後,線程數增加,吞吐量不會上升甚至會下降。這裡,非同步的作用就顯現出來了,若服務A非同步調用服務B,在請求發出去之後,服務A的線程就可以處理其他請求,在服務B結果回來之後,服務A再響應之前的請求,因此服務A只需要少量線程就可以同時處理眾多並發請求,A的吞吐量取決於自身cpu/網路/內存,與被調用者B無關。全非同步的意思是服務A在一個請求的處理過程中,所有的I/O操作(遠程調用、資料庫訪問、本地文件操作等)都是非阻塞的,線程要麼在接收新請求,要麼在執行業務,從不浪費在等待I/O上。
2. 開源HTTP非同步服務框架選型
常見的Http非同步服務框架有Vert.x、SpringWebFlux、Play Framework以及Akka等,考慮到成熟度、易用性、社區等因素我們選用Vert.x。關於Vert.x請參考:https://vertx.io/
2.1 Vert.x的線程模型
Vert.x底層由netty實現,netty有多種線程模型,由用戶自行選擇,vert.x使用的是非同步非阻塞多線程模型。首先讓我們來理解netty中EventLoop的概念,一個EventLoop是一個線程,用來處理若干連接上的所有事件。其基本思想如下面代碼所示:連接上來事件了就處理,否則就等待。
再看下Vert.x Netty服務端啟動的一個代碼片段:
NioEventLoopGroup代表一組EventLoop,parentGroup用於接收TCP連接,初始化參數,而childGroup用來處理其餘的I/O事件。一般parentGroup線程設置為1,childGroup線程數默認為cpu核數*2。在某些特定場景下 (比如百萬客戶端並發連接時) 一個AcceptEventLoop可能會不足,需要將parentGroup線程數加大。
Channel(連接)和 EventLoop是綁定的,即一旦連接被分配到某個EventLoop,其相關的I/O、編解碼都在同一個EventLoop中,這樣可以確保這些操作都是線程安全的,一個EventLoop可以關聯多個Channel,EventLoopGroup負責平均分配channel到所有的EventLoop。當有多個Channel在同一EventLoop下時,如果此EventLoop執行某個event慢了,則這個線程上的其他event將會出現堆積,因此在EventLoop上執行event (各自業務)一定不能阻塞(bio,線程sleep,特別費cpu計算等),請參考don`t block me https://vertx.io/docs/vertx-core/java/#_don_t_block_me。在此線程模型下vert.x只需要少量的線程就可以實現超高的吞吐量。
2.2 Vert.x開發的痛點
vert.x開發的痛點其實是所有非同步開發共同的痛點,那就是Callback Hell。業務處理過程中只要有I/O操作,對I/O的結果處理必須放在I/O的callback里,因此業務邏輯會被拆成一個個不連貫的代碼塊,處理過程中的上下文(例如局部變數)丟失了,而且異常處理也變得非常麻煩。
以下是一個Callback Hell的例子:
該代碼要完成的功能其實很簡單:通過EventBus給service1發送消息,成功後將結果發送給service2,成功後繼續將service2返回的結果發送給service3。可以看出,全非同步後,代碼難理解,不好維護,不符合大家日常的同步開發邏輯。大家習慣的代碼風格是這樣的:
怎樣才能做到以寫同步代碼方式實現非同步調用呢?
2.3 如何解決Vert.x痛點
應用Quasar fiber協程庫(http://docs.paralleluniverse.co/quasar/),協程是用戶級輕量線程,使用協程可以用同步編程方式達到或接近於純非同步的性能,而沒有非同步帶來的Callback Hell, 相比於jvm thread默認1M的堆棧,一個fiber內存佔用才400位元組,一個應用可以輕鬆創建百萬個fiber。
| Fiber如何實現
fiber執行在ForkJoinPool中,不佔用調用者原有線程,newFiber(()->{...}).start()將新建一個fiber,並設置fiber state為STARTED,然後將fiber task submit到ForkJoinPool中。當ForkJoinPool執行此task時調用fiber.exec()方法, fiber.exec()設置state為RUNNING,之後運行fiber.run(),fiber.run()中會調用我們提供的Runnable.run()。當run() 中出現fiber.park()時(使用FiberAsync在執行非同步代碼後就會調用)將拋出SuspendExecution異常,fiber.exec()方法會catch此異常並將state切換到WAITING, fiber task執行完畢。當fiber.unPark()執行時(使用FiberAsync時調用其asyncCompleted(result)方法就會調用)設置state為RUNNING,並將fiber task重新submit到forkJoinPool中,之後將使用Continuation繼續執行未執行的代碼。下圖為fiber和fiber ForkJoinTask的狀態圖:
| Fiber Continuation如何實現
應用啟動時需要設置javaagent(示例: java -server -Xmx1024m -javaagent:libquasar-core-0.7.7.jar), 此javaagent在classloading的時候找到Suspendable註解或帶有聲明SuspendExecution異常的方法織入(weave)一些指令,用於Fiber task調用棧的保存和恢復。
下面偽代碼解釋大概織入了哪些代碼:
可以看出,主要是在park()前保存相關的局部變數和pc,然後fiber恢復執行(unpark)的時候通過pc(switchcase 跳轉的程序計數器) jump到park()之後的代碼並恢復局部變數繼續執行。
3. 洋碼頭全非同步框架實現
洋碼頭全非同步框架基於Vert.x、Quasar Fiber、Dubbo、RestEasy等構建一套易用、高效的服務框架,使開發人員能以傳統同步編碼方式開發全非同步服務。支持任意非同步操作轉同步(非同步http,非同步dubbo,非同步資料庫訪問等);支持資料庫事務;整合dubbo暴露非同步Http Restful服務。
3.1 設計介面AsyncCall
AsyncCall 表明這是一個非同步調用的抽象,AbstractAsyncCall用於設置別名等,具體有多種非同步調用實現,DubboAsyncCall處理dubbo非同步,HttpAsyncCall處理http非同步,ProxyAsyncCall可以處理任意非同步,比如mongo操作,線程池等
start方法將開始非同步調用,返回值為FutureResult類型。FutureResult類圖如下:
Handler、AsyncResult、Future介面都是Vert.x定義的。
AsyncResult介面下有以下方法:
result():返回非同步請求結果。
cause():返回非同步請求異常。
successed():返回非同步請求是否成功。
failed:返回非同步請求是否失敗。
Future介面下有以下方法:
isComplete()返回是否完成(successed || failed)。
setHandler(Handler)方法則是設置非同步請求完成後下一步需要執行的處理器。
complete(T)將非同步請求標記為完成,並將result設置為T。
fail(Throwable)將非同步請求標記為失敗,並設置異常。
FutureResult類實現了Future介面;FutureResult使用示例:
a) 同步直接完成示例,此段代碼將列印「sync complete」。
b) 非同步完成示例,此段代碼將列印「async complete」。
3.2 實現DubboAsyncCall
dubbo非同步調用使用方法是:在consumer為需要非同步調用的方法設置 async="true",然後通過RpcContext獲取到Future。此Future為jdk concurrent介面,提供的get()獲取返回結果T。使用過Future的都知道get()會阻塞當前線程直到dubbo調用獲取到了返回值,這將導致當前線程還是在同步,因此我們的DubboAsyncCall需要解決此問題。
查看dubbo源碼發現通過RpcContext拿到的future的實現類是FutureAdapter,FutureAdapter適配了DefaultFuture對象,DefaultFuture對象有setCallback方法用來設置回調。只要netty channel獲取到返回值就會調用DefaultFuture.receive(Channel,Response),相應的就會調用此回調,因此我們通過強制設置callback來得到通知,大概代碼如下圖:
3.3 實現HttpAsyncCall
我們選用Apache HttpAsyncClient作為http請求客戶端,提供get、post、post json等多種請求發送方式,通過設置ResponseClass,將響應反序列化為對應對象。
另提供Retrofit將HTTP API轉化為java介面方式進行非同步http調用,由於Retrofit底層默認使用okHttp,而okHttp使用的是okio,okio是阻塞的,因此使用自定義RetrofitCallAdapter將底層調用改為我們的HttpAsyncCall實現。關於Retrofit,請參考:http://square.github.io/retrofit/。
3.4 實現ProxyAsyncCall
實現非常簡單,通過設置一個Consumer,給調用方提供一個futureResult,由調用方主動調用futureResult.complete(T)完成。
3.5 關係型資料庫非同步調用
基於vertx-mysql-postgresql-client包,此包依賴了mysql和postgresql非同步客戶端,在此基礎上設計Connection類, 適配了Vert.x SQLConnection,包裝多個非同步方法,比如:
insert、update、delete的操作:例如AsyncCall updateForAsyncCall(String sql,JsonArray params)。
select操作:例如,AsyncCall queryForAsyncCall(String sql, JsonArray params)。
連接級別的操作:例如close() ,commit(),rollback(),setAutoCommit(),setTransactionIsolation()等。
實現PlatformTranactionManager介面,以支持spring事務。
3.6 其他非關係型資料庫非同步調用
使用Vert.x提供的非同步客戶端(如MongoDb client,Redis client等),並結合ProxyAsyncCall主動非同步調用futureResult.complete()。
3.7 AsyncCall調用轉為同步獲取
設計好了AsyncCall介面以及多種非同步實現,下面我們開始將AsyncCall調用轉為同步獲取。如下圖代碼所示:
使用AsyncUtil.awaitResult(AsyncCall)將調用Sync.awaitResult(),上述代碼可以知道AsyncUtil.awaitResult中h其實是AsyncAdaptor對象,asyncCall.start()將開始發送非同步請求並得到futureResult,在非同步結果得到之前,fiber將停止執行,停止執行期間不阻塞線程。futureResult.setHandler(h)表示非同步結果完成後將調用此h.handle()方法,而handle方法中如果是successed則將調用fiberAsync中asyncCompleted() 告訴fiberAsync完成了,fiber將繼續執行接下來的代碼,AsyncAdaptor.run()將返回值給AsyncUtil.awaitResult的調用方。
AsyncUtil.awaitResult(AsyncCall... asyncCall) 組裝多個非同步調用並發執行,所有響應成功或者任一失敗就返回。
AsyncUtil.awaitResultUntilAllCompleted (AsyncCall... asyncCall) 也是多個非同步調用並發執行,等待所有 asyncCall都完成(成功或失敗都算完成)才返回。
至此我們提到的Vert.xCallback Hell示例代碼就可以更改為以下同步代碼。
AsyncCallFactory是用於生成AsyncCall的工廠,提供生成多種非同步調用的方法,上述代碼生成HttpAsyncClient的非同步調用。
3.8 AsyncCall使用示例
--- 遠程調用示例 ---
--- 資料庫示例 ---
3.9 整合dubbo、vert.x、resteasy 實現以同步編碼方式編寫非同步Http Restful服務
基於dubbo rest協議,新增vertx server, 由vertx 創建http服務,通過vertx的bodyHandler 整合resteasy dispatcher將請求分發至對應rest資源,使用ServiceHelper工具類驗證請求、包裝fiber調用業務邏輯、輸出響應結果。
dubbo聲明rest協議,使用vertx Server,設置dubbo consumer非同步調用 async="true"。
暴露rest介面。
使用ServiceHelper工具類封裝 fiber,輸出請求的響應等。
具體service實現, 非同步調用dubbo服務。
4. 壓測結果
針對以下3個介面進行壓測(CPUcore:8,4G內存):
/async?t=&k= 此介面非同步調用dubbo服務(dubbo服務端 sleep了 t ,返回k數據大小)。
/async3?t=&k= 此介面非同步調用dubbo服務,調用了三次,按順序調用,成功返回一個調用下一個。
/async3concurrent?t=&k= 此介面非同步調用dubbo服務,並發調用了三次,按最慢的返回。
壓測結果如下。可以看出,應用全非同步框架,cpu能被充分使用,系統單機吞吐量輕鬆上萬。
5. 應用情況
該全非同步框架被廣泛應用于洋碼頭需要高吞吐量的服務/場景,在節省物理IT資源的同時大幅度提升了業務峰值處理能力,例如:
門神。顧名思義,運行在外網與內網之間,提供類waf(Web Application Firewall)的安全防護
直接面向用戶的頂層Facade類服務。此類服務組裝/依賴眾多底層服務,輸出洋碼頭業務價值。
全文完
近期分享預告:
洋碼頭應用監控平台Falcon
洋碼頭交易系統演進
關注【洋碼頭技術】,第一時間獲取由洋碼頭一線工程師奉獻的技術分享推送吧。
TAG:洋碼頭技術 |