數據流是如何工作的?
21CTO導讀:在本文中,我們將介紹Node.js和Java Streams以及Apache Kafka和Amazon Kinesis等工具,並簡述每個工具的用法。
流是一系列元素的合集,就如同數組是存儲值序列的數據結構。那麼,一個流就是一個數組?呃,不是的 - 讓我們看一下流到底是什麼,來看它是如何工作的。
首先,數據流不存儲元素,而數組存儲元素。所以流並不是數組。此外,雖然集合和數組是有限的大小,但流是無限制。但是,如果流不存儲元素,它如何成為一系列元素?
流實際上是一系列數據從一個點移動到另一個點,但它們是按需計算的。因此,流至少要有一個源,如數組,列表,I/O資源等。讓我們以一個文件為例:當一個文件被打開進行編輯時,它的全部或部分會保留在內存中,允許進行更改,所以只有當它關閉時,才能保證不會丟失或損壞任何的數據。
幸運的是,流可以按塊讀取/寫入數據塊,而不會立即緩衝整個文件。您知道,緩衝區是物理內存存儲區域(通常是RAM),用於在數據從一個位置移動到另一個位置時臨時存儲數據。
在Node.js中,存在四種數據流類型,如下:
可寫:可以寫入數據的流。例如,寫入文件,發送HTTP請求/響應。
可讀:可從中讀取數據的流。例如,從文件讀取,接收HTTP請求/響應。
雙工:可讀和可寫的流。例如,TCP套接字。
轉換:可以在寫入和讀取數據時修改或轉換數據的雙工數據流(例如,zlib壓縮文件)。
在流上運行並生成另一個流的函數稱為過濾器,可以在管道中連接,如下代碼所示:
Arrays.asList(10,3,13,4,1,52)
.stream()
.filter(number -> number % 2 == 0) //10,4,52
.sorted() //4,10,52
.skip(1) //10,52
.forEach(System.out::println); //prints 10 and prints 52
談到Java Streams,Java提供給開發者較輕鬆處理的API。 JavaDoc中這樣定義:
流操作分為中間(流生成)操作和終端(生成值)操作。
所以,如果我這樣做時:
List numbers = Arrays.asList(10,3,13,4,1,52);
Stream numberStream = numbers.stream()
.filter(number -> number % 2 == 0) //10,4,52
.sorted() //4,10,52
.skip(1) //10,52
.peek(System.out::println); //used to execute something while stream is processing
流尚未執行,因為它足夠聰明地等待終端操作被調用,如forEach,reduce,anyMatch等。 除了具有聲明樣式之外,一旦滿足終端操作,它也足夠智能地停止。 請看如下代碼:
Integer integer = Arrays.asList(10,3,13,4,1,52)
.stream()
.filter(number -> number % 2 == 0)
.sorted()
.skip(1)
.peek(System.out::println) //it prints only 10 instead of 10 and 52
.findFirst().get();
以上的數據流上有sorted(),filter方法將在其流上運行,但skip不會在整個過濾和排序的流上運行。再看如下代碼例子:
Integer integer = Arrays.asList(10,3,13,4,1,52)
.stream()
.filter(number -> number % 2 == 0)
.findFirst().get();
有些人可能認為過濾器會在每個元素上運行然後再做查找第一個操作。但我們前面說過,Java的數據流處理足夠智能。
Java Streams的另一個有趣的事情是並行流。如下代碼:
Arrays.asList(10,3,13,4,1,52,2,6,8)
.parallelStream()
.filter(number -> number % 2 == 0)
.forEach(number -> System.out.println(Thread.currentThread())); //prints which thread is being executed
當流並行執行時,Java運行時將流分區為多個子數據流。聚合操作迭代並且並行處理這些子流,然後再組合結果。
以上,我們已經理解了流的工作原理,再看一些工具。
Apache Kafka
Kafka是一個分散式流媒體平台,具有三個主要功能:
1、發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
2、以容錯,持久的方式存儲記錄流。
3、記錄處理流的發生時間。
其目的是實現流的實時處理,使用Kafka Connect支持許多數據源(例如JDBC,ActiveMQ,REST API等)。 Kafka一些用例包括:消息傳遞,網站活動跟蹤,度量標準,日誌聚合,流處理,事件源和提交日誌。
以下是使用Kafka Streams API 應用的內部架構。 它提供了包含多個流線程的Kafka Streams應用的邏輯視圖,每個線程包含多個流任務
亞馬遜Kinesis
Amazon Kinesis是完全託管的Amazon Web Service(AWS)內的產品,用來實時收集,處理和分析視頻和數據流。 Kinesis有以下四種功能:
1、Kinesis視頻流 - 捕獲,處理和存儲視頻流。
2、Kinesis數據流 - 捕獲,處理和存儲數據流。
3、Kinesis Data Firehose - 將數據流載入到AWS數據存儲中。
4、Kinesis Data Analytics - 使用標準SQL分析數據流。
其目的還在於實現流的實時處理以及一些實際用例:構建視頻分析應用程序,從批處理演變為實時分析,構建實時應用以及分析IoT物聯網設備數據等。
以下是Kinesis數據流的工作原理:
小結
介紹了流的工作原理。我們一起看到了關於Node.js流和Java Streams以及Apache Kafka和Amazon Kinesis等工具的優點和缺點。內容介紹完畢,希望您能喜歡本文,歡迎點贊轉發。
作者:Raphael Amoedo
譯者:海力布
來源:https://dzone.com/articles/how-a-stream-works
※RFC7807:API錯誤處理最佳實踐
※乾貨:區塊鏈和數字簽名技術
TAG:21世紀技術官學院 |