Flink-Table-SQL系列之source
source作為Table&SQL API的數據源,同時也是程序的入口。當前Flink的Table&SQL API整體而言支持三種source:Table source、DataSet以及DataStream,它們都通過特定的API註冊到Table環境對象。
我們先來看Table source,它直接以表對象作為source。這裡的表對象可細分為:
Flink以Table類定義的關係表對象,通過TableEnvironment的registerTable方法註冊;
外部source經過橋接而成的表對象,基礎抽象為TableSource,通過具體環境對象的registerTableSource;
下圖展示了,Table source被註冊時,對應的內部轉化圖(虛線表示對應關係):
由上圖可見,不管是直接註冊Table對象還是註冊外部source,在內部都直接對應了特定的XXXTable對象。
TableSource trait針對Streaming和Batch分部擴展有兩個trait,它們是StreamTableSource和BatchTableSource,它們各自都提供了從數據源轉換為核心對象(DataStream跟DataSource)的方法。
除了這三個基本的trait之外,還有一些特定對source的需求以獨立的trait提供以方便實現者自行組合,比如ProjectableTableSource這一trait,它支持將Projection下推(push-down)到TableSource。Flink內置實現的CsvTableSource就繼承了這一trait。
當前Flink所支持的TableSource大致上分為兩類:
CsvTableSouce:同時可用於Batch跟Streaming模式;
kafka系列TableSource:包含Kafka的各個版本(0.8,0.9,0.10)以及各種不同的格式(Json、Avro),基本上它們只支持Streaming模式,它們都依賴於各種kafka的connector;
使用方式如下:
// specify JSON field names and types val typeInfo = Types.ROW( Array("id", "name", "score"), Array(Types.INT, Types.STRING, Types.DOUBLE) ) val kafkaTableSource = new Kafka08JsonTableSource( kafkaTopic, kafkaProperties, typeInfo) tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);
CsvTableSource的構建方式如下:
val csvTableSource = CsvTableSource .builder .path("/path/to/your/file.csv") .field("name", Types.STRING) .field("id", Types.INT) .field("score", Types.DOUBLE) .field("comments", Types.STRING) .fieldDelimiter("#") .lineDelimiter("$") .ignoreFirstLine .ignoreParseErrors .commentPrefix("%")
除了以TableSource作為Table&SQL的source,還支持通過特定的環境對象直接註冊DataStream、DataSet。註冊DataStream的示例如下:
val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val cust = env.fromElements(...) val ord = env.fromElements(...) // register the DataStream cust as table "Customers" with fields derived from the datastream tableEnv.registerDataStream("Customers", cust) // register the DataStream ord as table "Orders" with fields user, product, and amount tableEnv.registerDataStream("Orders", ord, user, product, amount)
註冊DataSet的示例如下:
val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val cust = env.fromElements(...) val ord = env.fromElements(...) // register the DataSet cust as table "Customers" with fields derived from the dataset tableEnv.registerDataSet("Customers", cust) // register the DataSet ord as table "Orders" with fields user, product, and amount tableEnv.registerDataSet("Orders", ord, user, product, amount)
以上,通過調用環境對象的registerXXX方法是一種顯式註冊的方式,除此之外,還有隱式註冊方式。隱式註冊方式,通過對DataStream跟DataSet對象增加的toTable方法來實現,使用方式示例如下:
val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // read a DataSet from an external source val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...) val table = ds.toTable(tableEnv, user, product, amount) val result = tableEnv.sql( s"SELECT SUM(amount) FROM $table WHERE product LIKE %Rubber% ")
我們知道DataStream跟DataSet原先是沒有toTable API的,如何為它們增加該API的呢?答案是利用了Scala的包對象(package object),該特性主要用於兼容舊版本的庫或對某些類型的API進行增強。具體而言,toTable API其實是實現在DataSetConversions和DataStreamConversions兩個類中,然後在包對象中對他們進行實例化。而定位到toTable的實現時,會看到它們其實是間接調用了特定環境對象的fromDataStream/fromDataSet方法並將當前的DataStream跟DataSet傳遞給這兩個方法並通過方法返回得到Table對象。fromDataStream/fromDataSet方法對在實現時會調用跟registerDataStream/registerDataSet方法對相同的內部註冊方法。
fromDataStream/fromDataSet方法通常主要的場景在於為DataStream/DataSet轉換為Table對象提供便利,它本身也進行了隱式註冊。然而,你也可以對得到的Table對象,再次調用registerTable來進行顯式註冊,不過通常沒有必要。
因此,綜合而言,註冊DataStream跟DataSet的對應關係如下:
以上我們已經分析了所有的Table source的註冊方式,有多種register系列方法並最終對應了內部各種XXXTable對象。稍顯混亂,其實這些XXXTable對象是有聯繫的,並且所有的register系列方法最終都調用了TableEnvironment的registerTableInternal方法。因此其實註冊Table source的內部原理是一致的,我們來分析一下。
TableEnvironment內部會以一個SchemaPlus類型的數據結構,它是Calcite中的數據結構,用來存儲被註冊的表、函數等在內的一系列對象(這些對象統稱為Calcite中的Schema)。由此可見它無法直接接受Flink自定義的類似於TableSouce這樣的對象,那麼這裡存在一個問題就是兩個框架的銜接問題。這就是Flink定義那麼多內部XXXTable類型的原因之一,我們來梳理一下它們之間的關係如下:
上圖中的XXXTable對象同時以括弧標明了在註冊時它是由什麼對象轉化而來。
※探索 Swift 4 中新的 String API
※R 非線性回歸入門——以 osf.io/79xtn 為例
※kubelet 源碼分析:statusManager和probeManager
※營銷的秘密:消費者想買的並不是產品
※應用沉睡之時:後台傳輸服務
TAG:推酷 |
※Flink數據源拆解分析(WikipediaEditsSource)
※從Spark Streaming到Apache Flink: 實時數據流在愛奇藝的演進
※Flink與Spark Streaming在與kafka結合的區別!
※Flink China 上海站Meetup 小小小總結
※開源實時數據處理系統Pulsar:一套搞定Kafka+Flink+DB
※一文讀懂Apache Flink技術
※Apache Flink 數據流編程模型
※【Flink專題】Flink 應用
※KSQL與Flink SQL的比較
※阿里「豪擲」9000萬歐元收購 Flink 創始公司Data Artisans
※Apache Flink改進批次作業恢復功能
※360深度實踐:Flink與Storm協議級對比
※用Flink取代JStorm,今日頭條的遷移過程與後續計劃
※技術分享:Flink 基本的 API
※從Storm到Flink:大數據處理的開源系統及編程模型
※Spark or Flink?小孩子才做選擇題?我全都要!
※Spark比拼Flink:下一代大數據計算引擎之爭,誰主沉浮?
※首次嘗試Flink的一些 感受
※大數據「重磅炸彈」:實時計算框架 Flink
※為什麼Flink會成為下一代大數據處理框架的標準?