當前位置:
首頁 > 知識 > Flink數據源拆解分析(WikipediaEditsSource)

Flink數據源拆解分析(WikipediaEditsSource)

Wikipedia Edit Stream是Flink官網上的經典demo,功能是實時處理來自維基百科的消息,消息的內容是當前每個用戶對維基內容的操作,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html

在demo中,WikipediaEditsSource類作為數據源負責向Flink提供實時消息,今天咱們一起來分析其源碼,了解Flink是怎麼獲取到來自Wiki的實時數據的,這對我們今後做自定義數據源也有很好的參考作用;

官方解釋

以下是官網對消息來源的說明,維基百科提供了一個IRC協議的通道,從這個通道可以獲取對維基百科所做的編輯行為的日誌:

Wikipedia provides an IRC channel where all edits to the wiki are logged.

1

IRC是應用層協議,更多細節請看:https://en.wikipedia.org/wiki/Internet_Relay_Chat

繼承關係

先看WikipediaEditsSource類的繼承關係,做個初步了解,如下圖:

Flink數據源拆解分析(WikipediaEditsSource)

打開今日頭條,查看更多圖片

如上圖所示,RichFunction介面負責資源開啟關閉以及環境上下文,而SourceFunction介面則是和數據生產行為的開始和停止有關,這些介面最終都在WikipediaEditSource實現;

構造方法

通過構造方法來了解有哪些參數被確定了:

//遠程連接的域名

public static final String DEFAULT_HOST = "irc.wikimedia.org";

//遠程連接的埠

public static final int DEFAULT_PORT = 6667;

//IRC協議的channel

public static final String DEFAULT_CHANNEL = "#en.wikipedia";

private final String host;

private final int port;

private final String channel;

public WikipediaEditsSource() {

this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);

}

public WikipediaEditsSource(String host, int port, String channel) {

this.host = host;

this.port = port;

this.channel = Objects.requireNonNull(channel);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

通過上述代碼可以見到,數據的來源是irc.wikimedia.org這個網址;

主業務代碼

主要的業務邏輯是WikipediaEditsSource的run方法,該方法在任務啟動的時候會被StreamSource.run方法調用:

@Override

public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {

try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {

// 創建一個IRC協議的連接

ircStream.connect();

//進入指定的channel

ircStream.join(channel);

try {

while (isRunning) {

//從阻塞隊列中獲取數據

WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);

//如果取到了數據,就調用ctx.collect方法,將數據生產到Flink環境,給其他operator使用

if (edit != null) {

ctx.collect(edit);

}

}

} finally {

//結束時要向伺服器發送數據表示離開

ircStream.leave(channel);

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

上面的代碼,我們挑幾處重要的展開看一看;

和維基百科消息伺服器建立連接後做的事情

為了弄明白Flink是如何與維基百科的數據源建立連接的,先把ircStream.connect()這段代碼展開,對應的是IRCConnection類的connect方法:

public void connect() throws IOException {

if (level != 0) // otherwise disconnected or connect

throw new SocketException("Socket closed or already open ("+ level +")");

IOException exception = null;

Socket s = null;

for (int i = 0; i < ports.length && s == null; i++) {

try {

//建立的是普通Socket連接

s = new Socket(host, ports[i]);

exception = null;

} catch (IOException exc) {

if (s != null)

s.close();

s = null;

exception = exc;

}

}

if (exception != null)

throw exception; // connection wasn"t successful at any port

prepare(s);

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

上述代碼表明,Flink與維基百科的數據源伺服器之間建立的是普通的Socket連接,至於IRC協議,都是在這個Socket連接的通道里的一些讀寫操作;

上面的prepare方法比較關鍵,展開看看:

protected void prepare(Socket s) throws IOException {

if (s == null)

throw new SocketException("Socket s is null, not connected");

socket = s;

level = 1;

s.setSoTimeout(timeout);

in = new BufferedReader(new InputStreamReader(s.getInputStream(),

encoding));

out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),

encoding));

//IRCConnection是Thread的子類,執行start方法就表明會啟動一個線程來執行IRCConnection的run方法

start();

//遵守IRC協議約定,發送一些註冊相關的內容

register();

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

可以看出,prepare方法做了兩個重要的事情:啟動一個子線程、發送IRC協議的註冊信息,接下來看啟動的子線程做了什麼;

打開IRCConnection的run方法:

public void run() {

try {

String line;

while (!isInterrupted()) {

line = in.readLine();

if (line != null)

get(line);

else

close();

}

} catch (IOException exc) {

close();

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

run方法中的內容很簡單,就是讓這個子線程負責讀取遠端發送的字元串,每讀到一行就調用get方法去處理;

get方法的內容很多,做的事情是根據IRC協議解析這個字元串再做不同的處理,這裡我們只要關注下面這段,就是收到一條業務消息後如何處理:

//每當有人編輯了維基百科,這裡就會收到一條command為PRIVMSG的記錄

if (command.equalsIgnoreCase("PRIVMSG")) { // MESSAGE

IRCUser user = p.getUser();

String middle = p.getMiddle();

String trailing = p.getTrailing();

for (int i = listeners.length - 1; i >= 0; i--)

//調用listener的onPrivmsg方法

listeners[i].onPrivmsg(middle, user, trailing);

}

1

2

3

4

5

6

7

8

9

如上所示,每收到一條遠端發來的消息,都會調用listener的onPrivmsg方法,這裡的註冊的linstener是WikipediaIrcChannelListener對象;

打開WikipediaIrcChannelListener的onPrivmsg方法,看看收到消息後做了什麼:

@Override

public void onPrivmsg(String target, IRCUser user, String msg) {

LOG.debug("[{}] {}: {}.", target, user.getNick(), msg);

//根據消息構造一個WikipediaEditEvent對象,就是Flink的業務流程中用到的數據對象

WikipediaEditEvent event = WikipediaEditEvent.fromRawEvent(

System.currentTimeMillis(),

target,

msg);

if (event != null) {

//eidts是個阻塞隊列,WikipediaEditEvent被放入隊列

if (!edits.offer(event)) {

LOG.debug("Dropping message, because of full queue.");

}

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

上面的代碼已經分析把主要邏輯展現出來了,從Socket讀到的數據被解析成Flink實時計算時用到的WikipediaEditEvent對象後,被放入阻塞隊列中,這也就是負責讀取的子線程的主要工作了;

如何消費隊列中的數據

前面的分析中我們得知:收到的數據被放入了阻塞隊列中,現在回到WikipediaEditsSource的run方法再看看,這裡面就有從阻塞隊列取出數據的操作:

while (isRunning) {

//從阻塞隊列中獲取數據

WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);

//如果取到了數據,就調用ctx.collect方法,將數據生產到Flink環境,給其他operator使用

if (edit != null) {

ctx.collect(edit);

}

}

1

2

3

4

5

6

7

8

如上所示,一個while循環不停的從阻塞隊列中獲取數據,取到了就調用SourceContext的collect,把一條數據生產到在Flink環境中,給後面的流程使用;

小結

至此,WikipediaEditsSource源碼的分析就完成了,在此小結一下:

和irc.wikimedia.org這個網站建立Socket連接;

連接建立後,讀寫相關的內容都是基於IRC協議的,這是個應用層的協議,有自己的格式、關鍵字、命令字等約定,本次分析中我們沒有花太多時間在這個協議上,有興趣的讀者在這裡了解更多:https://en.wikipedia.org/wiki/Internet_Relay_Chat

啟動一個子線程讀取Socket信息,收到數據後,構造成WikipediaEditEvent對象,放入阻塞隊列中;

原先的那個線程在一個while循環中從阻塞隊列中取數據,如果取到了數據就調用ctx.collect方法,這樣數據就生產到了Flink環境,其他operator就可以使用了;

以上就是拆解WikipediaEditsSource的過程,現在我們對Flink數據源有了更進一步的了解,後續在開發自定義數據源的時候也有了參考實現;

---------------------

作者:博陵精騎

原文:https://blog.csdn.net/boling_cavalry/article/details/85221446

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

CSS實現點擊事件及實踐
後台返回json數據和前台解析json數據

TAG:程序員小新人學習 |