Flink數據源拆解分析(WikipediaEditsSource)
Wikipedia Edit Stream是Flink官網上的經典demo,功能是實時處理來自維基百科的消息,消息的內容是當前每個用戶對維基內容的操作,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.2/quickstart/run_example_quickstart.html
在demo中,WikipediaEditsSource類作為數據源負責向Flink提供實時消息,今天咱們一起來分析其源碼,了解Flink是怎麼獲取到來自Wiki的實時數據的,這對我們今後做自定義數據源也有很好的參考作用;
官方解釋
以下是官網對消息來源的說明,維基百科提供了一個IRC協議的通道,從這個通道可以獲取對維基百科所做的編輯行為的日誌:
Wikipedia provides an IRC channel where all edits to the wiki are logged.
1
IRC是應用層協議,更多細節請看:https://en.wikipedia.org/wiki/Internet_Relay_Chat
繼承關係
先看WikipediaEditsSource類的繼承關係,做個初步了解,如下圖:
打開今日頭條,查看更多圖片如上圖所示,RichFunction介面負責資源開啟關閉以及環境上下文,而SourceFunction介面則是和數據生產行為的開始和停止有關,這些介面最終都在WikipediaEditSource實現;
構造方法
通過構造方法來了解有哪些參數被確定了:
//遠程連接的域名
public static final String DEFAULT_HOST = "irc.wikimedia.org";
//遠程連接的埠
public static final int DEFAULT_PORT = 6667;
//IRC協議的channel
public static final String DEFAULT_CHANNEL = "#en.wikipedia";
private final String host;
private final int port;
private final String channel;
public WikipediaEditsSource() {
this(DEFAULT_HOST, DEFAULT_PORT, DEFAULT_CHANNEL);
}
public WikipediaEditsSource(String host, int port, String channel) {
this.host = host;
this.port = port;
this.channel = Objects.requireNonNull(channel);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
通過上述代碼可以見到,數據的來源是irc.wikimedia.org這個網址;
主業務代碼
主要的業務邏輯是WikipediaEditsSource的run方法,該方法在任務啟動的時候會被StreamSource.run方法調用:
@Override
public void run(SourceContext<WikipediaEditEvent> ctx) throws Exception {
try (WikipediaEditEventIrcStream ircStream = new WikipediaEditEventIrcStream(host, port)) {
// 創建一個IRC協議的連接
ircStream.connect();
//進入指定的channel
ircStream.join(channel);
try {
while (isRunning) {
//從阻塞隊列中獲取數據
WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
//如果取到了數據,就調用ctx.collect方法,將數據生產到Flink環境,給其他operator使用
if (edit != null) {
ctx.collect(edit);
}
}
} finally {
//結束時要向伺服器發送數據表示離開
ircStream.leave(channel);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
上面的代碼,我們挑幾處重要的展開看一看;
和維基百科消息伺服器建立連接後做的事情
為了弄明白Flink是如何與維基百科的數據源建立連接的,先把ircStream.connect()這段代碼展開,對應的是IRCConnection類的connect方法:
public void connect() throws IOException {
if (level != 0) // otherwise disconnected or connect
throw new SocketException("Socket closed or already open ("+ level +")");
IOException exception = null;
Socket s = null;
for (int i = 0; i < ports.length && s == null; i++) {
try {
//建立的是普通Socket連接
s = new Socket(host, ports[i]);
exception = null;
} catch (IOException exc) {
if (s != null)
s.close();
s = null;
exception = exc;
}
}
if (exception != null)
throw exception; // connection wasn"t successful at any port
prepare(s);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
上述代碼表明,Flink與維基百科的數據源伺服器之間建立的是普通的Socket連接,至於IRC協議,都是在這個Socket連接的通道里的一些讀寫操作;
上面的prepare方法比較關鍵,展開看看:
protected void prepare(Socket s) throws IOException {
if (s == null)
throw new SocketException("Socket s is null, not connected");
socket = s;
level = 1;
s.setSoTimeout(timeout);
in = new BufferedReader(new InputStreamReader(s.getInputStream(),
encoding));
out = new PrintWriter(new OutputStreamWriter(s.getOutputStream(),
encoding));
//IRCConnection是Thread的子類,執行start方法就表明會啟動一個線程來執行IRCConnection的run方法
start();
//遵守IRC協議約定,發送一些註冊相關的內容
register();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
可以看出,prepare方法做了兩個重要的事情:啟動一個子線程、發送IRC協議的註冊信息,接下來看啟動的子線程做了什麼;
打開IRCConnection的run方法:
public void run() {
try {
String line;
while (!isInterrupted()) {
line = in.readLine();
if (line != null)
get(line);
else
close();
}
} catch (IOException exc) {
close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
run方法中的內容很簡單,就是讓這個子線程負責讀取遠端發送的字元串,每讀到一行就調用get方法去處理;
get方法的內容很多,做的事情是根據IRC協議解析這個字元串再做不同的處理,這裡我們只要關注下面這段,就是收到一條業務消息後如何處理:
//每當有人編輯了維基百科,這裡就會收到一條command為PRIVMSG的記錄
if (command.equalsIgnoreCase("PRIVMSG")) { // MESSAGE
IRCUser user = p.getUser();
String middle = p.getMiddle();
String trailing = p.getTrailing();
for (int i = listeners.length - 1; i >= 0; i--)
//調用listener的onPrivmsg方法
listeners[i].onPrivmsg(middle, user, trailing);
}
1
2
3
4
5
6
7
8
9
如上所示,每收到一條遠端發來的消息,都會調用listener的onPrivmsg方法,這裡的註冊的linstener是WikipediaIrcChannelListener對象;
打開WikipediaIrcChannelListener的onPrivmsg方法,看看收到消息後做了什麼:
@Override
public void onPrivmsg(String target, IRCUser user, String msg) {
LOG.debug("[{}] {}: {}.", target, user.getNick(), msg);
//根據消息構造一個WikipediaEditEvent對象,就是Flink的業務流程中用到的數據對象
WikipediaEditEvent event = WikipediaEditEvent.fromRawEvent(
System.currentTimeMillis(),
target,
msg);
if (event != null) {
//eidts是個阻塞隊列,WikipediaEditEvent被放入隊列
if (!edits.offer(event)) {
LOG.debug("Dropping message, because of full queue.");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
上面的代碼已經分析把主要邏輯展現出來了,從Socket讀到的數據被解析成Flink實時計算時用到的WikipediaEditEvent對象後,被放入阻塞隊列中,這也就是負責讀取的子線程的主要工作了;
如何消費隊列中的數據
前面的分析中我們得知:收到的數據被放入了阻塞隊列中,現在回到WikipediaEditsSource的run方法再看看,這裡面就有從阻塞隊列取出數據的操作:
while (isRunning) {
//從阻塞隊列中獲取數據
WikipediaEditEvent edit = ircStream.getEdits().poll(100, TimeUnit.MILLISECONDS);
//如果取到了數據,就調用ctx.collect方法,將數據生產到Flink環境,給其他operator使用
if (edit != null) {
ctx.collect(edit);
}
}
1
2
3
4
5
6
7
8
如上所示,一個while循環不停的從阻塞隊列中獲取數據,取到了就調用SourceContext的collect,把一條數據生產到在Flink環境中,給後面的流程使用;
小結
至此,WikipediaEditsSource源碼的分析就完成了,在此小結一下:
和irc.wikimedia.org這個網站建立Socket連接;
連接建立後,讀寫相關的內容都是基於IRC協議的,這是個應用層的協議,有自己的格式、關鍵字、命令字等約定,本次分析中我們沒有花太多時間在這個協議上,有興趣的讀者在這裡了解更多:https://en.wikipedia.org/wiki/Internet_Relay_Chat
啟動一個子線程讀取Socket信息,收到數據後,構造成WikipediaEditEvent對象,放入阻塞隊列中;
原先的那個線程在一個while循環中從阻塞隊列中取數據,如果取到了數據就調用ctx.collect方法,這樣數據就生產到了Flink環境,其他operator就可以使用了;
以上就是拆解WikipediaEditsSource的過程,現在我們對Flink數據源有了更進一步的了解,後續在開發自定義數據源的時候也有了參考實現;
---------------------
作者:博陵精騎
原文:https://blog.csdn.net/boling_cavalry/article/details/85221446
※CSS實現點擊事件及實踐
※後台返回json數據和前台解析json數據
TAG:程序員小新人學習 |