ZUUL 處理 gerrit patch-set 的流程
本文基於ZUUL 2版本,從源代碼的角度解讀一個gerrit patch-set事件被處理的流程。ZUUL服務的核心進程主要包括zuul-server, zuul-merger, zuul-launcher, zuul-cloner, zuul-client。而對配置文件zuul-conf, layout.yaml的處理主要通過zuul-server進程完成的。
進程zuul-server啟動過程中對配置文件的處理
根據zuul.conf的參數啟動相應的服務
[zuul]部分定義了,layout文件的路徑,zuul-server日誌配置文件,以及啟動時的pid文件
[gearman_server]部分定義了,gearman server啟動的IP地址等參數
[gearman]啟動 gear client的參數
[gerrit]gerrit server的相應參數
[launcher] zuul-launcher進程啟動的參數
[merger] zuul-merger進程啟動的參數
[webapp]WEB服務對應的參數
觸發一個layout 相關的事件
reconfigration管理事件
self.sched.reconfigure(self.config)
def reconfigure(self, config):
event = ReconfigureEvent(config)
self.management_event_queue.put(event)
scheduler線程會處理對應的管理事件(Reconfigure,Promote,Dequeue,Enqueue)
解析配置_parseConfig,初始化pipeline的trigger事件類型並進行_postConig
通過zuul.conf文件中layout配置的路徑找到配置文件,進行解析。
self._parseConfig(self.config.get("zuul", "layout_config"), self.connections)
.......
for trigger_name, trigger in self.triggers.items():
if trigger_name in conf_pipeline["trigger"]:
manager.event_filters += trigger.getEventFilters(
conf_pipeline["trigger"][trigger_name])
......
for pipeline in layout.pipelines.values():
pipeline.manager._postConfig(layout)
獲取gerrit連接的source driver,動態導入相應的庫,返回一個driver對象,並註冊到connection的driver列表中。source driver中定義了獲取和更新change的方法,每個pipeline都必須擁有一個source。
pipeline.source = self._getSourceDriver(conf_pipeline.get("source", "gerrit"))
"source": {"gerrit": "zuul.source.gerrit:GerritSource",}
connection.registerUse(dtype, driver_instance)
patch-set 被 zuul 捕捉的過程
Gerrit connection啟動的線程GerritWatcher,會創建到gerrit的SSH連接並執行「gerrit stream-events」,同時對gerrit連接上的stdout event監聽。
zuul.connection.gerrit.GerritWatcher
_run: client = paramiko.SSHClient()
stdin, stdout, stderr = client.exec_command(stream_events_cmd, timeout=self.timeout)
self._listen(stdout, stderr)
_listen: poll = select.poll()
poll.register(stdout.channel)
ret = poll.poll(self.poll_timeout)
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
self._read(stdout)
_read: l = fd.readline()
data = json.loads(l)
self.gerrit_connection.addEvent(data)
patch-set event被Watcher線程監聽到以後,添加事件到connection的事件隊列中去
Gerrit connection啟動的線程GerritEventConnector對事件隊列進行處理。GerritEventConnector的_handleEvent()會對event屬性進行解析,並添加到scheduler的trigger event隊列中去。
_handEvent: 解析event信息,並添加event到connection的Scheduler 的event隊列中。connection的Scheduler通常就是zuul-server啟動的scheduler線程。self.connection.sched.addEvent(event)
Scheduler.addEvent: self.trigger_event_queue.put(event)
獲取change的詳細信息
source.gerrit.GerritSource._getchange()
首先檢查緩存,沒有的話更新緩存,然後_updateChange, 設置ZUUL change 對象的屬性。
change 屬性 [data, project, branch, url, isMerged, approvals, open, status, owner]
通過"dependsOn"屬性檢查是否有依賴
通過commit message檢查是否有依賴
Scheduler線程處理trigger event隊列
輪詢layout中的pipeline,能查到對應的change
change = pipeline.source.getChange(event, project)
通過pipeline中的manager類型,match event,並添加change到 change queue
independent:初始化 ChangeQueue,添加project信息到 change queue中,把change queue添加到pipeline的queue列表中。
dependent:直接通過 change的project信息獲取 change queue
Scheduler線程會處理pipeline的change queue
1. 輪詢pipeline的change queue列表 queues
2. 遍歷change queue的 queue 列表
3. 遍歷changeItem,逐一處理
3.1. 檢查相關信息
3.2.在zuul本地倉庫,Merge change item。self.sched.merger.mergeChanges(merger_items, item.current_build_set, self.pipeline.precedence),zuul.merger.client.mergerChanges 在zuul-server 進程啟動時設置了Scheduler的merger,ZUUL-MERGER進程扮演者gear.Worker的角色,處理merger的task。
merger = zuul.merger.client.MergerClient(self.config, self.sched)
self.schedu.setMerger(merger)
mergerChanges: submitJob("merger:merger", dict(changeItem), buildSet, precedence)
3.3.launchJobs ,launch Job是通過gear.Client的submitJob向gear server發任務請求,gear server通過ZMQ消息隊列發布到各個節點上,註冊該任務的gear worker訂閱此任務,Jenkins gearman plugin擔當gear worker管理者的角色,在對應的slave上創建 worker executor線程。
self.pipeline.findJobsToRun(item, self.sched.mutex)
self.sched.launcher.launch(job,item,self.pipeline,dependent_items)
self.gearman.submitJob(gearman_job,precedence=precedence,timeout=300)
Jenkins端以result事件的形式觸發job result事件到gear server
gear server的消息轉發機制非本人主要內容,稍後關注單獨解析文章。
ZUUL gearman Client 處理返回的 result event
job = super(ZuulGearmanClient, self).handleWorkComplete(packet)
self.__zuul_gearman.onBuildCompleted(job)
changeItem 對應的Jobs result結果全部返回時dequeue Item。
_processOneItem:self.pipeline.areAllJobsComplete(item)
self.dequeueItem(item)
發送 report 到 gerrit對應的patch-set
GerritConnection.review: gerrit review --project $ --message $ $
TAG:派森狗 |