【Flink專題】Flink 應用
Apache Flink 是一個支持有狀態的計算的框架,它可以用來處理有邊界的數據流和無邊界的數據流。Flink 提供了多種不同抽象級別的API,並且提供對於常見的用例提供專用的函數庫。
一、為流式應用構建好的模塊
可以構建的並且被流式處理框架執行的應用類型是由框架是怎麼來控制流、狀態和事件來決定的。下面,我們將描述這些流式處理應用的構建塊(building blocks),並且解釋flink是怎麼處理他們的。
1、流(Streams)
很明顯,流是數據流處理的最基本的方面。然而,流的不同特性會影響這個流可以或者應該怎麼樣來處理。Flink是一個全能的處理框架,它可以處理任何種類的流。
有邊界的和無邊界的流:流可能是有邊界的或者無邊界的,比如固定大小的數據集。Flink具有專門特性來處理無邊屆的流,但是也有專門來處理有邊界的流的操作。
實時的和歷史的(Recorded)流:所有的數據是以流的形式產生的。有兩種方式處理數據:當它產生的時候實時的處理,或者把他持久化到一個存儲系統,比如文件系統或對象存儲,稍後再處理它。Flink應用可以處理這兩種流。
2、狀態(State)
每一個有意義的流式應用都是有狀態的,除非那些單獨轉換的事件不需要狀態。任何一個運行基本的業務邏輯的應用都需要記住事件或者中間結果以在後續的某一個時間點訪問他們,比如下一個事件被接收到或者某一個特定的時間段。
應用狀態是Flink的最優秀的特性。下面你可以看到Flink關於狀態處理的所有特性:
多狀態基元(Primitives):Flink為不同的數據結構都提供了狀態基元(primitives),比如原子的值、list或者map。開發者可以基於函數的訪問模型選擇最高效的狀態基元。
可插入式的狀態後端(Pluggable State Backends):應用的狀態是被可插入式的狀態後端管理和做檢查點的。Flink的特點是不同的狀態後端都存儲在內存或者RocksDB,RocksDB是一個非常高效的基於磁碟的內嵌數據存儲。常見的狀態後端也是可插入式的。
exactly-once狀態一致性:Flink的檢查點和回復演算法保證了在萬一失敗時應用狀態的一致性。因此,失敗可以非常容易的處理掉並且不影響應用的正確性。
非常大的的狀態信息:FLink可以保存幾TB的應用狀態信息,因為它是非同步的並且增量的檢查點演算法。
可伸縮的應用:Flink可以重新分配狀態到更多或者更少的工作節點,因此它支持有狀態的應用的伸縮。
3、Time
時間是流式應用的另一個重要的組成部分。大部分的事件流有其固有的時間語義,因為每一個時間都是在特定的時間點生產的。此外,非常多常見的流式計算都是基於時間的,比如窗口聚合、會話流程(sessionization)、模式檢測以及基於時間的關聯。流式處理的一個重要方面就是應用應該怎麼來控制(measures)時間,比如事件時間和處理時間的不同。
Flink提供了豐富的時間相關的特性:
Event-time Mode:使用event時間與依賴處理流的應用基於event的時間戳來計算結果。因此,無論是處理記錄好的event或者實時的event,事件時間處理允許精確的和保持一致性的結果;
Watermark Support:Flink在event-time應用中使用水印(wartermark)來處理(reason)時間。對於權衡延時和結果的計算來說,watermark是一個靈活的機制。
Late Data Handling:當使用watermark並且以event-time模式處理stream的時候,很有可能在一些相關的event到達之前,計算已經完成了。這些event被稱作遲到的event。Flink提供了多種特性來處理late event,比如通過側輸出重新路由他們( rerouting them via side outputs),然後更新先前完成的結果。
Processing-time Mode:除了event-time模式之外,Flink也支持處理時間的語義,它是通過正在執行的機器的時鐘時間來觸發計算的執行的。處理時間模式適用於對於可以忍受近似結果的有需求的低延時的應用。
二、分層的API
Flink提供了三層API,每層API針對不同的用例,在簡潔性和表達性之間提供了不同的權衡。
下面我們簡要的介紹每一個API,討論他的應用程序並且展示代碼示例。
1、處理函數(ProcessFunctions)
處理函數 是Flink提供的最具有表現力的介面。Flink 提供的ProcessFuntion是用來處理來自於一個或者兩個輸入流或者一段時間窗內的聚合的獨立event。ProcessFunction提供了對時間和狀態的細粒度控制。一個ProcessFunction可以任意的修改它的狀態,並且也可以註冊定時器在將來觸發一個回調函數。因此,ProcessFunction可以根據需要為有很多有狀態的事件驅動的應用實現複雜的單事件業務邏輯。
下面的例子展示了用來操作KeyedStream並且可以匹配START 和 END 的KeyedProcessFunction,這個函數會記錄他的狀態的時間戳,並且在四小時之內註冊一個定時器。如果在定時器執行之前接收到END event,這個函數會計算END和START這段區間,清空狀態並且返回值。另外,這個定時器僅僅觸發來清空狀態。
/** * Matches keyed START and END events and computes the difference between * both elements" timestamps. The first String field is the key attribute, * the second String attribute marks START and END events. */public static class StartEndDuration extends KeyedProcessFunction, Tuple2> { private ValueState startTime; @Override public void open(Configuration conf) { // obtain state handle startTime = getRuntimeContext() .getState(new ValueStateDescriptor("startTime", Long.class)); } /** Called for each processed event. */ @Override public void processElement( Tuple2 in, Context ctx, Collector> out) throws Exception { switch (in.f1) { case "START": // set the start time if we receive a start event. startTime.update(ctx.timestamp()); // register a timer in four hours from the start event. ctx.timerService() .registerEventTimeTimer(ctx.timestamp() + 4 * 60 * 60 * 1000); break; case "END": // emit the duration between start and end event Long sTime = startTime.value(); if (sTime != null) { out.collect(Tuple2.of(in.f0, ctx.timestamp() - sTime)); // clear the state startTime.clear(); } default: // do nothing } } /** Called when a timer fires. */ @Override public void onTimer( long timestamp, OnTimerContext ctx, Collector> out) { // Timeout interval exceeded. Cleaning up the state. startTime.clear(); }}
這個例子說明了KeyedProcessFunction的表現力,但是也強調了它是一個相當冗長的介面。
2、DataStream API
DataStream API為很多常用的流式計算操作提供了基元,比如窗口(windowing)、記錄的轉換(record-at-a-time transformations),並且通過查詢外部存儲來豐富event。DataStream API對於Java和Scala都是可用的,並且它是基於函數的,比如map()、reduce()以及aggregate()。函數可以通過擴展介面或者Java或Scala的lambda表達式來定義。
下例展示了如果對點擊流進行會話處理,並且計算每個會話的點擊次數。
DataStream clicks = ...DataStream> result = clicks // project clicks to userId and add a 1 for counting .map( // define function by implementing the MapFunction interface. new MapFunction>() { @Override public Tuple2 map(Click click) { return Tuple2.of(click.userId, 1L); } }) // key by userId (field 0) .keyBy(0) // define session window with 30 minute gap .window(EventTimeSessionWindows.withGap(Time.minutes(30L))) // count clicks per session. Define function as lambda function. .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));
3、SQL & Table API
Flink提供了兩種關係型的API,Table API 和 SQL 。對於批處理和流處理來說,這兩種API是一致的,比如無邊界的實時的流或者有邊界的記錄好的流產生相同的結果,都是使用相同的語義來執行查詢。Table API 和 SQL 使用 Apache Calcite 進行轉換、校驗和查詢優化。他們可以無縫的與DataStream和DataSet API結合,並且支持用戶定義的分層級的(scalar)、聚合的、表值(table-value)類型的函數。
Flink的關係型API目的是為了簡化數據分析、數據流水(data pipeline)以及ETL應用的定義。
下面的例子展示了會話處理點擊流並且計算每個會話的點擊數量的SQL 查詢語句。這是與DataStream API例子中相同的場景。
SELECT userId, COUNT(*)FROM clicksGROUP BY SESSION(clicktime, INTERVAL "30" MINUTE), userId
三、函數庫(Libraries)
對於通常的數據處理用例,FLink提供了幾種函數庫。這些函數庫通常嵌入在API中,而不是完全自包含的。因此,他們可以在API的所有特性中獲益,並且與其他函數庫集成。
複雜事件處理(CEP):對於事件流來說,模式檢測是一個非常常見的用例。Flink』s CEP library provides an API to specify patterns of events (think of regular expressions or state machines). The CEP library is integrated with Flink』s DataStream API, such that patterns are evaluated on DataStreams. Applications for the CEP library include network intrusion detection, business process monitoring, and fraud detection.
DataSet API:The DataSet API is Flink』s core API for batch processing applications. The primitives of the DataSet API includemap,reduce,(outer) join,co-group, anditerate. All operations are backed by algorithms and data structures that operate on serialized data in memory and spill to disk if the data size exceed the memory budget. The data processing algorithms of Flink』s DataSet API are inspired by traditional database operators, such as hybrid hash-join or external merge-sort.
Gelly:Gelly is a library for scalable graph processing and analysis. Gelly is implemented on top of and integrated with the DataSet API. Hence, it benefits from its scalable and robust operators. Gelly features built-in algorithms, such as label propagation, triangle enumeration, and page rank, but provides also a Graph API that eases the implementation of custom graph algorithms.
TAG:一線城市 |