Storm源碼分析之Trident源碼分析
@(STORM)[storm]
- Storm源碼分析之四 Trident源碼分析
- 一概述
- 0小結
- 1簡介
- 2關鍵類
- 1Spout的創建
- 2spout的消息流
- 3spout調用的整體流程
- 4spout如何被 載入到拓撲中
- 二Spout
- 一Spout的創建
- 1ItridentSpout
- 2BatchCoordinator
- 3Emmitter
- 4一個示例
- 二spout實際的消息流
- 1MasterBatchCoordinator
- 2TridentSpoutCoordinator
- 3TridentSpoutExecutor
- 三bolt
- 一概述
- 1組件的基本關係
- 2用戶視角與源碼視角
- 二基礎類
- 1Stream
- 1成員變數
- 2projectionValidation
- 3project
- 2Node SpoutNode PartitionNode ProcessorNode
- 詳細分析見書
- 3Group
- 1成員變數
- 2構造方法
- 3outgoingNodes
- 4incommingNodes
- 4GraphGrouper
- 1成員變數
- 2構造方法
- 3reindex
- 4nodeGroup
- 5outgoingGroups
- 6incomingGroups
- 7merge
- 8mergeFully
- 四在TridentTopologyBuilder中設置Spoutbolt
- 一參考內容
- 一概述
- 二基礎類
- 1GlobalStreamId
- 三TridentTopology
- 1生成bolt的名稱genBoltIds
- 2添加節點addNode
- 3添加節點addSourceNode
- 四TridentTopologyBuilder
一、概述
0、小結
TridentTopologyBuilder與TridentTopology調用MBC/TSC/TSE設置spout與2個bolt,而這三個類通過調用用戶代碼Spout中定義的Coordinator與Emitter完成真正的邏輯。
最後構建好的拓撲會提交至nimbus,nimbus開始調度這個拓撲,開始運行。
1、簡介
trident是storm的更高層次抽象,相對storm,它主要提供了3個方面的好處:
(1)提供了更高層次的抽象,將常用的count,sum等封裝成了方法,可以直接調用,不需要自己實現。
(2)以批次代替單個元組,每次處理一個批次的數據。
(3)提供了事務支持,可以保證數據均處理且只處理了一次。
本文介紹了在一個Trident拓撲中,spout是如何被產生並被調用的。首先介紹了用戶如何創建一個Spout以及其基本原理,然後介紹了Spout的實際數據流,最後解釋了在創建topo時如何設置一個Spout。
2、關鍵類
MaterBatchCorodeinator —————> ITridentSpout.Coordinator#isReady
|
|
v
TridentSpoutCoordinator —————> ITridentSpout.Coordinator#[initialTransaction, success, close]
|
|
v
TridentSpoutExecutor —————> ITridentSpout.Emitter#(emitBatch, success(),close)
Spout中涉及2組類,第一組類定義了用戶如何創建一個Spout,這些用戶的代碼會被第二組的類調用。第二組類定義了實際的數據流是如何發起並傳送的。
(1)Spout的創建
涉及三個類:ItridentSpout, BatchCoordinator, Emitter,其中後面2個是第一個的內部類。
用戶創建一個Spout需要實現上述三個介面。比如storm-kafka中的Spout就是實現了這3個介面或者其子介面。
(2)spout的消息流
也是涉及三個類:MasterBatchCoordinator, TridentSpoutCoordinator, TridentSpoutExecutor。它們除了自身固定的邏輯以外,還會調用用戶的代碼,就是上面介紹的Spout代碼。
它們的定義分別為:
MasterBatchCoordinator extends BaseRichSpout
TridentSpoutCoordinator implements IBasicBolt
TridentSpoutExecutor implements ITridentBatchBolt
- 1
- 2
- 3
- 4
可以看出來,MasterBatchCoordinator才是真正的spout,另外2個都是bolt。
MasterBatchCoordinator會調用用戶定義的BatchCoordinator的isReady()方法,返回true的話,則會發送一個id為batch的消息流,從而開始一個數據流轉。TridentSpoutCoordinator接到MBC的
batch的消息流,從而開始一個數據流轉。TridentSpoutCoordinator接到MBC的batch流後,會調用BatchCoordinator的initialTransaction()初始化一個消息,並繼續向外發送 batch流。TridentSpoutExecutor接到
batch流。TridentSpoutExecutor接到batch流後,會調用用戶代碼中的TridentSpoutExecutor#emitBatch()方法,開始發送實際的業務數據。
3、spout調用的整體流程
1、MasterBatchCoordinator是Trident中真正的Spout,它可以包含多個TridentSpoutCoordinator的節點。MBC在nextTuple()中向外發送id為batch的流,作為整個數據流的起點。MBC會先判斷正在處理的事務數是否少於
m
axTransactionActive,是的話就繼續向外發送
batch的流,作為整個數據流的起點。MBC會先判斷正在處理的事務數是否少於maxTransactionActive,是的話就繼續向外發送batch流。
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
for(int i=0; i<_maxTransactionActive; i++) {
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
_activeTx.put(curr, new TransactionStatus(attempt));
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
_throttler.markEvent();
}
curr = nextTransactionId(curr);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
2、TSC收到batch流後,在execute()方法中,繼續向外發送
batch流後,在execute()方法中,繼續向外發送batch流。
long txid = attempt.getTransactionId();
Object prevMeta = _state.getPreviousState(txid);
Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
_state.overrideState(txid, meta);
collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
- 1
- 2
- 3
- 4
- 5
- 6
3、TSE收到$batch流後,調用用戶Emitter類中的emitBatch()方法,開始向外發送數據。
_collector.setBatch(info.batchId);
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeBatches.put(attempt.getTransactionId(), attempt);
- 1
- 2
- 3
- 4
4、當整個消息被成功處理完後,會調用MBC的ack()方法,ack方法會將事務的狀態從PROCESSING改為PROCESSED:
if(status.status==AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
}
- 1
- 2
- 3
- 4
當然,如果fail掉了,則會調用fail()方法。
當sync()方法接收到事務狀態為PROCESSED時,將其改為COMMITTING的狀態,並向外發送id為$commit的流。
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
}
- 1
- 2
- 3
- 4
- 5
5、TSE處理$commit流
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
} else {
throw new FailedException("Received commit for different transaction attempt");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
收到$commit流的節點會開始提交操作,但trident會按事務號順序提交事務的,所以由提交bolt來決定是否現在提交,還是先緩存下來之後再提交。
6、當$commit流處理完後,MBC的ack()方法會被再次調用,同時向外發送$success流
else if(status.status==AttemptStatus.COMMITTING) {
//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,並發送$success流。
_activeTx.remove(tx.getTransactionId());
_attemptIds.remove(tx.getTransactionId());
_collector.emit(SUCCESS_STREAM_ID, new Values(tx));
_currTransaction = nextTransactionId(tx.getTransactionId());
for(TransactionalState state: _states) {
state.setData(CURRENT_TX, _currTransaction);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
7、TSC處理$commit流
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
_state.cleanupBefore(attempt.getTransactionId());
_coord.success(attempt.getTransactionId());
}
- 1
- 2
- 3
- 4
- 5
8、TSE處理$success流
else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
// valid to delete before what"s been committed since
// those batches will never be accessed again
_activeBatches.headMap(attempt.getTransactionId()).clear();
_emitter.success(attempt);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
至此整個流程全部完成。
總結說就是消息是從MasterBatchCoordinator開始的,它是一個真正的spout,而TridentSpoutCoordinator與TridentSpoutExecutor都是bolt,MasterBatchCoordinator發起協調消息,最後的結果是TridentSpoutExecutor發送業務消息。而發送協調消息與業務消息的都是調用用戶Spout中BatchCoordinator與Emitter中定義的代碼。
可以參考《storm源碼分析》P458的流程圖
4、spout如何被 載入到拓撲中
(1)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息
(2)在TridentTopology中調用newStream方法,將spout節點加入拓撲。
包括MBC, TSC, TSE等均是在上面2個類中被調用,從而形成一個完整的拓撲。
二、Spout
(一)Spout的創建
1、ItridentSpout
在Trident中用戶定義的Spout需要實現ItridentSpout介面。我們先看看ItridentSpout的定義
package storm.trident.spout;
import backtype.storm.task.TopologyContext;
import storm.trident.topology.TransactionAttempt;
import backtype.storm.tuple.Fields;
import java.io.Serializable;
import java.util.Map;
import storm.trident.operation.TridentCollector;
public interface ITridentSpout<T> extends Serializable {
public interface BatchCoordinator<X> {
X initializeTransaction(long txid, X prevMetadata, X currMetadata);
void success(long txid);
boolean isReady(long txid)
void close();
}
public interface Emitter<X> {
void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);
void success(TransactionAttempt tx);
void close();
}
BatchCoordinator<T> getCoordinator(String txStateId, Map conf, TopologyContext context);
Emitter<T> getEmitter(String txStateId, Map conf, TopologyContext context);
Map getComponentConfiguration();
Fields getOutputFields();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
它有2個內部介面,分別是BatchCoordinator和Emitter,分別是用於協調的Spout介面和發送消息的Bolt介面。實現一個Spout的主要工作就在於實現這2個介面,創建實際工作的Coordinator和Emitter。Spout中提供了2個get方法用於分別用於指定使用哪個Coordinator和Emitter類,這些類會由用戶定義。稍後我們再分析Coordinator和Emitter的內容。
除此之外,還提供了getComponentConfiguration用於獲取配置信息,getOutputFields獲取輸出field。
我們再看看2個內部介面的代碼。
2、BatchCoordinator
public interface BatchCoordinator<X> {
X initializeTransaction(long txid, X prevMetadata, X currMetadata);
void success(long txid);
boolean isReady(long txid);
void close();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
(1)initializeTransaction方法返回一個用戶定義的事務元數據。X是用戶自定義的與事務相關的數據類型,返回的數據會存儲到zk中。
其中txid為事務序列號,prevMetadata是前一個事務所對應的元數據。若當前事務為第一個事務,則其為空。currMetadata是當前事務的元數據,如果是當前事務的第一次嘗試,則為空,否則為事務上一次嘗試所產生的元數據。
(2)isReady方法用於判斷事務所對應的數據是否已經準備好,當為true時,表示可以開始一個新事務。其參數是當前的事務號。
BatchCoordinator中實現的方法會被部署到多個節點中運行,其中isReady是在真正的Spout(MasterBatchCoordinator)中執行的,其餘方法在TridentSpoutCoordinator中執行。
3、Emmitter
public interface Emitter<X> {
void emitBatch(TransactionAttempt tx, X coordinatorMeta, TridentCollector collector);
void success(TransactionAttempt tx);
void close();
}
- 1
- 2
- 3
- 4
- 5
- 6
消息發送節點會接收協調spout的$batch和$success流。
(1)當收到$batch消息時,節點便調用emitBatch方法來發送消息。
(2)當收到$success消息時,會調用success方法對事務進行後處理
4、一個示例
參考 DiagnosisEventSpout
(1)Spout的代碼
package com.packtpub.storm.trident.spout;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.spout.ITridentSpout;
import java.util.Map;
@SuppressWarnings("rawtypes")
public class DiagnosisEventSpout implements ITridentSpout<Long> {
private static final long serialVersionUID = 1L;
BatchCoordinator<Long> coordinator = new DefaultCoordinator();
Emitter<Long> emitter = new DiagnosisEventEmitter();
@Override
public BatchCoordinator<Long> getCoordinator(String txStateId, Map conf, TopologyContext context) {
return coordinator;
}
@Override
public Emitter<Long> getEmitter(String txStateId, Map conf, TopologyContext context) {
return emitter;
}
@Override
public Map getComponentConfiguration() {
return null;
}
@Override
public Fields getOutputFields() {
return new Fields("event");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
(2)BatchCoordinator的代碼
package com.packtpub.storm.trident.spout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.trident.spout.ITridentSpout.BatchCoordinator;
import java.io.Serializable;
public class DefaultCoordinator implements BatchCoordinator<Long>, Serializable {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DefaultCoordinator.class);
@Override
public boolean isReady(long txid) {
return true;
}
@Override
public void close() {
}
@Override
public Long initializeTransaction(long txid, Long prevMetadata, Long currMetadata) {
LOG.info("Initializing Transaction [" + txid + "]");
return null;
}
@Override
public void success(long txid) {
LOG.info("Successful Transaction [" + txid + "]");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
(3)Emitter的代碼
package com.packtpub.storm.trident.spout;
import com.packtpub.storm.trident.model.DiagnosisEvent;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ITridentSpout.Emitter;
import storm.trident.topology.TransactionAttempt;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class DiagnosisEventEmitter implements Emitter<Long>, Serializable {
private static final long serialVersionUID = 1L;
AtomicInteger successfulTransactions = new AtomicInteger(0);
@Override
public void emitBatch(TransactionAttempt tx, Long coordinatorMeta, TridentCollector collector) {
for (int i = 0; i < 10000; i++) {
List<Object> events = new ArrayList<Object>();
double lat = new Double(-30 + (int) (Math.random() * 75));
double lng = new Double(-120 + (int) (Math.random() * 70));
long time = System.currentTimeMillis();
String diag = new Integer(320 + (int) (Math.random() * 7)).toString();
DiagnosisEvent event = new DiagnosisEvent(lat, lng, time, diag);
events.add(event);
collector.emit(events);
}
}
@Override
public void success(TransactionAttempt tx) {
successfulTransactions.incrementAndGet();
}
@Override
public void close() {
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
(4)最後,在創建topo時指定spout
TridentTopology topology = new TridentTopology();
DiagnosisEventSpout spout = new DiagnosisEventSpout();
Stream inputStream = topology.newStream("event", spout);
- 1
- 2
- 3
- 4
(二)spout實際的消息流
以上的內容說明了如何在用戶代碼中創建一個Spout,以及其基本原理。但創建Spout後,它是怎麼被載入到拓撲真正的Spout中呢?我們繼續看trident的實現。
1、MasterBatchCoordinator
總體而言,MasterBatchCoordinator作為一個數據流的真正起點:
* 首先調用open方法完成初始化,包括讀取之前的拓撲處理到的事務序列號,最多同時處理的tuple數量,每個事務的嘗試次數等。
* 然後nextTuple會改變事務的狀態,或者是創建事務並發送$batch流。
* 最後,ack方法會根據流的狀態向外發送$commit流,或者是重新調用sync方法,開始創建新的事務。
總而言之,MasterBatchCoordinator作為拓撲數據流的真正起點,通過循環發送協調信息,不斷的處理數據流。MasterBatchCoordinator的真正作用在於協調消息的起點,裡面所有的map,如_activeTx,_attemptIds等都只是為了保存當前正在處理的情況而已。
(1)MasterBatchCoordinator是一個真正的spout
public class MasterBatchCoordinator extends BaseRichSpout
- 1
- 2
一個Trident拓撲的真正邏輯就是從MasterBatchCoordinator開始的,先調用open方法完成一些初始化,然後是在nextTuple中發送$batch和$commit流。
(2)看一下open方法
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_throttler = new WindowedTimeThrottler((Number)conf.get(Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS), 1);
for(String spoutId: _managedSpoutIds) {
//每個MasterBatchSpout可以處理多個ITridentSpout,這裡將多個spout的元數據放到_states這個Map中。稍後再看看放進來的是什麼內容。
_states.add(TransactionalState.newCoordinatorState(conf, spoutId));
}
//從zk中獲取當前的transation事務序號,當拓撲新啟動時,需要從zk恢復之前的狀態。也就是說zk存儲的是下一個需要提交的事務序號,而不是已經提交的事務序號。
_currTransaction = getStoredCurrTransaction();
_collector = collector;
//任何時刻中,一個spout task最多可以同時處理的tuple數量,即已經emite,但未acked的tuple數量。
Number active = (Number) conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
if(active==null) {
_maxTransactionActive = 1;
} else {
_maxTransactionActive = active.intValue();
}
//每一個事務的當前嘗試編號,即_currTransaction這個事務序號中,各個事務的嘗試次數。
_attemptIds = getStoredCurrAttempts(_currTransaction, _maxTransactionActive);
for(int i=0; i<_spouts.size(); i++) {
//將各個Spout的Coordinator保存在_coordinators這個List中。
String txId = _managedSpoutIds.get(i);
_coordinators.add(_spouts.get(i).getCoordinator(txId, conf, context));
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
(3)再看一下nextTuple()方法,它只調用了sync()方法,主要完成了以下功能:
* 如果事務狀態是PROCESSED,則將其狀態改為COMMITTING,然後發送commit流。接收到
commit流。接收到commit流的節點會調用finishBatch方法,進行事務的提交和後處理
* 如果_activeTx.size()小於_maxTransactionActive,則新建事務,放到_activeTx中,同時向外發送$batch流,等待Coordinator的處理。( 當ack方法被 調用時,這個事務會被從_activeTx中移除)
注意:當前處於acitve狀態的應該是序列在[_currTransaction,_currTransaction+_maxTransactionActive-1]之間的事務。
private void sync() {
// note that sometimes the tuples active may be less than max_spout_pending, e.g.
// max_spout_pending = 3
// tx 1, 2, 3 active, tx 2 is acked. there won"t be a commit for tx 2 (because tx 1 isn"t committed yet),
// and there won"t be a batch for tx 4 because there"s max_spout_pending tx active
//判斷當前事務_currTransaction是否為PROCESSED狀態,如果是的話,將其狀態改為COMMITTING,然後發送$commit流。接收到$commit流的節點會調用finishBatch方法,進行事務的提交和後處理。
TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {
maybeCommit.status = AttemptStatus.COMMITTING;
_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);
}
//用於產生一個新事務。最多存在_maxTransactionActive個事務同時運行,當前active的事務序號區間處於[_currTransaction,_currTransaction+_maxTransactionActive-1]之間。注意只有在當前
//事務結束之後,系統才會初始化新的事務,所以系統中實際活躍的事務可能少於_maxTransactionActive。
if(_active) {
if(_activeTx.size() < _maxTransactionActive) {
Long curr = _currTransaction;
//創建_maxTransactionActive個事務。
for(int i=0; i<_maxTransactionActive; i++) {
//如果事務序號不存在_activeTx中,則創建新事務,並發送$batch流。當ack被調用時,這個序號會被remove掉,詳見ack方法。
if(!_activeTx.containsKey(curr) && isReady(curr)) {
// by using a monotonically increasing attempt id, downstream tasks
// can be memory efficient by clearing out state for old attempts
// as soon as they see a higher attempt id for a transaction
Integer attemptId = _attemptIds.get(curr);
if(attemptId==null) {
attemptId = 0;
} else {
attemptId++;
}
//_activeTx記錄的是事務序號和事務狀態的map,而_activeTx則記錄事務序號與嘗試次數的map。
_attemptIds.put(curr, attemptId);
for(TransactionalState state: _states) {
state.setData(CURRENT_ATTEMPTS, _attemptIds);
}
//TransactionAttempt包含事務序號和嘗試編號2個變數,對應於一個具體的事務。
TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);
_activeTx.put(curr, new TransactionStatus(attempt));
_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);
_throttler.markEvent();
}
//如果事務序號已經存在_activeTx中,則curr遞增,然後再循環檢查下一個。
curr = nextTransactionId(curr);
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
完整代碼見最後。
(4)繼續往下,看看ack方法。
@Override
public void ack(Object msgId) {
//獲取某個事務的狀態
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus status = _activeTx.get(tx.getTransactionId());
if(status!=null && tx.equals(status.attempt)) {
//如果當前狀態是PROCESSING,則改為PROCESSED
if(status.status==AttemptStatus.PROCESSING) {
status.status = AttemptStatus.PROCESSED;
} else if(status.status==AttemptStatus.COMMITTING) {
//如果當前狀態是COMMITTING,則將事務從_activeTx及_attemptIds去掉,並發送$success流。
_activeTx.remove(tx.getTransactionId());
_attemptIds.remove(tx.getTransactionId());
_collector.emit(SUCCESS_STREAM_ID, new Values(tx));
_currTransaction = nextTransactionId(tx.getTransactionId());
for(TransactionalState state: _states) {
state.setData(CURRENT_TX, _currTransaction);
}
}
//由於有些事務狀態已經改變,需要重新調用sync()繼續後續處理,或者發送新tuple。
sync();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
(5)還有fail方法和declareOutputFileds方法。
@Override
public void fail(Object msgId) {
TransactionAttempt tx = (TransactionAttempt) msgId;
TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
if(stored!=null && tx.equals(stored.attempt)) {
_activeTx.tailMap(tx.getTransactionId()).clear();
sync();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// in partitioned example, in case an emitter task receives a later transaction than it"s emitted so far,
// when it sees the earlier txid it should know to emit nothing
declarer.declareStream(BATCH_STREAM_ID, new Fields("tx"));
declarer.declareStream(COMMIT_STREAM_ID, new Fields("tx"));
declarer.declareStream(SUCCESS_STREAM_ID, new Fields("tx"));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
2、TridentSpoutCoordinator
TridentSpoutCoordinator接收來自MasterBatchCoordinator的$success流與$batch流,並通過調用用戶代碼,實現真正的邏輯。此外還向TridentSpoutExecuter發送$batch流,以觸發後者開始真正發送業務數據流。
(1)TridentSpoutCoordinator是一個bolt
public class TridentSpoutCoordinator implements IBasicBolt
- 1
- 2
(2)在創建TridentSpoutCoordinator時,需要傳遞一個ITridentSpout對象,
public TridentSpoutCoordinator(String id, ITridentSpout spout) {
_spout = spout;
_id = id;
}
- 1
- 2
- 3
- 4
- 5
然後使用這個對象來獲取到用戶定義的Coordinator:
_coord = _spout.getCoordinator(_id, conf, context);
- 1
- 2
(3)_state和_underlyingState保存了zk中的元數據信息
_underlyingState = TransactionalState.newCoordinatorState(conf, _id);
_state = new RotatingTransactionalState(_underlyingState, META_DIR);
- 1
- 2
- 3
(4)在execute方法中,TridentSpoutCoordinator接收$success流與$batch流,先看看$success流:
if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
_state.cleanupBefore(attempt.getTransactionId());
_coord.success(attempt.getTransactionId());
}
- 1
- 2
- 3
- 4
- 5
即接收到$success流時,調用用戶定義的Coordinator中的success方法。同時還清理了zk中的數據。
(5)再看看$batch流
else {
long txid = attempt.getTransactionId();
Object prevMeta = _state.getPreviousState(txid);
Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
_state.overrideState(txid, meta);
collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
當收到$batch流流時,初始化一個事務並將其發送出去。由於在trident中消息有可能是重放的,因此需要prevMeta。注意,trident是在bolt中初始化一個事務的。
3、TridentSpoutExecutor
TridentSpoutExecutor接收來自TridentSpoutCoordinator的消息流,包括$commit,$success與$batch流,前面2個分別調用emmitter的commit與success方法,$batch則調用emmitter的emitBatch方法,開始向外發送業務數據。
對於分區類型的spout,有可能是OpaquePartitionedTridentSpoutExecutor等分區類型的executor。
(1) TridentSpoutExecutor與是一個bolt
publicclassTridentSpoutExecutorimplementsITridentBatchBolt
- 1
- 2
(2)核心的execute方法
@Override
public void execute(BatchInfo info, Tuple input) {
// there won"t be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
} else {
throw new FailedException("Received commit for different transaction attempt");
}
} else if(input.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {
// valid to delete before what"s been committed since
// those batches will never be accessed again
_activeBatches.headMap(attempt.getTransactionId()).clear();
_emitter.success(attempt);
} else {
_collector.setBatch(info.batchId);
//發送業務消息
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeBatches.put(attempt.getTransactionId(), attempt);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
三、bolt
(一)概述
1、組件的基本關係
(1)trident拓撲最終會轉化為一個spout和多個bolt,每個bolt對應一個SubTopologyBolt,它通過TridentBoltExecutor適配成一個bolt。而每個SubTopologyBOlt則由很多節點組成,具體點說這個節點包括(Stream|Node)2部分,注意,Node不是Stream自身的成員變數,而是一個具體的處理節點。Stream定義了哪些數據流,Node定義和如何進行操作,Node包含了一個ProjectedProccessor等處理器,用於定義如何進行數據處理。
(2)一個SubTopologyBOlt包含多個Group,但大多數情況下是一個Group。看TridentTopology#genBoltIds()的代碼。在一個SubTopologyBolt中,含有多個節點組是可能的。例如在含有DRPC的Topology中,查詢操作也存儲操作可以被分配到同一個SubTopologyBolt中。於是該bolt可能收到來自2個節點組的消息。
(3)一個Group有多個Node。符合一定條件的Node會被merge()成一個Group,每個Node表示一個操作。
(4)每個Node與一個Stream一一對應。注意Stream不是指端到端的完整流,而是每一個步驟的處理對象,所有的Stream組合起來才形成完整的流。看Stream的成員變數。
(5)每個Node可能有多個父stream,但多個的情況只在merge()調用multiReduce()時使用。每個Stream與node之間創建一條邊。見TridentTopology#addSourceNode()方法。
2、用戶視角與源碼視角
在用戶角度來看,他通過newStream(),each(),filter()待方案對Stream進行操作。而在代碼角度,這些操作會被轉化為各種Node節點,它些節點組合成一個SubTopologyBolt,然後經過TridentBoltExecutor適配後成為一個bolt。
從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用於Stream上的各種Operation。在實現層面來看,無論是stream,還是後續的operation都會轉變成為各個Node,這些Node之間的關係通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。
說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關係的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。
TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行於一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的並發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。
關於jgrapht的更多信息,請參考其官方網站 http://jgrapht.org
========================================================
在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。
一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞
(二)基礎類
1、Stream
Stream主要定義了數據流的各種操作,如each(),pproject()等。
(1)成員變數
Node _node;
TridentTopology _topology;
String _name;
- 1
- 2
- 3
- 4
三個成員變數:
* Node對象,這表明Stream與Node是一一對應的,每個節點對應一個Stream對象。
* name:這個Stream的名稱,也等於是這這個Node的名稱。
* TridentTopology: 這個Stram所屬的拓撲,使用這個變數,可以調用addSourceNode()等方法。
其中_node變數被使用很少。
(2)projectionValidation()
這個方法用於檢查是否對一個不存在的field進行了操作。
private void projectionValidation(Fields projFields) {
if (projFields == null) {
return;
}
Fields allFields = this.getOutputFields();
for (String field : projFields) {
if (!allFields.contains(field)) {
throw new IllegalArgumentException("Trying to select non-existent field: "" + field + "" from stream containing fields fields: <" + allFields + ">");
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
stream中定義了定義了各種各樣的trident操作,下面分別介紹
(3)project()
public Stream project(Fields keepFields) {
projectionValidation(keepFields);
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
}
- 1
- 2
- 3
- 4
- 5
首先檢查一下需要project的field是否存在。然後就在TridentTopology中新增一個節點。
第一個參數就是Stream自身,第二個參數是一個Node的子類–ProcessorNode。創建ProcessorNode時,最後一個參數ProjectedProcessor用於指定如何對流進行操作。
addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這裡的節點不是source這個Stream自身的成員變數_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。
2、Node SpoutNode PartitionNode ProcessorNode
(1)Node表示拓撲中的一個節點,後面3個均是其子類。事實上拓撲中的節點均用於產生數據或者對數據進行處理。一個拓撲有多個spout/bolt,每個spout/bolt有一個或者多個Group,我個Group有多個Node。
詳細分析見書。
3、Group
節點組是構建SubTopologyBolt的基礎,也是Topology中執行優化的基本操作單元,Trident會通過不斷的合併節點組來達到最優處理的目的。Group中包含了一組連通的節點。
(1)成員變數
public final Set<Node> nodes = new HashSet<>();
private final DirectedGraph<Node, IndexedEdge> graph;
private final String id = UUID.randomUUID().toString();
- 1
- 2
- 3
- 4
nodes表示節點組中含有的節點。
graph表示拓撲的有向圖。(是整個拓撲的構成的圖)
id用於唯一標識一個group。
(2)構造方法
public Group(DirectedGraph graph, List<Node> nodes) {
this.graph = graph;
this.nodes.addAll(nodes);
}
- 1
- 2
- 3
- 4
- 5
初始狀態時,每個Group只有一個Node.
public Group(DirectedGraph graph, Node n) {
this(graph, Arrays.asList(n));
}
- 1
- 2
- 3
- 4
將2個Group合成一個新的Group。
public Group(Group g1, Group g2) {
this.graph = g1.graph;
nodes.addAll(g1.nodes);
nodes.addAll(g2.nodes);
}
- 1
- 2
- 3
- 4
- 5
- 6
(3)outgoingNodes()
通過遍歷組中節點的方式來獲取該節點組所有節點的子節點,這些子節點可能屬於該節點組,也可能屬於其它節點組。
public Set<Node> outgoingNodes() {
Set<Node> ret = new HashSet<>();
for(Node n: nodes) {
ret.addAll(TridentUtils.getChildren(graph, n));
}
return ret;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
(4)incommingNodes()
用於獲取該節點組中所有節點的父節點,這些父節點可能屬於該節點組,也可能屬於其它節點組。
4、GraphGrouper
GraphGrouper提供了對節點組進行操作及合併的基本方法。
(1)成員變數
final DirectedGraph<Node, IndexedEdge> graph;
final Set<Group> currGroups;
final Map<Node, Group> groupIndex = new HashMap<>();
- 1
- 2
- 3
- 4
graph:與Group相同,即這個拓撲的整個圖。
currGroups:當前graph對應的節點組。節點組之間是沒有交集的。
groupIndex:是一個反向索引,用於快速查詢每個節點所在的節點組。
(2)構造方法
public GraphGrouper(DirectedGraph<Node, IndexedEdge> graph, Collection<Group> initialGroups) {
this.graph = graph;
this.currGroups = new LinkedHashSet<>(initialGroups);
reindex();
}
- 1
- 2
- 3
- 4
- 5
- 6
就是為上面幾個變數進行初始化。
(3)reindex()
public void reindex() {
groupIndex.clear();
for(Group g: currGroups) {
for(Node n: g.nodes) {
groupIndex.put(n, g);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
根據currGroups的內容重構groupIndex。
(4)nodeGroup()
public Group nodeGroup(Node n) {
return groupIndex.get(n);
}
- 1
- 2
- 3
- 4
查詢某個node屬於哪個group。
(5)outgoingGroups()
計算節點組與哪些節點組之間存在有向邊,即2個節點組是相連的。其基本演算法是遍歷每一個節點的子節點,若該子節點所在的節點組與自身不同,則獲得子節點所在的節點組。
public Collection<Group> outgoingGroups(Group g) {
Set<Group> ret = new HashSet<>();
for(Node n: g.outgoingNodes()) {
Group other = nodeGroup(n);
if(other==null || !other.equals(g)) {
ret.add(other);
}
}
return ret;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
(6)incomingGroups()
用於獲取該節點組的父節點組,演算法與上面類似。
public Collection<Group> incomingGroups(Group g) {
Set<Group> ret = new HashSet<>();
for(Node n: g.incomingNodes()) {
Group other = nodeGroup(n);
if(other==null || !other.equals(g)) {
ret.add(other);
}
}
return ret;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
(7)merge()
合併2個節點組。
private void merge(Group g1, Group g2) {
Group newGroup = new Group(g1, g2);
currGroups.remove(g1);
currGroups.remove(g2);
currGroups.add(newGroup);
for(Node n: newGroup.nodes) {
groupIndex.put(n, newGroup);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
(8)mergeFully
這個方法是GraphGrouper的核心演算法,它用來計算何時可以對2個節點組進行合併。基本思想是:如果一個節點組只有一個父節點組,那麼將這個節點組與父節點組合併;如果一個節點組只有一個子節點組,那麼將子節點組與自身節點組合併。反覆進行這個過程。
public void mergeFully() {
boolean somethingHappened = true;
while(somethingHappened) {
somethingHappened = false;
for(Group g: currGroups) {
Collection<Group> outgoingGroups = outgoingGroups(g);
if(outgoingGroups.size()==1) {
Group out = outgoingGroups.iterator().next();
if(out!=null) {
merge(g, out);
somethingHappened = true;
break;
}
}
Collection<Group> incomingGroups = incomingGroups(g);
if(incomingGroups.size()==1) {
Group in = incomingGroups.iterator().next();
if(in!=null) {
merge(g, in);
somethingHappened = true;
break;
}
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
四、在TridentTopologyBuilder中設置Spout、bolt
(一)參考內容
http://www.cnblogs.com/hseagle/p/3490635.html
TridentTopology是storm提供的高層使用介面,常見的一些SQL中的操作tridenttopology提供的api中都有類似的影射。
從TridentTopology到vanilla topology(普通的topology)由三個層次組
成:
1. 面向最終用戶的概念stream, operation
2. 利用planner將tridenttopology轉換成vanilla topology
3. 執行vanilla topology
從TridentTopology到基本的Topology有三層,下圖是一個全局視圖。
從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用於Stream上的各種Operation。在實現層面來看,無論是stream,還是後續的operation都會轉變成為各個Node,這些Node之間的關係通過重要的數據結構圖來維護。具體到TridentTopology,實現圖的各種操作的組件是jgrapht。
說到圖,兩個基本的概念會閃現出來,一是結點,二是描述結點之間關係的邊。要想很好的理解TridentTopology就需要緊盯圖中結點和邊的變化。
TridentTopology在轉換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運行於一個獨立的bolt中。TridentTopology又是如何知道哪些node應該在同一個group,哪些應該處在另一個group中的呢;如何來確定每個group的並發度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。
關於jgrapht的更多信息,請參考其官方網站 http://jgrapht.org
========================================================
在用戶看來,所有的操作就是各種各樣的數據流與operation的組合,這些組合會被封裝成一個Node(即一個Node包含輸入流+操作+輸出流),符合一定規則的Node會被組合與一個組,組會被放到一個bolt中。
一個blot節點中可能含有多個操作,各個操作間需要進行消息傳遞。
=====================================
1、【待完善】通過上面的分析,一個Spout是準備好了,但如何將它載入到拓撲中,並開始真正的數據流:
(1)在TridentTopology中調用newStream方法,將spout節點加入拓撲。
(2)在TridentTopologyBuilder的buildTopololg方法中設置了topo的相關信息
2、拓撲創建的總體流程
(1)在用戶代碼中創建TridentTopology對象
TridentTopology topology = new TridentTopology();
- 1
- 2
(2)在用戶代碼中指定spout節點和bolt節點
比如:
topology.newStream("spout1", spout)
.parallelismHint(16)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(),
new Fields("count")).parallelismHint(16);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
(3)在用戶代碼中創建拓撲
topology.build();
- 1
- 2
(4)topology.build()會調用TridentTopologyBuilder#buildTopology()
(5)用戶代碼中提交拓撲
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, buildTopology(null));
- 1
- 2
(一)概述
(二)基礎類
1、GlobalStreamId
這是由trift生成的類,有2個核心成員變數
public GlobalStreamId(
String componentId,
String streamId)
- 1
- 2
- 3
- 4
分別記錄了某個component的ID與其對應的streamId,如
"$mastercoord-" + batchGroup MasterBatchCoordinator.BATCH_STREAM_ID
- 1
- 2
表示這個component會消費這個stream的消息。
(三)TridentTopology
主要流程:
(1)創建各種各樣的節點,包括spout/bolt
(2)spout全部放到一個set中
(3)bolt的每一個節點放入一個group中
(4)對group進行各種的merge操作(如g1的所有輸出均到g2,則將它們合併)
(5)直到剩餘少量的mergeGroup,作為bolt
(6)TridentTopologyBuilder.buildTopology()對這些spout/mergeGroup進行分組配置。
1、生成bolt的名稱:genBoltIds
genBoltIds用於為bolt生成一個唯一的id,它使用字母b開頭,然後是一個數字id,接著是group的名稱,然後是第2個id, 第2個group的名稱….。而group的名稱是由這個group包含的Node名稱組成的。
private static Map<Group, String> genBoltIds(Collection<Group> groups) {
Map<Group, String> ret = new HashMap<>();
int ctr = 0;
for(Group g: groups) {
if(!isSpoutGroup(g)) {
List<String> name = new ArrayList<>();
name.add("b");
name.add("" + ctr);
String groupName = getGroupName(g);
if(groupName!=null && !groupName.isEmpty()) {
name.add(getGroupName(g));
}
ret.put(g, Utils.join(name, "-"));
ctr++;
}
}
return ret;
}
private static String getGroupName(Group g) {
TreeMap<Integer, String> sortedNames = new TreeMap<>();
for(Node n: g.nodes) {
if(n.name!=null) {
sortedNames.put(n.creationIndex, n.name);
}
}
List<String> names = new ArrayList<>();
String prevName = null;
for(String n: sortedNames.values()) {
if(prevName==null || !n.equals(prevName)) {
prevName = n;
names.add(n);
}
}
return Utils.join(names, "-");
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
2、添加節點:addNode()
protected Stream addNode(Node n) {
registerNode(n);
return new Stream(this, n.name, n);
}
- 1
- 2
- 3
- 4
- 5
這個方法很簡單,而且,它只在newStream()及newDRPCStream中調用,很明顯這是用於提供一個新的數據源的。而下面的addSourceNode()是用於在bolt中添加下一個處理節點的。
3、添加節點:addSourceNode()
創建一個新節點,指定新節點的父節點(可能多個)。指定多個sources的情況只在merge()方法中被調用multiReduce()時調用。因此這裡只關注一個source的情形。
protected Stream addSourcedNode(Stream source, Node newNode) {
return addSourcedNode(Arrays.asList(source), newNode);
}
protected Stream addSourcedNode(List<Stream> sources, Node newNode) {
registerSourcedNode(sources, newNode);
return new Stream(this, newNode.name, newNode);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
addSourcedNode把source和node同時添加進一個拓撲,即一個流與一個節點。注意這裡的節點不是source這個Stream自身的成員變數_node,而是一個新建的節點,比如在project()方法中的節點就是一個使用ProjectedProcessor創建的ProcessorNode。
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
- 1
- 2
除了註冊新節點 registerNode(newNode)以外,還在每個stream和節點間創建一條邊。
protected void registerSourcedNode(List<Stream> sources, Node newNode) {
registerNode(newNode);
int streamIndex = 0;
for(Stream s: sources) {
_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));
streamIndex++;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
向圖中添加一個節點。然後若節點中的stateInfo成員不為空,則將該節點放入與存儲序號(StateId)相對應的哈希表_colocate中。_colocate變數將所有訪問同一存儲的節點關聯在一起,並將他們放在一個Bolt中執行。
protected void registerNode(Node n) {
_graph.addVertex(n);
if(n.stateInfo!=null) {
String id = n.stateInfo.id;
if(!_colocate.containsKey(id)) {
_colocate.put(id, new ArrayList());
}
_colocate.get(id).add(n);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
※AndroidStudio常用功能的設置方式
※打造Python的vim環境
TAG:程序員小新人學習 |