好文翻譯丨一份關於系統語言的經驗報告
歡迎下載開源中國APP獲取更多優質文章
近期,系統語言社區出現了很多混亂。我們有「Rust」福音派的傳教,促使我們把所有的東西都用Rust重寫。我們有C++17派,他們承諾C++有現代編程語言的安全性和易用性又有c的性能。然後還有一大堆眾多處於長尾的「系統」編程語言,比如Nim、Reason/OCaml、Crystal、Go和Pony。
就我個人而言,我非常興奮,我們在編程語言理論領域看到了一些有趣的工作。這讓我很興奮地了解到那裡的情況。我解決的很多問題通常都是用C語言中解決的。最近,Go語言已經開始蠶食C的領地。我喜歡C和Go一樣多——他們是很好的語言。很多時候,他們留下了很多值得期待的東西,也讓我很羨慕那些使用Flow、Typescript和Dialyzer等語言的程序員,他們有那麼多工具。在使用Erlang的開發過程中,即使是它的類型系統很基礎,函數式編程對我來說也很容易。
什麼是系統語言?
讓我們稍微回顧一下。 什麼是系統語言? 那麼,我認為這取決於你在什麼位置,以及你問的是誰。 一般來說,我會建議系統語言的定義是一種可以用來實現系統運行的組件語言。
例如,對於JavaScript程序員來說,他們的代碼運行在V8和Spidermonkey之上,並將C ++作為他們選擇的系統語言。 大多數情況下,程序員不需要擔心它們下面的內容。 一個Javascript程序員不需要知道V8是如何工作的,他們也不需要閱讀V8代碼。
同樣,Java程序員必須了解JVM的運行時行為,但在OpenJDK之上,實現語言是不相關的。 即使這些生態系統已經相當成熟,但當計算機出現故障時,總會有人需要了解系統下面發生了什麼。
大多數時候,用系統編程語言編寫的東西很少能提供直接的商業價值,但卻是構建底層基礎架構以提供商業價值的基礎。 他們是一個必須要付出的代價,對於大多數組織來說,投資他們主要就是成本,因為他們的使用不是產品成功的核心。
系統對他人而言
在伺服器端運行的是一套很複雜的系統集合。應用程序與業務邏輯經常使用Javascript、Python、Ruby,Elixir、Java或是用Express、Flask、Sinatra、Phoenix、Dropwizard等自定義工具包來寫。這些框架總是依賴於系統後台很少孤立運行,比如說和資料庫、數據管道、負載平衡器或是高速緩存。
從應用程序開發者的角度來看,資料庫,RPC 庫,負載平衡器和高速緩存只是另一個架構的一塊。從根本上講,這個基礎架構由各種系統組件構成。我們看到它的組件由各種語言寫就的。例如:資料庫已經有不同語言寫的,像用Java 寫的Cassandra、C++ 寫的MongoDB、Erlang 寫的CouchDB和Riak、C 寫的Postgresql還有Go 寫的CockroachDB。數據管道也實現了,例如基於Java 寫的Kafka和Spark或是基於C++ 的Heron。高速緩存很大程度上是C的領域(Redis和Memcache)。
很多項目利用多年發展的共享架構的影響,像LevelDB / RocksDB (C++) 這樣的遍地開花,SQLite 幾乎在任何系統上能夠運行。
什麼支撐著這座大山?
最後,幾乎所有這些系統都依賴於我們操作系統的 Userland 和 Linux 核心。多數仍然使用 Userland 的 Linux 發行版在很大程度上是由 C 語言編寫。雖然 GNU C 擁有數量大得難以致信的擴展,提供了形式化內存模型、線程局部變數等各種能力,但 C 就是 C。目前已經存在使用 Go、Rust 和 Nim 來編寫 Userland 的嘗試,但這些項目都還沒有廣泛應用。
最後,幾乎所有人都在Linux 內核中運行軟體。Linux 核心是 C 編寫的,而且並沒有很好的理由來轉向 C++。也存在一些其它語言編寫的內核,比如 Fuschia 和 MirageOS,但它們都還沒有成熟到可以應用於生產環境。
逃離C
我不願看到我在不同的語言中總是選擇C來編寫一個非常簡單的用戶實用程序。目前Go已經在很大程度上取代了我使用C的習慣。選擇Go與其說出於我的喜好,不如說出自它的務實。 我得到了內存管理,大量的構建工具和更簡單的語言。 我失去了泛型、宏,以及對我所做的事情的洞察力。
用例
我試圖編寫一個程序來完成工作上的小事。 這是一個POSIX信號幫手。 實際上,它是容器的入口點,負責在容器終止期間調用關閉腳本。 它將運行那些註定會運行到終止的服務,因此,出於程序員的錯誤或機器故障或由於外部信號才會導致這些服務終止。
通常,我們想要在關機前做一些事情。 時而是把我們從服務搜索中註銷,時而是在關閉後保存一些應用程序的狀態。 幾乎所有的傳統PID 1具有這種功能,但是將諸如systemd之類的東西放入容器中是不起作用的。
大部分工作僅是在系統調用,像fork、exec、sigmask、wait4和sigtimedwait。事實上它們在容器里運行,這就意味著我們不能憑藉大量運行時間或是一組可用共享庫。我們最多能依賴libc.so.6。
評估
我試圖寫出這一堆語言的評估。在大多數情況下,這就是一個錯誤的開始,並且這種語言不能滿足我對技能的需求。為了儘早解決這件糟糕的事情,我最終妥協了,在Go上編寫代碼。即使與Go很接近,它仍然很尷尬。由於fork/exec通過os/exec進行管理,因此不能簡單地在不破壞exec的情況下開始監聽主goroutine中收到的所有信號。
Nim
Nim從一開始就是一個錯誤。我需要在獨立的進程組中運行代碼,而不是信號包裝器,因此當進程組獲得信號時。如果您手動執行fork/exec過程並且調用setpgid,則可以「手動」執行此操作。
更大的問題是信號處理器的安全性。信號處理程序的安全性在Nim中沒有很好的定義,並且存在一個開放的Github 問題。運行時似乎需要獨立的信號處理進程。
一般來說,它看起來像一個整潔的語言,我真的希望它發展壯大。在指定和類型上也存在尷尬。我最喜歡這門語言的地方是它可以編譯到多個後端,並且可以探索中間層狀態。
Pony
Pony仍處於起步階段。我沒有真正使用它的原始用例。我一定會找出語言所缺失的功能。
有了這個說法,這個語言本身就非常有趣。它是一種簡單的語言,帶有一個簡單的工具鏈。 「構建工具」(ponyc)「正常工作」。它也產生一個小的二進位文件,它具有最小的依賴性。
這就是說,這也是一個錯誤的開始。首先,沒有程序退出,沒有辦法只聽信號。我向Pony團隊提交了一個PR,並且他們合併在這個功能中。
另一個問題是fork和exec進程的機制並不適合我的需求。它沒有能力在文件描述符周圍進行隨機排列,也沒有能力在fork之後運行諸如setpgid之類的東西。
好處在於,核心Pony代碼非常簡單,所以我開始諷刺ASIO子系統開始啟用這些功能。考慮到它的運行時間有多簡單,它不需要太多工作就可以讓Pony完成這項任務,但我沒有時間和精力去編寫RFC。
圍繞這個問題的複雜性很多是這樣,儘管我可能會開始撞擊FFI,並粘上一些位,但我失去了語言的很多好處。這個語言有一個概念,在我看來PLT首先是「能力安全」的概念,但在進一步的理解中,它是一個非常強大的工具。我認為它解決了C ++社區所面臨的一個大問題,即使你編寫了很好的代碼,你帶入的庫也可以像YOLO一樣編寫。
Pony的團隊也有一個偉大的哲學。他們明確關心程序員的生產力。
Pony哲學
本著理查德加布里埃爾的精神,Pony哲學既不是「正確的事物」,也不是「更糟糕或更好的」。而是「完成東西」。
正確性
錯誤是不允許的。如果你無法保證結果是正確的,那麼嘗試完成任務是毫無意義的。
性能
運行速度比正確性更重要。如果性能必須犧牲正確性,試著想出一種新的方式來做事。程序可以讓東西做得越快,效果就越好。除了一個正確的結果,這比任何事都重要。
簡單
簡單性可以犧牲性能。介面比實現更簡單更重要。程序員做得越快,效果就越好。為了提高性能,讓程序員的工作變得更加困難是可以的,但更重要的是讓程序員更容易,而不是讓語言/運行時更容易。
[節選]
Reason (OCaml)
我曾嘗試使用OCaml上Facebook風格的Reason編寫此功能。在此列表上看到一個函數式編程語言真是太棒了。在寫這篇文章時,Reason主要針對前端開發人員,而不是他們稱之為的「原生」開發人員。
Reason是OCaml的原因是什麼呢 - Reason實際上就是他們所聲稱的「Transpiler」,並依靠OCaml來完成繁重的搬移。這意味著我們擁有了20年的OCaml傳統和工具集。這幾乎包括opam軟體包庫中的所有內容。
沒有人告訴我的是OCaml的運行時不能同時完成兩件事。它有一個Python風格的GIL。它僅在2015年推出了多核支持。
大致的實現如下,其中部分省略,因為它們嵌入到內部系統中了。
open Unix;
open Printf;
open Sys;
open ExtUnix.All;
open Thread;
open CCBlockingQueue;
open CCTimer;
/*
* How long do we wait for the discovery deregistration
* process to complete
*/
let discovery_deregistration_timeout = 30.;
/*
* How long do we wait after successfully removing
* outselves out of discovery before we begin to
* forward signals
*/
let discovery_grace_timeout = 60.;
type exn +=
| Unknown_process_state;
let rec waitpid_loop = (pid: int) =>
/* TODO, Wrap in Misc.restart_on_EINTR */
switch (Unix.waitpid([], pid)) {
| (_, WEXITED(return_code)) =>
Log.debug("Process exited with return code: %d
", return_code);
Pervasives.exit(return_code);
| (_, WSIGNALED(signal_code)) =>
Log.info("Process exited with signal code: %d
", signal_code);
Pervasives.exit(128 + signal_code);
| exception (Unix.Unix_error(Unix.EINTR, "waitpid", _)) =>
Log.debug("Received unix interrupt error, restarting waitpid loop");
waitpid_loop(pid);
| _ => raise(Unknown_process_state)
};
type signal_msg =
| DiscoveryDeregistrationComplete
| DiscoveryGracePeriodComplete
| DiscoveryTimeout
| Signal(int);
type exn +=
| UnexpectedMessage(signal_msg);
let rec signal_listener_forwarder = (pid, sigq) => {
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) =>
Log.info("Forwarder processing signal: %d", sig_val);
Unix.kill(pid, sig_val);
| DiscoveryTimeout => ()
| DiscoveryDeregistrationComplete => ()
| DiscoveryGracePeriodComplete => ()
/* add arbitrary wait bit */
};
signal_listener_forwarder(pid, sigq);
};
let rec signal_listener_wait = (pid, sigq, first_signal) => {
/* Wait for some arbitrary timeout, and then forward signals, or if we get a signal,
* start forwarding all signals
*/
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) =>
Log.info(
"Going into forwarding signal mode, during wait, due to signal: %d",
sig_val
);
Unix.kill(pid, first_signal);
Unix.kill(pid, sig_val);
ignore(signal_listener_forwarder(pid, sigq));
| DiscoveryGracePeriodComplete =>
Log.info("Going into forwarding signal mode, wait is completed");
Unix.kill(pid, first_signal);
ignore(signal_listener_forwarder(pid, sigq));
/* Both of these messages can come in late */
| DiscoveryTimeout => ()
| DiscoveryDeregistrationComplete => ()
/* add arbitrary wait bit */
};
signal_listener_wait(pid, sigq, first_signal);
};
let signal_listener_phase1 = (pid, sigq, first_signal, discovery_timer) =>
/* TODO: kickoff discovery registration */
switch (CCBlockingQueue.take(sigq)) {
| DiscoveryTimeout =>
Log.error("Received discovery timeout");
signal_listener_wait(pid, sigq, first_signal);
| DiscoveryDeregistrationComplete =>
Log.info("Discover deregistration completed");
CCTimer.stop(discovery_timer);
/*
* Even though we stopped the timer here,
* we still might get a message from it
* since it"s async
*/
let grace_period_timer = CCTimer.create();
CCTimer.after(grace_period_timer, discovery_grace_timeout, () =>
assert (CCBlockingQueue.try_push(sigq, DiscoveryGracePeriodComplete))
);
signal_listener_wait(pid, sigq, first_signal);
| Signal(sig_val) =>
Log.info(
"Going into forwarding signal mode, during discovery de-registration, due to 2nd signal: %d",
sig_val
);
CCTimer.stop(discovery_timer);
Unix.kill(pid, first_signal);
Unix.kill(pid, sig_val);
signal_listener_forwarder(pid, sigq);
| e => raise(UnexpectedMessage(e))
/* Add successful discovery completion here */
/* add wait for discovery bit */
};
let discovery_deregistration = sigq => {
Log.info("Beginning discovery deregistration");
Unix.sleep(5);
assert (CCBlockingQueue.try_push(sigq, DiscoveryDeregistrationComplete));
};
let signal_listener_thread = (pid, sigq) => {
/*
* In this state, the loop is just listening, and waiting for a signal.
* Once we receive a signal, we kick off deregistration in discovery,
* and we run that with timeout N. Either timeout N must elapse, or
* the discovery deregistration must finish. Once that happens,
* we forward the signal that we last received.
*
* If at any point, during this we receive another signal,
* all bets are off, and we immediately start forwarding
* signals.
*/
let sig_val =
switch (CCBlockingQueue.take(sigq)) {
| Signal(sig_val) => sig_val
| e => raise(UnexpectedMessage(e))
};
let _ = Thread.create((_) => discovery_deregistration(sigq), ());
let timer = CCTimer.create();
CCTimer.after(timer, discovery_deregistration_timeout, () =>
assert (CCBlockingQueue.try_push(sigq, DiscoveryTimeout))
);
signal_listener_phase1(pid, sigq, sig_val, timer);
};
let rec signal_cb_thread = (sigq, signals) => {
let my_sig = Thread.wait_signal(signals);
assert (CCBlockingQueue.try_push(sigq, Signal(my_sig)));
signal_cb_thread(sigq, signals);
};
let parent = (pid: int, signals) => {
let sigq = CCBlockingQueue.create(max_int);
let _ = Thread.create((_) => ignore(signal_listener_thread(pid, sigq)), ());
let _ = Thread.create((_) => ignore(signal_cb_thread(sigq, signals)), ());
waitpid_loop(pid);
};
let child = () => {
/* Replace with real child execution code */
Sys.set_signal(
Sys.sigint,
Signal_handle((_: int) => Log.info("Saw SIGINT"))
);
Sys.set_signal(
Sys.sigterm,
Signal_handle((_: int) => Log.info("Saw SIGTERM"))
);
let _ = Unix.sigprocmask(SIG_UNBLOCK, [Sys.sigint, Sys.sigterm]);
ExtUnix.All.setpgid(0, 0);
Log.info("In child");
let _ = Unix.sleep(1000);
Log.info("Done sleeping");
Pervasives.exit(1);
};
let () = {
Log.set_log_level(Log.DEBUG);
Log.set_output(Pervasives.stderr);
Log.color_on();
let signals = [Sys.sigint, Sys.sigterm];
let _ = Unix.sigprocmask(SIG_BLOCK, signals);
switch (Unix.fork()) {
| 0 => child()
| pid => parent(pid, signals)
};
};
所以,這真的可以正常運行。
Channels
最初,我曾使用的是CCBlockingQueue以提供同步機制。這是在等待信號的線程和正在協同的線程之間傳遞信息的好方法。我在隊列中使用了sum類型,所以我可以繼續並使用匹配。這是一個狀態機的窮人版實現 - 除了我必須能夠詳盡地處理所有點上的所有消息。模式匹配使得這一切變得輕而易舉,但它仍然略顯笨拙。
從這一點上來看,這有點令人生厭,因為我在整個隊列中使用了一種sum類型的消息,因為沒有顯式的方式可以一次等待多個隊列。在我的Rust實現中,我使用了channel_select!宏中的channel(https://github.com/BurntSushi/chan)。如果能夠在OCaml中執行相同的操作,或者存在可用的庫來處理這個問題,那將是非常好的。
我遇到的另一個問題是處理定時器。同樣,因為我使用的機制依賴於這個單一隊列,所以我需要或是使用一個單線程作為計時器的輪轂來推送到期消息,或者為每個計時器啟動一個線程。
信號處理
這是進一步深入文檔。當我註冊一個信號處理程序,而不是使用wait_signal (sigtimedwait)時,很難排查為什麼處於死鎖狀態。我已了解到信號處理程序在OCaml運行時(參見:GIL)中是「不安全的」,並且可能阻止其他線程執行。
系統API
OCaml最棒的部分之一是對系統API的訪問。諸如調用fork、setpgrp等的直接機制非常棒。跨平台的信號轉換邏輯有點令人費解,但並不存在使用文檔無法解決的問題。
構建系統
我認為OCaml人應該學習下Rust人的作法。我最終手工編寫了一個Makefile,並使用了重建,但我設想下:如果項目變得複雜得多,或者涉及多個文件,手工完成這項工作將變得很笨拙。目前有Oasis, ocamlbuild, 和Jenga--但所有這些都比語言本身的學習曲線更陡峭。
Rust
我也能夠使用Rust完成此任務。我真的很想喜歡Rust。Rust感覺囊括了C++的所有複雜性和難度,對於簡單的程序並沒有太多額外的好處。
傲慢
Rust經受了七宗罪之一,傲慢,的折磨。這體現在以下兩種方式中的一種。第一種是借用的檢查器。另一種是優先性能而不是簡便性的策略。
在Rust文檔中,他們寫道:
但是,這個系統確實有一定的成本:學習曲線。許多Rust新用戶體驗到我們喜歡稱之為"與借用檢查器做對抗"的情況,此情景下Rust編譯器拒絕編譯作者認為有效的程序。這經常發生是因為程序員對所有權應如何工作的心理模型與Rust所實現的實際規則不匹配之時。起初你可能會經歷類似的事情。然而,有一個好消息:更有經驗的Rust開發人員報告說,一旦他們在所有權系統的規則下使用了一段時間,他們就會越來越少地與借用檢查器做鬥爭了。
如果某語言存在人們為了獲得高生產力而與語言作鬥爭的問題,那麼該語言可能有些問題,而不是程序員本身呢?相反,Rust社區繼續炫耀他們的語言的正確性 - 這是一種寶貴的屬性,但沒有後退一步,並考量或許不同的默認值可能更有意義。
我對默認值和借用檢查器的最大疑問是FP中通常通過複製傳值的地方 - 在Rust中是值傳遞,而不是它假定你想通過引用傳遞。因此,你需要手動克隆之,然後作為替代傳遞該克隆後的版本。雖然它有自動執行此功能的機制,但它遠不符合人體工程學。
通過引用或借用的參數比默認情況下的克隆更高效。 一般來說,電腦越來越快,但系統越來越複雜。
性能不是主要問題 - 易於編程和正確性才是—?Joe Armstrong (http://erlang.org/pipermail/erlang-questions/2014-June/079613.html)
我認為Rust錯過了這個。
#[macro_use]
extern crate slog;
extern crate sloggers;
use sloggers::Build;
use sloggers::terminal::{Destination, TerminalLoggerBuilder};
use slog::Logger;
use std::str::FromStr;
#[macro_use]
extern crate chan;
extern crate nix;
use std::process::Command;
use std::os::unix::process::CommandExt;
use std::os::unix::process::ExitStatusExt;
use std::thread;
use std::process::Child;
use std::process::exit;
use nix::sys::signal::*;
use chan::Sender;
use chan::Receiver;
extern crate clap;
extern crate humantime;
use std::time::Duration;
use clap::App;
use clap::Arg;
use reqwest::Url;
extern crate reqwest;
fn run_command(cmd: &str, args: Vec<String>) -> std::io::Result<Child> {
return Command::new(cmd)
// This is to put in a separate process group, so signals don"t propogate
.before_exec(|| {
let mut old_signal_set = SigSet::empty();
let all_signals = SigSet::all();
assert!(!nix::unistd::setpgid(nix::unistd::Pid::from_raw(0),
nix::unistd::Pid::from_raw(0))
.is_err());
assert!(sigprocmask(SigmaskHow::SIG_UNBLOCK,
Some(&all_signals),
Some(&mut old_signal_set)).is_ok());
Ok(())
})
.args(args)
.spawn();
}
fn forward_signals(
logger: Logger,
pid: nix::unistd::Pid,
sig_rx: Receiver<nix::sys::signal::Signal>,
) {
/* This now just forward signals */
loop {
let signal = sig_rx.recv().unwrap();
info!(logger, "Forwarding signal: {:?}", signal);
assert!(nix::sys::signal::kill(pid, signal).is_ok());
}
}
enum DegistrationResult {
/* We received another signal, so we"re going into forwarding signals mode */
Interrupted(nix::sys::signal::Signal),
TimedOut,
Success,
Failed,
}
enum WaitGracePeriodResult {
Interrupted(nix::sys::signal::Signal),
Success,
}
fn run_deregistration(logger: Logger, config: Config, sender: Sender<DegistrationResult>) {
info!(logger, "Beginning discovery deregistration");
let discovery_host = format!(
"OBFUSCATED",
region = config.region,
environment = config.environment
);
let path = format!(
"OBFUSCATED",
app = config.app,
instance = config.instance
);
let url = &Url::parse(&format!("OBFUSCATED", discovery_host, path)).unwrap();
debug!(
logger,
"Discovery deregistration proceeding against: {}",
url.to_string()
);
let params = [("value", "OBFUSCATED")];
for _ in 0..3 {
let client = reqwest::Client::builder()
.timeout(Some(Duration::from_secs(3)))
.build()
.unwrap();
match client.put(url.clone()).form(¶ms).send() {
Ok(mut resp) => {
if resp.status().is_success() {
info!(logger, "Deregistration succeeded: {}", resp.text().unwrap());
sender.send(DegistrationResult::Success);
return;
} else {
error!(logger, "Deregistration failed, retrying: {}", resp.status());
}
}
Err(err) => {
error!(logger, "Deregistration failed, retrying: {:?}", err);
}
}
}
sender.send(DegistrationResult::Failed);
error!(logger, "Discovery deregistration failed");
}
fn deregistration(
logger: Logger,
config: Config,
sig_rx: &Receiver<nix::sys::signal::Signal>,
) -> DegistrationResult {
let discovery_deregistration_timeout_chan = chan::after(config.deregistration_timeout);
let (send, recv) = chan::async();
thread::Builder::new()
.name("discovery-degistration".to_string())
.spawn(move || run_deregistration(logger, config, send))
.unwrap();
chan_select! {
discovery_deregistration_timeout_chan.recv() => {
return DegistrationResult::TimedOut
},
recv.recv() -> res => match res {
Some(status) => return status,
None => return DegistrationResult::Failed
},
sig_rx.recv() -> new_signal => {
return DegistrationResult::Interrupted(new_signal.unwrap())
}
}
}
fn wait_grace_period(
_logger: &Logger,
config: Config,
sig_rx: &Receiver<nix::sys::signal::Signal>,
) -> WaitGracePeriodResult {
let discovery_grace_period_timeout = chan::after(config.discovery_wait);
chan_select! {
discovery_grace_period_timeout.recv() => {
return WaitGracePeriodResult::Success
},
sig_rx.recv() -> new_signal => {
return WaitGracePeriodResult::Interrupted(new_signal.unwrap())
}
}
}
fn background_watcher(
logger: Logger,
config: Config,
pid: nix::unistd::Pid,
sig_rx: Receiver<nix::sys::signal::Signal>,
) {
/*
* In this state, the loop is just listening, and waiting for a signal.
* Once we receive a signal, we kick off deregistration in discovery,
* and we run that with timeout N. Either timeout N must elapse, or
* the discovery deregistration must finish. Once that happens,
* we forward the signal that we last received.
*
* If at any point, during this we receive another signal,
* all bets are off, and we immediately start forwarding
* signals.
*/
let first_signal = sig_rx.recv().unwrap();
/* Phase 1 */
info!(logger, "Entering do deregistration phase");
match deregistration(logger.clone(), config.clone(), &sig_rx) {
DegistrationResult::Interrupted(new_signal) => {
warn!(logger, "Discovery deregistration process interrupted");
assert!(nix::sys::signal::kill(pid, first_signal).is_ok());
assert!(nix::sys::signal::kill(pid, new_signal).is_ok());
return forward_signals(logger, pid, sig_rx);
}
DegistrationResult::TimedOut => {
error!(logger, "Discovery deregistration timed out");
}
DegistrationResult::Success => info!(logger, "Discovery deregistration completed"),
DegistrationResult::Failed => error!(
logger,
"Discovery deregistration failed, continuing to grace period"
),
}
info!(logger, "Entering waiting for grace period phase");
/* Phase 2 */
match wait_grace_period(&logger, config.clone(), &sig_rx) {
WaitGracePeriodResult::Interrupted(new_signal) => {
warn!(logger, "Discovery grace period wait interrupted");
assert!(nix::sys::signal::kill(pid, first_signal).is_ok());
assert!(nix::sys::signal::kill(pid, new_signal).is_ok());
return forward_signals(logger, pid, sig_rx);
}
WaitGracePeriodResult::Success => {
info!(logger, "Discovery grace period successfully elapsed");
assert!(nix::sys::signal::kill(pid, first_signal).is_ok());
return forward_signals(logger, pid, sig_rx);
}
}
}
fn signal_cb(logger: Logger, ss: SigSet, tx: Sender<nix::sys::signal::Signal>) {
loop {
match ss.wait() {
Err(e) => error!(logger, "Failed to read signal: {:?}", e),
Ok(val) => {
info!(logger, "Received signal: {:?}", val);
tx.send(val);
}
};
}
}
// ss is the set of signals which we intend for the signal callback handler to process / listen to
fn run_signal_watcher(logger: Logger, ss: SigSet, tx: Sender<nix::sys::signal::Signal>) {
// Has to be constructed in the main thread
thread::Builder::new()
.name("signal-cb".to_string())
.spawn(move || signal_cb(logger, ss, tx))
.unwrap();
}
fn run_signal_processor(
logger: Logger,
config: Config,
pid: i32,
rx: Receiver<nix::sys::signal::Signal>,
) {
thread::Builder::new()
.name("signal-handler".to_string())
.spawn(move || background_watcher(logger, config, nix::unistd::Pid::from_raw(pid), rx))
.unwrap();
}
fn run(logger: Logger, config: Config) {
let mut old_signal_set = SigSet::empty();
let mut ss = SigSet::empty();
ss.add(SIGINT);
ss.add(SIGTERM);
// Block the signals
assert!(sigprocmask(SigmaskHow::SIG_BLOCK, Some(&ss), Some(&mut old_signal_set)).is_ok());
let (signal_tx, signal_rx) = chan::async();
run_signal_watcher(logger.clone(), ss.clone(), signal_tx);
let (cmd, args) = config.command.split_first().unwrap();
let mut cmd_handle = run_command(cmd, args.to_vec()).unwrap();
run_signal_processor(
logger.clone(),
config.clone(),
cmd_handle.id() as i32,
signal_rx,
);
debug!(logger, "Running command");
let res = cmd_handle.wait();
// TODO: Fix up the command cleanup code
info!(logger, "Cmd completed: {:?}", res);
match res {
Ok(exit_status) => {
match exit_status.code() {
Some(code) => exit(code),
// This means we"ve been terminated by a signal
None => (),
};
match exit_status.signal() {
Some(signal) => exit(signal + 128),
None => (),
}
exit(0);
}
Err(_other) => {
error!(logger, "Failed to run command");
exit(1);
}
}
}
#[derive(Debug, Clone)]
struct Config {
deregistration_timeout: Duration,
discovery_wait: Duration,
region: String,
instance: String,
app: String,
environment: String,
command: Vec<String>,
}
fn main() {
let app = App::new("signal-watcher")
.version("1.0")
.arg(
Arg::with_name("deregistration-timeout")
.long("deregistration-timeout")
.takes_value(true)
.value_name("SECONDS")
.help("How long to wait for discovery deregistration call")
.default_value("15s")
.required(true)
.validator(validate_timeout),
)
.arg(
Arg::with_name("discovery-wait")
.long("discovery-wait")
.takes_value(true)
.value_name("SECONDS")
.validator(validate_timeout)
.required(true)
.help("How long to wait after deregistration before forwarding the signal")
.default_value("60s"),
)
.arg(
Arg::with_name("region")
.long("region")
.takes_value(true)
.value_name("REGION")
.help("What EC2 region are we in")
.env("EC2_REGION")
.required(true),
)
.arg(
Arg::with_name("environment")
.long("environment")
.takes_value(true)
.value_name("NETFLIX_ENVIRONMENT")
.help("Which environment are we running in")
.env("NETFLIX_ENVIRONMENT")
.required(true),
)
.arg(
Arg::with_name("app")
.long("app")
.takes_value(true)
.value_name("NETFLIX_APP")
.help("Which is our app name")
.env("NETFLIX_APP")
.required(true),
)
.arg(
Arg::with_name("instance-id")
.long("instance-id")
.takes_value(true)
.value_name("INSTANCEID")
.help("EC2 / Container instance ID")
.env("EC2_INSTANCE_ID")
.required(true),
)
.arg(
Arg::with_name("command")
.last(true)
.value_name("COMMAND")
.multiple(true)
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("log-level")
.long("local-level")
.takes_value(true)
.value_name("LOG_LEVEL")
.env("LOG_LEVEL")
.validator(validate_loglevel)
.default_value("info"),
)
.get_matches();
let log_level =
sloggers::types::Severity::from_str(app.value_of("log-level").unwrap()).unwrap();
let logger = new_logger(log_level);
let config: Config = Config {
deregistration_timeout: app.value_of("deregistration-timeout")
.unwrap()
.parse::<humantime::Duration>()
.unwrap()
.into(),
discovery_wait: app.value_of("discovery-wait")
.unwrap()
.parse::<humantime::Duration>()
.unwrap()
.into(),
region: app.value_of("region").unwrap().to_string(),
environment: app.value_of("environment").unwrap().to_string(),
instance: app.value_of("instance-id").unwrap().to_string(),
app: app.value_of("app").unwrap().to_string(),
command: app.values_of("command")
.unwrap()
.map(|s: &str| s.to_string())
.collect(),
};
run(logger, config);
}
fn new_logger(severity: sloggers::types::Severity) -> Logger {
let mut builder = TerminalLoggerBuilder::new();
builder.level(severity);
builder.destination(Destination::Stderr);
let logger = builder.build().unwrap();
return logger;
}
fn validate_timeout(v: String) -> Result<(), String> {
let num: Duration = match v.parse::<humantime::Duration>() {
Ok(n) => n.into(),
Err(e) => return Err(e.to_string()),
};
if num < Duration::from_secs(1) {
return Err(String::from(
"Timeouts must be greater than or equal to 1 second",
));
}
if num > Duration::from_secs(300) {
return Err(String::from("Timeouts must be smaller than 300 seconds"));
}
return Ok(());
}
fn validate_loglevel(v: String) -> Result<(), String> {
match sloggers::types::Severity::from_str(&v) {
Ok(_) => Ok(()),
Err(e) => Err(e.to_string()),
}
}
一切仍然很糟糕
如果你已經得到這麼多,你會意識到一切都還很糟糕。 如果我想在系統的這一層實現任何東西,我的選擇基本上仍然是C和Go。 我很興奮,因為一些新的參與者已經進入了這個圈子。 我不確定我是否會想要使用Rust,除非他們有巨大的態度調整。 我很高興看到Nim和Pony成熟了。
對於技術達人來說,廣納知識點是進步的源泉。通過閱讀技術文章我們可以學到業務技能,也能了解行業動態。開源中國翻譯頻道旨在每天為用戶推薦並翻譯優質的外網文章。再也不用怕因為英語不過關,被擋在許多技術文章的門外。點擊「了解更多」,獲取往期優質翻譯文章。
※每日一博丨為什麼 PHP 程序員應該學習使用 Swoole
※Web 伺服器 Nginx 幕後公司完成 4300 萬美元 C 輪融資
TAG:OSC開源社區 |