當前位置:
首頁 > 最新 > Flink-Table-SQL系列之source

Flink-Table-SQL系列之source

source作為Table&SQL API的數據源,同時也是程序的入口。當前Flink的Table&SQL API整體而言支持三種source:Table source、DataSet以及DataStream,它們都通過特定的API註冊到Table環境對象。

我們先來看Table source,它直接以表對象作為source。這裡的表對象可細分為:

Flink以Table類定義的關係表對象,通過TableEnvironment的registerTable方法註冊;

外部source經過橋接而成的表對象,基礎抽象為TableSource,通過具體環境對象的registerTableSource;

下圖展示了,Table source被註冊時,對應的內部轉化圖(虛線表示對應關係):

由上圖可見,不管是直接註冊Table對象還是註冊外部source,在內部都直接對應了特定的XXXTable對象。

TableSource trait針對Streaming和Batch分部擴展有兩個trait,它們是StreamTableSource和BatchTableSource,它們各自都提供了從數據源轉換為核心對象(DataStream跟DataSource)的方法。

除了這三個基本的trait之外,還有一些特定對source的需求以獨立的trait提供以方便實現者自行組合,比如ProjectableTableSource這一trait,它支持將Projection下推(push-down)到TableSource。Flink內置實現的CsvTableSource就繼承了這一trait。

當前Flink所支持的TableSource大致上分為兩類:

CsvTableSouce:同時可用於Batch跟Streaming模式;

kafka系列TableSource:包含Kafka的各個版本(0.8,0.9,0.10)以及各種不同的格式(Json、Avro),基本上它們只支持Streaming模式,它們都依賴於各種kafka的connector;

使用方式如下:

// specify JSON field names and types val typeInfo = Types.ROW( Array("id", "name", "score"), Array(Types.INT, Types.STRING, Types.DOUBLE) ) val kafkaTableSource = new Kafka08JsonTableSource( kafkaTopic, kafkaProperties, typeInfo) tableEnvironment.registerTableSource("kafka-source", kafkaTableSource);

CsvTableSource的構建方式如下:

val csvTableSource = CsvTableSource .builder .path("/path/to/your/file.csv") .field("name", Types.STRING) .field("id", Types.INT) .field("score", Types.DOUBLE) .field("comments", Types.STRING) .fieldDelimiter("#") .lineDelimiter("$") .ignoreFirstLine .ignoreParseErrors .commentPrefix("%")

除了以TableSource作為Table&SQL的source,還支持通過特定的環境對象直接註冊DataStream、DataSet。註冊DataStream的示例如下:

val env = StreamExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val cust = env.fromElements(...) val ord = env.fromElements(...) // register the DataStream cust as table "Customers" with fields derived from the datastream tableEnv.registerDataStream("Customers", cust) // register the DataStream ord as table "Orders" with fields user, product, and amount tableEnv.registerDataStream("Orders", ord, user, product, amount)

註冊DataSet的示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) val cust = env.fromElements(...) val ord = env.fromElements(...) // register the DataSet cust as table "Customers" with fields derived from the dataset tableEnv.registerDataSet("Customers", cust) // register the DataSet ord as table "Orders" with fields user, product, and amount tableEnv.registerDataSet("Orders", ord, user, product, amount)

以上,通過調用環境對象的registerXXX方法是一種顯式註冊的方式,除此之外,還有隱式註冊方式。隱式註冊方式,通過對DataStream跟DataSet對象增加的toTable方法來實現,使用方式示例如下:

val env = ExecutionEnvironment.getExecutionEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // read a DataSet from an external source val ds: DataSet[(Long, String, Integer)] = env.readCsvFile(...) val table = ds.toTable(tableEnv, user, product, amount) val result = tableEnv.sql( s"SELECT SUM(amount) FROM $table WHERE product LIKE %Rubber% ")

我們知道DataStream跟DataSet原先是沒有toTable API的,如何為它們增加該API的呢?答案是利用了Scala的包對象(package object),該特性主要用於兼容舊版本的庫或對某些類型的API進行增強。具體而言,toTable API其實是實現在DataSetConversions和DataStreamConversions兩個類中,然後在包對象中對他們進行實例化。而定位到toTable的實現時,會看到它們其實是間接調用了特定環境對象的fromDataStream/fromDataSet方法並將當前的DataStream跟DataSet傳遞給這兩個方法並通過方法返回得到Table對象。fromDataStream/fromDataSet方法對在實現時會調用跟registerDataStream/registerDataSet方法對相同的內部註冊方法。

fromDataStream/fromDataSet方法通常主要的場景在於為DataStream/DataSet轉換為Table對象提供便利,它本身也進行了隱式註冊。然而,你也可以對得到的Table對象,再次調用registerTable來進行顯式註冊,不過通常沒有必要。

因此,綜合而言,註冊DataStream跟DataSet的對應關係如下:

以上我們已經分析了所有的Table source的註冊方式,有多種register系列方法並最終對應了內部各種XXXTable對象。稍顯混亂,其實這些XXXTable對象是有聯繫的,並且所有的register系列方法最終都調用了TableEnvironment的registerTableInternal方法。因此其實註冊Table source的內部原理是一致的,我們來分析一下。

TableEnvironment內部會以一個SchemaPlus類型的數據結構,它是Calcite中的數據結構,用來存儲被註冊的表、函數等在內的一系列對象(這些對象統稱為Calcite中的Schema)。由此可見它無法直接接受Flink自定義的類似於TableSouce這樣的對象,那麼這裡存在一個問題就是兩個框架的銜接問題。這就是Flink定義那麼多內部XXXTable類型的原因之一,我們來梳理一下它們之間的關係如下:

上圖中的XXXTable對象同時以括弧標明了在註冊時它是由什麼對象轉化而來。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 推酷 的精彩文章:

探索 Swift 4 中新的 String API
R 非線性回歸入門——以 osf.io/79xtn 為例
kubelet 源碼分析:statusManager和probeManager
營銷的秘密:消費者想買的並不是產品
應用沉睡之時:後台傳輸服務

TAG:推酷 |

您可能感興趣

Flink數據源拆解分析(WikipediaEditsSource)
從Spark Streaming到Apache Flink: 實時數據流在愛奇藝的演進
Flink與Spark Streaming在與kafka結合的區別!
Flink China 上海站Meetup 小小小總結
開源實時數據處理系統Pulsar:一套搞定Kafka+Flink+DB
一文讀懂Apache Flink技術
Apache Flink 數據流編程模型
【Flink專題】Flink 應用
KSQL與Flink SQL的比較
阿里「豪擲」9000萬歐元收購 Flink 創始公司Data Artisans
Apache Flink改進批次作業恢復功能
360深度實踐:Flink與Storm協議級對比
用Flink取代JStorm,今日頭條的遷移過程與後續計劃
技術分享:Flink 基本的 API
從Storm到Flink:大數據處理的開源系統及編程模型
Spark or Flink?小孩子才做選擇題?我全都要!
Spark比拼Flink:下一代大數據計算引擎之爭,誰主沉浮?
首次嘗試Flink的一些 感受
大數據「重磅炸彈」:實時計算框架 Flink
為什麼Flink會成為下一代大數據處理框架的標準?