當前位置:
首頁 > 知識 > Storm源碼分析之Trident源碼分析

Storm源碼分析之Trident源碼分析

@(STORM)[storm]

  • Storm源碼分析之四 Trident源碼分析
  • 一概述
  • 0小結
  • 1簡介
  • 2關鍵類
  • 1Spout的創建
  • 2spout的消息流
  • 3spout調用的整體流程
  • 4spout如何被 載入到拓撲中
  • 二Spout
  • 一Spout的創建
  • 1ItridentSpout
  • 2BatchCoordinator
  • 3Emmitter
  • 4一個示例
  • 二spout實際的消息流
  • 1MasterBatchCoordinator
  • 2TridentSpoutCoordinator
  • 3TridentSpoutExecutor
  • 三bolt
  • 一概述
  • 1組件的基本關係
  • 2用戶視角與源碼視角
  • 二基礎類
  • 1Stream
  • 1成員變數
  • 2projectionValidation
  • 3project
  • 2Node SpoutNode PartitionNode ProcessorNode
  • 詳細分析見書
  • 3Group
  • 1成員變數
  • 2構造方法
  • 3outgoingNodes
  • 4incommingNodes
  • 4GraphGrouper
  • 1成員變數
  • 2構造方法
  • 3reindex
  • 4nodeGroup
  • 5outgoingGroups
  • 6incomingGroups
  • 7merge
  • 8mergeFully
  • 四在TridentTopologyBuilder中設置Spoutbolt
  • 一參考內容
  • 一概述
  • 二基礎類
  • 1GlobalStreamId
  • 三TridentTopology
  • 1生成bolt的名稱genBoltIds
  • 2添加節點addNode
  • 3添加節點addSourceNode
  • 四TridentTopologyBuilder

一、概述

0、小結

TridentTopologyBuilder與TridentTopology調用MBC/TSC/TSE設置spout與2個bolt,而這三個類通過調用用戶代碼Spout中定義的Coordinator與Emitter完成真正的邏輯。

最後構建好的拓撲會提交至nimbus,nimbus開始調度這個拓撲,開始運行。

1、簡介

trident是storm的更高層次抽象,相對storm,它主要提供了3個方面的好處:

(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調用,不需要自己實現。

(2)以批次代替單個元組,每次處理一個批次的數據。

(3)提供了事務支持,可以保證數據均處理且只處理了一次。

本文介紹了在一個Trident拓撲中,spout是如何被產生並被調用的。首先介紹了用戶如何創建一個Spout以及其基本原理,然後介紹了Spout的實際數據流,最後解釋了在創建topo時如何設置一個Spout。

2、關鍵類

MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady

|

|

v

TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]

|

|

v

TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)

Spout中涉及2組類,第一組類定義了用戶如何創建一個Spout,這些用戶的代碼會被第二組的類調用。第二組類定義了實際的數據流是如何發起並傳送的。

(1)Spout的創建

涉及三個類:ItridentSpout, BatchCoordinator, Emitter,其中後面2個是第一個的內部類。

用戶創建一個Spout需要實現上述三個介面。比如storm-kafka中的Spout就是實現了這3個介面或者其子介面。

(2)spout的消息流

也是涉及三個類:MasterBatchCoordinator, TridentSpoutCoordinator, TridentSpoutExecutor。它們除了自身固定的邏輯以外,還會調用用戶的代碼,就是上面介紹的Spout代碼。

它們的定義分別為:

MasterBatchCoordinator extends BaseRichSpout

TridentSpoutCoordinator implements IBasicBolt

TridentSpoutExecutor implements ITridentBatchBolt

  • 1
  • 2
  • 3
  • 4

可以看出來,MasterBatchCoordinator才是真正的spout,另外2個都是bolt。

MasterBatchCoordinator會調用用戶定義的BatchCoordinator的isReady()方法,返回true的話,則會發送一個id為batch的消息流,從而開始一個數據流轉。TridentSpoutCoordinator接到MBC的

batch的消息流,從而開始一個數據流轉。TridentSpoutCoordinator接到MBC的batch流後,會調用BatchCoordinator的initialTransaction()初始化一個消息,並繼續向外發送 batch流。TridentSpoutExecutor接到

batch流。TridentSpoutExecutor接到batch流後,會調用用戶代碼中的TridentSpoutExecutor#emitBatch()方法,開始發送實際的業務數據。

3、spout調用的整體流程

1、MasterBatchCoordinator是Trident中真正的Spout,它可以包含多個TridentSpoutCoordinator的節點。MBC在nextTuple()中向外發送id為batch的流,作為整個數據流的起點。MBC會先判斷正在處理的事務數是否少於

m

axTransactionActive,是的話就繼續向外發送

batch的流,作為整個數據流的起點。MBC會先判斷正在處理的事務數是否少於maxTransactionActive,是的話就繼續向外發送batch流。

if(_activeTx.size() < _maxTransactionActive) {

Long curr = _currTransaction;

for(int i=0; i<_maxTransactionActive; i++) {

if(!_activeTx.containsKey(curr) && isReady(curr)) {

// by using a monotonically increasing attempt id, downstream tasks

// can be memory efficient by clearing out state for old attempts

// as soon as they see a higher attempt id for a transaction

Integer attemptId = _attemptIds.get(curr);

if(attemptId==null) {

attemptId = 0;

} else {

attemptId++;

}

_attemptIds.put(curr, attemptId);

for(TransactionalState state: _states) {

state.setData(CURRENT_ATTEMPTS, _attemptIds);

}

TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);

_activeTx.put(curr, new TransactionStatus(attempt));

_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);

_throttler.markEvent();

}

curr = nextTransactionId(curr);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

2、TSC收到batch流後,在execute()方法中,繼續向外發送

batch流後,在execute()方法中,繼續向外發送batch流。

long txid = attempt.getTransactionId();

Object prevMeta = _state.getPreviousState(txid);

Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));

_state.overrideState(txid, meta);

collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3、TSE收到$batch流後,調用用戶Emitter類中的emitBatch()方法,開始向外發送數據。

_collector.setBatch(info.batchId);

_emitter.emitBatch(attempt, input.getValue(1), _collector);

_activeBatches.put(attempt.getTransactionId(), attempt);

  • 1
  • 2
  • 3
  • 4

4、當整個消息被成功處理完後,會調用MBC的ack()方法,ack方法會將事務的狀態從PROCESSING改為PROCESSED:

if(status.status==AttemptStatus.PROCESSING) {

status.status = AttemptStatus.PROCESSED;

}

  • 1
  • 2
  • 3
  • 4

當然,如果fail掉了,則會調用fail()方法。

當sync()方法接收到事務狀態為PROCESSED時,將其改為COMMITTING的狀態,並向外發送id為$commit的流。

if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {

maybeCommit.status = AttemptStatus.COMMITTING;

_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);

}

  • 1
  • 2
  • 3
  • 4
  • 5

5、TSE處理$commit流

if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {

if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {

((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);

_activeBatches.remove(attempt.getTransactionId());

} else {

throw new FailedException("Received commit for different transaction attempt");

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

收到$commit流的節點會開始提交操作,但trident會按事務號順序提交事務的,所以由提交bolt來決定是否現在提交,還是先緩存下來之後再提交。

6、當$commit流處理完後,MBC的ack()方法會被再次調用,同時向外發送$success流

else if(status.status==AttemptStatus.COMMITTING) {

//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,並發送$success流。

_activeTx.remove(tx.getTransactionId());

_attemptIds.remove(tx.getTransactionId());

_collector.emit(SUCCESS_STREAM_ID, new Values(tx));

_currTransaction = nextTransactionId(tx.getTransactionId());

for(TransactionalState state: _states) {

state.setData(CURRENT_TX, _currTransaction);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

7、TSC處理$commit流

if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {

_state.cleanupBefore(attempt.getTransactionId());

_coord.success(attempt.getTransactionId());

}

  • 1
  • 2
  • 3
  • 4
  • 5

8、TSE處理$success流

else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {

// valid to delete before what"s been committed since

// those batches will never be accessed again

_activeBatches.headMap(attempt.getTransactionId()).clear();

_emitter.success(attempt);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

至此整個流程全部完成。

總結說就是消息是從MasterBatchCoordinator開始的,它是一個真正的spout,而TridentSpoutCoordinator與TridentSpoutExecutor都是bolt,MasterBatchCoordinator發起協調消息,最後的結果是TridentSpoutExecutor發送業務消息。而發送協調消息與業務消息的都是調用用戶Spout中BatchCoordinator與Emitter中定義的代碼。

可以參考《storm源碼分析》P458的流程圖

Storm源碼分析之Trident源碼分析

4、spout如何被 載入到拓撲中

(1)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息

(2)在TridentTopology中調用newStream方法,將spout節點加入拓撲。

包括MBC, TSC, TSE等均是在上面2個類中被調用,從而形成一個完整的拓撲。


二、Spout

(一)Spout的創建

1、ItridentSpout

在Trident中用戶定義的Spout需要實現ItridentSpout介面。我們先看看ItridentSpout的定義

package storm.trident.spout;

import backtype.storm.task.TopologyContext;

import storm.trident.topology.TransactionAttempt;

import backtype.storm.tuple.Fields;

import java.io.Serializable;

import java.util.Map;

import storm.trident.operation.TridentCollector;

public interface ITridentSpout<T> extends Serializable {

public interface BatchCoordinator<X> {

X initializeTransaction(long txid, X prevMetadata, X currMetadata);

void success(long txid);

boolean isReady(long txid)

void close();

}

public interface Emitter<X> {

void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);

void success(TransactionAttempt tx);

void close();

}

BatchCoordinator<T> getCoordinator(String txStateId, Map conf, TopologyContext context);

Emitter<T> getEmitter(String txStateId, Map conf, TopologyContext context);

Map getComponentConfiguration();

Fields getOutputFields();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

它有2個內部介面,分別是BatchCoordinator和Emitter,分別是用於協調的Spout介面和發送消息的Bolt介面。實現一個Spout的主要工作就在於實現這2個介面,創建實際工作的Coordinator和Emitter。Spout中提供了2個get方法用於分別用於指定使用哪個Coordinator和Emitter類,這些類會由用戶定義。稍後我們再分析Coordinator和Emitter的內容。

除此之外,還提供了getComponentConfiguration用於獲取配置信息,getOutputFields獲取輸出field。

我們再看看2個內部介面的代碼。

2、BatchCoordinator

public interface BatchCoordinator<X> {

X initializeTransaction(long txid, X prevMetadata, X currMetadata);

void success(long txid);

boolean isReady(long txid);

void close();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

(1)initializeTransaction方法返回一個用戶定義的事務元數據。X是用戶自定義的與事務相關的數據類型,返回的數據會存儲到zk中。

其中txid為事務序列號,prevMetadata是前一個事務所對應的元數據。若當前事務為第一個事務,則其為空。currMetadata是當前事務的元數據,如果是當前事務的第一次嘗試,則為空,否則為事務上一次嘗試所產生的元數據。

(2)isReady方法用於判斷事務所對應的數據是否已經準備好,當為true時,表示可以開始一個新事務。其參數是當前的事務號。

BatchCoordinator中實現的方法會被部署到多個節點中運行,其中isReady是在真正的Spout(MasterBatchCoordinator)中執行的,其餘方法在TridentSpoutCoordinator中執行。

3、Emmitter

public interface Emitter<X> {

void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);

void success(TransactionAttempt tx);

void close();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

消息發送節點會接收協調spout的$batch和$success流。

(1)當收到$batch消息時,節點便調用emitBatch方法來發送消息。

(2)當收到$success消息時,會調用success方法對事務進行後處理

4、一個示例

參考 DiagnosisEventSpout

(1)Spout的代碼

package com.packtpub.storm.trident.spout;

import backtype.storm.task.TopologyContext;

import backtype.storm.tuple.Fields;

import storm.trident.spout.ITridentSpout;

import java.util.Map;

@SuppressWarnings("rawtypes")

public class DiagnosisEventSpout implements ITridentSpout<Long> {

private static final long serialVersionUID = 1L;

BatchCoordinator<Long> coordinator = new DefaultCoordinator();

Emitter<Long> emitter = new DiagnosisEventEmitter();

@Override

public BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {

return coordinator;

}

@Override

public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {

return emitter;

}

@Override

public Map getComponentConfiguration() {

return null;

}

@Override

public Fields getOutputFields() {

return new Fields("event");

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

(2)BatchCoordinator的代碼

package com.packtpub.storm.trident.spout;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import storm.trident.spout.ITridentSpout.BatchCoordinator;

import java.io.Serializable;

public class DefaultCoordinator implements BatchCoordinator<Long>, Serializable {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(DefaultCoordinator.class);

@Override

public boolean isReady(long txid) {

return true;

}

@Override

public void close() {

}

@Override

public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {

LOG.info("Initializing Transaction [" + txid + "]");

return null;

}

@Override

public void success(long txid) {

LOG.info("Successful Transaction [" + txid + "]");

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

(3)Emitter的代碼

package com.packtpub.storm.trident.spout;

import com.packtpub.storm.trident.model.DiagnosisEvent;

import storm.trident.operation.TridentCollector;

import storm.trident.spout.ITridentSpout.Emitter;

import storm.trident.topology.TransactionAttempt;

import java.io.Serializable;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.atomic.AtomicInteger;

public class DiagnosisEventEmitter implements Emitter<Long>, Serializable {

private static final long serialVersionUID = 1L;

AtomicInteger successfulTransactions = new AtomicInteger(0);

@Override

public void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {

for (int i = 0; i < 10000; i++) {

List<Object> events = new ArrayList<Object>();

double lat = new Double(-30 + (int) (Math.random() * 75));

double lng = new Double(-120 + (int) (Math.random() * 70));

long time = System.currentTimeMillis();

String diag = new Integer(320 + (int) (Math.random() * 7)).toString();

DiagnosisEvent event = new DiagnosisEvent(lat, lng, time, diag);

events.add(event);

collector.emit(events);

}

}

@Override

public void success(TransactionAttempt tx) {

successfulTransactions.incrementAndGet();

}

@Override

public void close() {

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

(4)最後,在創建topo時指定spout

TridentTopology topology = new TridentTopology();

DiagnosisEventSpout spout = new DiagnosisEventSpout();

Stream inputStream = topology.newStream("event", spout);

  • 1
  • 2
  • 3
  • 4

(二)spout實際的消息流

以上的內容說明了如何在用戶代碼中創建一個Spout,以及其基本原理。但創建Spout後,它是怎麼被載入到拓撲真正的Spout中呢?我們繼續看trident的實現。

1、MasterBatchCoordinator

總體而言,MasterBatchCoordinator作為一個數據流的真正起點:

* 首先調用open方法完成初始化,包括讀取之前的拓撲處理到的事務序列號,最多同時處理的tuple數量,每個事務的嘗試次數等。

* 然後nextTuple會改變事務的狀態,或者是創建事務並發送$batch流。

* 最後,ack方法會根據流的狀態向外發送$commit流,或者是重新調用sync方法,開始創建新的事務。

總而言之,MasterBatchCoordinator作為拓撲數據流的真正起點,通過循環發送協調信息,不斷的處理數據流。MasterBatchCoordinator的真正作用在於協調消息的起點,裡面所有的map,如_activeTx,_attemptIds等都只是為了保存當前正在處理的情況而已。

(1)MasterBatchCoordinator是一個真正的spout

public class MasterBatchCoordinator extends BaseRichSpout

  • 1
  • 2

一個Trident拓撲的真正邏輯就是從MasterBatchCoordinator開始的,先調用open方法完成一些初始化,然後是在nextTuple中發送$batch和$commit流。

(2)看一下open方法

@Override

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

_throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);

for(String spoutId: _managedSpoutIds) {

//每個MasterBatchSpout可以處理多個ITridentSpout,這裡將多個spout的元數據放到_states這個Map中。稍後再看看放進來的是什麼內容。

_states.add(TransactionalState.newCoordinatorState(conf, spoutId));

}

//從zk中獲取當前的transation事務序號,當拓撲新啟動時,需要從zk恢復之前的狀態。也就是說zk存儲的是下一個需要提交的事務序號,而不是已經提交的事務序號。

_currTransaction = getStoredCurrTransaction();

_collector = collector;

//任何時刻中,一個spout task最多可以同時處理的tuple數量,即已經emite,但未acked的tuple數量。

Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);

if(active==null) {

_maxTransactionActive = 1;

} else {

_maxTransactionActive = active.intValue();

}

//每一個事務的當前嘗試編號,即_currTransaction這個事務序號中,各個事務的嘗試次數。

_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);

for(int i=0; i<_spouts.size(); i++) {

//將各個Spout的Coordinator保存在_coordinators這個List中。

String txId = _managedSpoutIds.get(i);

_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

(3)再看一下nextTuple()方法,它只調用了sync()方法,主要完成了以下功能:

* 如果事務狀態是PROCESSED,則將其狀態改為COMMITTING,然後發送commit流。接收到

commit流。接收到commit流的節點會調用finishBatch方法,進行事務的提交和後處理

* 如果_activeTx.size()小於_maxTransactionActive,則新建事務,放到_activeTx中,同時向外發送$batch流,等待Coordinator的處理。( 當ack方法被 調用時,這個事務會被從_activeTx中移除)

注意:當前處於acitve狀態的應該是序列在[_currTransaction,_currTransaction+_maxTransactionActive-1]之間的事務。

private void sync() {

// note that sometimes the tuples active may be less than max_spout_pending, e.g.

// max_spout_pending = 3

// tx 1, 2, 3 active, tx 2 is acked. there won"t be a commit for tx 2 (because tx 1 isn"t committed yet),

// and there won"t be a batch for tx 4 because there"s max_spout_pending tx active

//判斷當前事務_currTransaction是否為PROCESSED狀態,如果是的話,將其狀態改為COMMITTING,然後發送$commit流。接收到$commit流的節點會調用finishBatch方法,進行事務的提交和後處理。

TransactionStatus maybeCommit = _activeTx.get(_currTransaction);

if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {

maybeCommit.status = AttemptStatus.COMMITTING;

_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);

}

//用於產生一個新事務。最多存在_maxTransactionActive個事務同時運行,當前active的事務序號區間處於[_currTransaction,_currTransaction+_maxTransactionActive-1]之間。注意只有在當前

//事務結束之後,系統才會初始化新的事務,所以系統中實際活躍的事務可能少於_maxTransactionActive。

if(_active) {

if(_activeTx.size() < _maxTransactionActive) {

Long curr = _currTransaction;

//創建_maxTransactionActive個事務。

for(int i=0; i<_maxTransactionActive; i++) {

//如果事務序號不存在_activeTx中,則創建新事務,並發送$batch流。當ack被調用時,這個序號會被remove掉,詳見ack方法。

if(!_activeTx.containsKey(curr) && isReady(curr)) {

// by using a monotonically increasing attempt id, downstream tasks

// can be memory efficient by clearing out state for old attempts

// as soon as they see a higher attempt id for a transaction

Integer attemptId = _attemptIds.get(curr);

if(attemptId==null) {

attemptId = 0;

} else {

attemptId++;

}

//_activeTx記錄的是事務序號和事務狀態的map,而_activeTx則記錄事務序號與嘗試次數的map。

_attemptIds.put(curr, attemptId);

for(TransactionalState state: _states) {

state.setData(CURRENT_ATTEMPTS, _attemptIds);

}

//TransactionAttempt包含事務序號和嘗試編號2個變數,對應於一個具體的事務。

TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);

_activeTx.put(curr, new TransactionStatus(attempt));

_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);

_throttler.markEvent();

}

//如果事務序號已經存在_activeTx中,則curr遞增,然後再循環檢查下一個。

curr = nextTransactionId(curr);

}

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

完整代碼見最後。

(4)繼續往下,看看ack方法。

@Override

public void ack(Object msgId) {

//獲取某個事務的狀態

TransactionAttempt tx = (TransactionAttempt) msgId;

TransactionStatus status = _activeTx.get(tx.getTransactionId());

if(status!=null && tx.equals(status.attempt)) {

//如果當前狀態是PROCESSING,則改為PROCESSED

if(status.status==AttemptStatus.PROCESSING) {

status.status = AttemptStatus.PROCESSED;

} else if(status.status==AttemptStatus.COMMITTING) {

//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,並發送$success流。

_activeTx.remove(tx.getTransactionId());

_attemptIds.remove(tx.getTransactionId());

_collector.emit(SUCCESS_STREAM_ID, new Values(tx));

_currTransaction = nextTransactionId(tx.getTransactionId());

for(TransactionalState state: _states) {

state.setData(CURRENT_TX, _currTransaction);

}

}

//由於有些事務狀態已經改變,需要重新調用sync()繼續後續處理,或者發送新tuple。

sync();

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

(5)還有fail方法和declareOutputFileds方法。

@Override

public void fail(Object msgId) {

TransactionAttempt tx = (TransactionAttempt) msgId;

TransactionStatus stored = _activeTx.remove(tx.getTransactionId());

if(stored!=null && tx.equals(stored.attempt)) {

_activeTx.tailMap(tx.getTransactionId()).clear();

sync();

}

}

@Override

public void declareOutputFields(OutputFieldsDeclarer declarer) {

// in partitioned example, in case an emitter task receives a later transaction than it"s emitted so far,

// when it sees the earlier txid it should know to emit nothing

declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));

declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));

declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2、TridentSpoutCoordinator

TridentSpoutCoordinator接收來自MasterBatchCoordinator的$success流與$batch流,並通過調用用戶代碼,實現真正的邏輯。此外還向TridentSpoutExecuter發送$batch流,以觸發後者開始真正發送業務數據流。

(1)TridentSpoutCoordinator是一個bolt

public class TridentSpoutCoordinator implements IBasicBolt

  • 1
  • 2

(2)在創建TridentSpoutCoordinator時,需要傳遞一個ITridentSpout對象,

public TridentSpoutCoordinator(String id, ITridentSpout spout) {

_spout = spout;

_id = id;

}

  • 1
  • 2
  • 3
  • 4
  • 5

然後使用這個對象來獲取到用戶定義的Coordinator:

_coord = _spout.getCoordinator(_id, conf, context);

  • 1
  • 2

(3)_state和_underlyingState保存了zk中的元數據信息

_underlyingState = TransactionalState.newCoordinatorState(conf, _id);

_state = new RotatingTransactionalState(_underlyingState, META_DIR);

  • 1
  • 2
  • 3

(4)在execute方法中,TridentSpoutCoordinator接收$success流與$batch流,先看看$success流:

if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {

_state.cleanupBefore(attempt.getTransactionId());

_coord.success(attempt.getTransactionId());

}

  • 1
  • 2
  • 3
  • 4
  • 5

即接收到$success流時,調用用戶定義的Coordinator中的success方法。同時還清理了zk中的數據。

(5)再看看$batch流

else {

long txid = attempt.getTransactionId();

Object prevMeta = _state.getPreviousState(txid);

Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));

_state.overrideState(txid, meta);

collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

當收到$batch流流時,初始化一個事務並將其發送出去。由於在trident中消息有可能是重放的,因此需要prevMeta。注意,trident是在bolt中初始化一個事務的。

3、TridentSpoutExecutor

TridentSpoutExecutor接收來自TridentSpoutCoordinator的消息流,包括$commit,$success與$batch流,前面2個分別調用emmitter的commit與success方法,$batch則調用emmitter的emitBatch方法,開始向外發送業務數據。

對於分區類型的spout,有可能是OpaquePartitionedTridentSpoutExecutor等分區類型的executor。

(1) TridentSpoutExecutor與是一個bolt

publicclassTridentSpoutExecutorimplementsITridentBatchBolt

  • 1
  • 2

(2)核心的execute方法

@Override

public void execute(BatchInfo info, Tuple input) {

// there won"t be a BatchInfo for the success stream

TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);

if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {

if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {

((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);

_activeBatches.remove(attempt.getTransactionId());

} else {

throw new FailedException("Received commit for different transaction attempt");

}

} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {

// valid to delete before what"s been committed since

// those batches will never be accessed again

_activeBatches.headMap(attempt.getTransactionId()).clear();

_emitter.success(attempt);

} else {

_collector.setBatch(info.batchId);

//發送業務消息

_emitter.emitBatch(attempt, input.getValue(1), _collector);

_activeBatches.put(attempt.getTransactionId(), attempt);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

三、bolt

(一)概述

1、組件的基本關係

(1)trident拓撲最終會轉化為一個spout和多個bolt,每個bolt對應一個SubTopologyBolt,它通過TridentBoltExecutor適配成一個bolt。而每個SubTopologyBOlt則由很多節點組成,具體點說這個節點包括(Stream|Node)2部分,注意,Node不是Stream自身的成員變數,而是一個具體的處理節點。Stream定義了哪些數據流,Node定義和如何進行操作,Node包含了一個ProjectedProccessor等處理器,用於定義如何進行數據處理。

(2)一個SubTopologyBOlt包含多個Group,但大多數情況下是一個Group。看TridentTopology#genBoltIds()的代碼。在一個SubTopologyBolt中,含有多個節點組是可能的。例如在含有DRPC的Topology中,查詢操作也存儲操作可以被分配到同一個SubTopologyBolt中。於是該bolt可能收到來自2個節點組的消息。

(3)一個Group有多個Node。符合一定條件的Node會被merge()成一個Group,每個Node表示一個操作。

(4)每個Node與一個Stream一一對應。注意Stream不是指端到端的完整流,而是每一個步驟的處理對象,所有的Stream組合起來才形成完整的流。看Stream的成員變數。

(5)每個Node可能有多個父stream,但多個的情況只在merge()調用multiReduce()時使用。每個Stream與node之間創建一條邊。見TridentTopology#addSourceNode()方法。

2、用戶視角與源碼視角

在用戶角度來看,他通過newStream(),each(),filter()待方案對Stream進行操作。而在代碼角度,這些操作會被轉化為各種Node節點,它些節點組合成一個SubTopologyBolt,然後經過TridentBoltExecutor適配後成為一個bolt。

從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用於Stream上的各種Operation。在實現層面來看,無論是stream,還是後續的operation都會轉變成為各個Node,這些Node之間的關係通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。

說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關係的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。

TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行於一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的並發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。

關於jgrapht的更多信息,請參考其官方網站 http://jgrapht.org

========================================================

在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。

一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞

(二)基礎類

1、Stream

Stream主要定義了數據流的各種操作,如each(),pproject()等。

(1)成員變數

Node _node;

TridentTopology _topology;

String _name;

  • 1
  • 2
  • 3
  • 4

三個成員變數:

* Node對象,這表明Stream與Node是一一對應的,每個節點對應一個Stream對象。

* name:這個Stream的名稱,也等於是這這個Node的名稱。

* TridentTopology: 這個Stram所屬的拓撲,使用這個變數,可以調用addSourceNode()等方法。

其中_node變數被使用很少。

(2)projectionValidation()

這個方法用於檢查是否對一個不存在的field進行了操作。

private void projectionValidation(Fields projFields) {

if (projFields == null) {

return;

}

Fields allFields = this.getOutputFields();

for (String field : projFields) {

if (!allFields.contains(field)) {

throw new IllegalArgumentException("Trying to select non-existent field: "" + field + "" from stream containing fields fields: <" + allFields + ">");

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

stream中定義了定義了各種各樣的trident操作,下面分別介紹

(3)project()

public Stream project(Fields keepFields) {

projectionValidation(keepFields);

return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));

}

  • 1
  • 2
  • 3
  • 4
  • 5

首先檢查一下需要project的field是否存在。然後就在TridentTopology中新增一個節點。

第一個參數就是Stream自身,第二個參數是一個Node的子類–ProcessorNode。創建ProcessorNode時,最後一個參數ProjectedProcessor用於指定如何對流進行操作。

addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這裡的節點不是source這個Stream自身的成員變數_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。

2、Node SpoutNode PartitionNode ProcessorNode

(1)Node表示拓撲中的一個節點,後面3個均是其子類。事實上拓撲中的節點均用於產生數據或者對數據進行處理。一個拓撲有多個spout/bolt,每個spout/bolt有一個或者多個Group,我個Group有多個Node。

詳細分析見書。

3、Group

節點組是構建SubTopologyBolt的基礎,也是Topology中執行優化的基本操作單元,Trident會通過不斷的合併節點組來達到最優處理的目的。Group中包含了一組連通的節點。

(1)成員變數

public final Set<Node> nodes = new HashSet<>();

private final DirectedGraph<Node, IndexedEdge> graph;

private final String id = UUID.randomUUID().toString();

  • 1
  • 2
  • 3
  • 4

nodes表示節點組中含有的節點。

graph表示拓撲的有向圖。(是整個拓撲的構成的圖)

id用於唯一標識一個group。

(2)構造方法

public Group(DirectedGraph graph, List<Node> nodes) {

this.graph = graph;

this.nodes.addAll(nodes);

}

  • 1
  • 2
  • 3
  • 4
  • 5

初始狀態時,每個Group只有一個Node.

public Group(DirectedGraph graph, Node n) {

this(graph, Arrays.asList(n));

}

  • 1
  • 2
  • 3
  • 4

將2個Group合成一個新的Group。

public Group(Group g1, Group g2) {

this.graph = g1.graph;

nodes.addAll(g1.nodes);

nodes.addAll(g2.nodes);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

(3)outgoingNodes()

通過遍歷組中節點的方式來獲取該節點組所有節點的子節點,這些子節點可能屬於該節點組,也可能屬於其它節點組。

public Set<Node> outgoingNodes() {

Set<Node> ret = new HashSet<>();

for(Node n: nodes) {

ret.addAll(TridentUtils.getChildren(graph, n));

}

return ret;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

(4)incommingNodes()

用於獲取該節點組中所有節點的父節點,這些父節點可能屬於該節點組,也可能屬於其它節點組。

4、GraphGrouper

GraphGrouper提供了對節點組進行操作及合併的基本方法。

(1)成員變數

final DirectedGraph<Node, IndexedEdge> graph;

final Set<Group> currGroups;

final Map<Node, Group> groupIndex = new HashMap<>();

  • 1
  • 2
  • 3
  • 4

graph:與Group相同,即這個拓撲的整個圖。

currGroups:當前graph對應的節點組。節點組之間是沒有交集的。

groupIndex:是一個反向索引,用於快速查詢每個節點所在的節點組。

(2)構造方法

public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {

this.graph = graph;

this.currGroups = new LinkedHashSet<>(initialGroups);

reindex();

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

就是為上面幾個變數進行初始化。

(3)reindex()

public void reindex() {

groupIndex.clear();

for(Group g: currGroups) {

for(Node n: g.nodes) {

groupIndex.put(n, g);

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

根據currGroups的內容重構groupIndex。

(4)nodeGroup()

public Group nodeGroup(Node n) {

return groupIndex.get(n);

}

  • 1
  • 2
  • 3
  • 4

查詢某個node屬於哪個group。

(5)outgoingGroups()

計算節點組與哪些節點組之間存在有向邊,即2個節點組是相連的。其基本演算法是遍歷每一個節點的子節點,若該子節點所在的節點組與自身不同,則獲得子節點所在的節點組。

public Collection<Group> outgoingGroups(Group g) {

Set<Group> ret = new HashSet<>();

for(Node n: g.outgoingNodes()) {

Group other = nodeGroup(n);

if(other==null || !other.equals(g)) {

ret.add(other);

}

}

return ret;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

(6)incomingGroups()

用於獲取該節點組的父節點組,演算法與上面類似。

public Collection<Group> incomingGroups(Group g) {

Set<Group> ret = new HashSet<>();

for(Node n: g.incomingNodes()) {

Group other = nodeGroup(n);

if(other==null || !other.equals(g)) {

ret.add(other);

}

}

return ret;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

(7)merge()

合併2個節點組。

private void merge(Group g1, Group g2) {

Group newGroup = new Group(g1, g2);

currGroups.remove(g1);

currGroups.remove(g2);

currGroups.add(newGroup);

for(Node n: newGroup.nodes) {

groupIndex.put(n, newGroup);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

(8)mergeFully

這個方法是GraphGrouper的核心演算法,它用來計算何時可以對2個節點組進行合併。基本思想是:如果一個節點組只有一個父節點組,那麼將這個節點組與父節點組合併;如果一個節點組只有一個子節點組,那麼將子節點組與自身節點組合併。反覆進行這個過程。

public void mergeFully() {

boolean somethingHappened = true;

while(somethingHappened) {

somethingHappened = false;

for(Group g: currGroups) {

Collection<Group> outgoingGroups = outgoingGroups(g);

if(outgoingGroups.size()==1) {

Group out = outgoingGroups.iterator().next();

if(out!=null) {

merge(g, out);

somethingHappened = true;

break;

}

}

Collection<Group> incomingGroups = incomingGroups(g);

if(incomingGroups.size()==1) {

Group in = incomingGroups.iterator().next();

if(in!=null) {

merge(g, in);

somethingHappened = true;

break;

}

}

}

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

四、在TridentTopologyBuilder中設置Spout、bolt

(一)參考內容

http://www.cnblogs.com/hseagle/p/3490635.html

TridentTopology是storm提供的高層使用介面,常見的一些SQL中的操作tridenttopology提供的api中都有類似的影射。

從TridentTopology到vanilla topology(普通的topology)由三個層次組

成:

1. 面向最終用戶的概念stream, operation

2. 利用planner將tridenttopology轉換成vanilla topology

3. 執行vanilla topology

從TridentTopology到基本的Topology有三層,下圖是一個全局視圖。

Storm源碼分析之Trident源碼分析

從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用於Stream上的各種Operation。在實現層面來看,無論是stream,還是後續的operation都會轉變成為各個Node,這些Node之間的關係通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。

說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關係的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。

TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行於一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的並發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。

關於jgrapht的更多信息,請參考其官方網站 http://jgrapht.org

========================================================

在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。

一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞。

=====================================

1、【待完善】通過上面的分析,一個Spout是準備好了,但如何將它載入到拓撲中,並開始真正的數據流:

(1)在TridentTopology中調用newStream方法,將spout節點加入拓撲。

(2)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息

2、拓撲創建的總體流程

(1)在用戶代碼中創建TridentTopology對象

TridentTopology topology = new TridentTopology();

  • 1
  • 2

(2)在用戶代碼中指定spout節點和bolt節點

比如:

topology.newStream("spout1", spout)

.parallelismHint(16)

.each(new Fields("sentence"), new Split(), new Fields("word"))

.groupBy(new Fields("word"))

.persistentAggregate(new MemoryMapState.Factory(), new Count(),

new Fields("count")).parallelismHint(16);

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

(3)在用戶代碼中創建拓撲

topology.build();

  • 1
  • 2

(4)topology.build()會調用TridentTopologyBuilder#buildTopology()

(5)用戶代碼中提交拓撲

StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));

  • 1
  • 2

(一)概述

(二)基礎類

1、GlobalStreamId

這是由trift生成的類,有2個核心成員變數

public GlobalStreamId(

String componentId,

String streamId)

  • 1
  • 2
  • 3
  • 4

分別記錄了某個component的ID與其對應的streamId,如

"$mastercoord-" + batchGroup MasterBatchCoordinator.BATCH_STREAM_ID

  • 1
  • 2

表示這個component會消費這個stream的消息。

(三)TridentTopology

主要流程:

(1)創建各種各樣的節點,包括spout/bolt

(2)spout全部放到一個set中

(3)bolt的每一個節點放入一個group中

(4)對group進行各種的merge操作(如g1的所有輸出均到g2,則將它們合併)

(5)直到剩餘少量的mergeGroup,作為bolt

(6)TridentTopologyBuilder.buildTopology()對這些spout/mergeGroup進行分組配置。

1、生成bolt的名稱:genBoltIds

genBoltIds用於為bolt生成一個唯一的id,它使用字母b開頭,然後是一個數字id,接著是group的名稱,然後是第2個id, 第2個group的名稱….。而group的名稱是由這個group包含的Node名稱組成的。

private static Map<Group, String> genBoltIds(Collection<Group> groups) {

Map<Group, String> ret = new HashMap<>();

int ctr = 0;

for(Group g: groups) {

if(!isSpoutGroup(g)) {

List<String> name = new ArrayList<>();

name.add("b");

name.add("" + ctr);

String groupName = getGroupName(g);

if(groupName!=null && !groupName.isEmpty()) {

name.add(getGroupName(g));

}

ret.put(g, Utils.join(name, "-"));

ctr++;

}

}

return ret;

}

private static String getGroupName(Group g) {

TreeMap<Integer, String> sortedNames = new TreeMap<>();

for(Node n: g.nodes) {

if(n.name!=null) {

sortedNames.put(n.creationIndex, n.name);

}

}

List<String> names = new ArrayList<>();

String prevName = null;

for(String n: sortedNames.values()) {

if(prevName==null || !n.equals(prevName)) {

prevName = n;

names.add(n);

}

}

return Utils.join(names, "-");

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

2、添加節點:addNode()

protected Stream addNode(Node n) {

registerNode(n);

return new Stream(this, n.name, n);

}

  • 1
  • 2
  • 3
  • 4
  • 5

這個方法很簡單,而且,它只在newStream()及newDRPCStream中調用,很明顯這是用於提供一個新的數據源的。而下面的addSourceNode()是用於在bolt中添加下一個處理節點的。

3、添加節點:addSourceNode()

創建一個新節點,指定新節點的父節點(可能多個)。指定多個sources的情況只在merge()方法中被調用multiReduce()時調用。因此這裡只關注一個source的情形。

protected Stream addSourcedNode(Stream source, Node newNode) {

return addSourcedNode(Arrays.asList(source), newNode);

}

protected Stream addSourcedNode(List<Stream> sources, Node newNode) {

registerSourcedNode(sources, newNode);

return new Stream(this, newNode.name, newNode);

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這裡的節點不是source這個Stream自身的成員變數_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。

return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));

  • 1
  • 2

除了註冊新節點 registerNode(newNode)以外,還在每個stream和節點間創建一條邊。

protected void registerSourcedNode(List<Stream> sources, Node newNode) {

registerNode(newNode);

int streamIndex = 0;

for(Stream s: sources) {

_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));

streamIndex++;

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

向圖中添加一個節點。然後若節點中的stateInfo成員不為空,則將該節點放入與存儲序號(StateId)相對應的哈希表_colocate中。_colocate變數將所有訪問同一存儲的節點關聯在一起,並將他們放在一個Bolt中執行。

protected void registerNode(Node n) {

_graph.addVertex(n);

if(n.stateInfo!=null) {

String id = n.stateInfo.id;

if(!_colocate.containsKey(id)) {

_colocate.put(id, new ArrayList());

}

_colocate.get(id).add(n);

}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 程序員小新人學習 的精彩文章:

AndroidStudio常用功能的設置方式
打造Python的vim環境

TAG:程序員小新人學習 |