kafka消息隊列學習整理
原理介紹
概述
kafka是一款開源高性能的分散式消息系統。在系統間的消息解耦處理和流計算有著廣泛的應用。最近在學習的過程中遇到了一些問題,整理出來以便查看。希望對剛入門學習小伙們有所幫助。本文只針對kafka的消息處理介紹,不涉及流處理部分。
版本
- jdk : 1.8.0_181
- springBoot: 2.1.0.RELEASE
- spring-kafka: 2.2.2.RELEASE
- kafka-clients: 2.0.0
之所以列出版本,因為在SpringCloud集成過程中出現很多低版本api的兼容問題。建議在新集成的項目中儘可能使用新的穩定版本。相關版本參考請見spring官方說明(https://spring.io/projects/spring-kafka#overview)。
上圖是spring-kafka、kafka server、kafka-clients版本的對應關係。spring項目可根據情況選擇接入。
重要概念
- topic 主題
topic是消息發布的一個分類名稱標識。發布的消息按不同topic name定義,消費端就可以通過訂閱不同的name來關注自己感興趣的主題消息。
每一個topic,kafka集群會維護n(n>=1)個partition(分區)日誌,如下圖。
如上圖所示:每個partition分區中的record有序。寫入消息在尾部以record log文件結構不斷追加。每個record都會被分配一個sequential id number,該number在partition內唯一,kafka中稱之為offset(偏移值)。
kafka集群服務端(broker)在有效期內(configurable retention period),會持久化所有發布的record記錄到磁碟,不管這些record是已消費還是未消費的。如the retention policy(保留策略)設置為2天,發布的record在2天內都將可用,無論該記錄是否有被消費過。2天後這個記錄將被清理以釋放磁碟空間。
由於kafka的性能隨數據的增長是恆定的,所以只要磁碟空間允許,長時間保留數據也不是問題。
kafka中每個消費者事實上只需要拿到record在log文件中的offset位置信息,就能快速的找到具體消息內容。不同於其他消息隊列,消息一旦被消費就馬上被清理,kafka會一直保留直到保存策略設置的時效到期後才統一清理。這樣消費下一個消息只需調整對應值(offset+1),消息的處理過程是相當高效的。消費的消息不馬上清理的另一好處,當消費的消息出現異常時,消費者程序可以自行調整offset重新消費老的數據;或者處理一些有時效的消息時,消費者程序可以跳過已過時效的消息而只消費最新的消息。
- Distribution 分散式
kafka的record消息內容存儲於log日誌文件中,log文件以partition分區歸類組織(可以把partition理解為一個個文件夾,該文件夾下存放所有該分區的log文件)。一般一個topic對應多個partition分區(可通過配置參數設置),為了保證數據可靠性和系統性能,一個partition會複製多份保存到不同的kafka broker集群機器上。如一個partition設置了3份冗餘備份,其中一份作為leader角色承擔讀寫任務。另外2份擔任followers角色,負責實時同步leader的更新消息,當發現leader出現故障,通過zookeeper投票機制選舉新的leader,接管出問題的集群,以保證服務的穩定可用。
- Producers 生產者
producer按topic主題的方式發布消息到broker上的某個partition分區。發布時producer可選擇按輪詢的方式負載均衡到topic下的partition以提高並發,增大吞吐率。也可以自行指定某個partition,或按key hash方式到partition,以滿足某些特定時間順序要求的場景。
- Consumers 消費者
consumer消費者按consumer group消費組的方式訂閱topic消息,consumer group把topic下的多個partition按一定規則分配給組內的各個consumer消費者。一個partition分區只能分配給組內的一個consumer,但一個consumer可以消費多個不同的partition分區。如下圖 Consumer Group A中p0被C1分配消費時,就不能同時也被組內的C2消費。但C1可同時消費P0和P3不同的分區。partition針對不同的組沒有這種限制,如Consumer Group A和Consumer Group B均能消費P0。通過這種機制,按不同的組合搭配可滿足業務上單點和群發消費的需求。
broker集群安裝
kafka broker軟體的安裝,外面有很多文章講的很詳細了。以下是安裝的詳細參考教程。
Kafka的集群配置:https://www.cnblogs.com/5iTech/articles/6043224.html
zookeeper管理工具配置: https://blog.csdn.net/qq_34173549/article/details/80598056
kafka-manager安裝: https://baijiahao.baidu.com/s?id=1598139489983645370&wfr=spider&for=pc
以上安裝教程需要注意的地方:
kafka broker 的server.properties文件
#broker id標誌,集群部署時不同server對應值不同
broker.id=0
#監聽埠,按集群部署時需修改對應broker server ip和埠
listeners=PLAINTEXT://{broker ip地址}:9092
#消息topic對應分區數,默認1,也可以通過程序client端動態設置
num.partitions=1
#日誌保留時間,根據業務具體情況調整
log.retention.hours=168
#zookeeper鏈接地址
zookeeper.connect={zookeeper1 ip地址}:2181,{zookeeper2 ip地址}:2181,{zookeeper3 ip地址}:2181
kafka-manager工具目前僅更新到kafka1.1版本,最新的kafka 2.*版本暫未支持。
代碼集成
gradle jar包依賴
添加spring-boot gradle 插件,springBoot版本2.1.0.RELEASE。
buildscript {
ext {
springBootVersion = "2.1.0.RELEASE"
}
repositories {
mavenLocal()
maven{
url "http://maven.aliyun.com/nexus/content/groups/public/"
}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
}
}
添加依賴管理 gradle 插件。
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:Finchley.SR2"
// mavenBom org.springframework.boot.gradle.plugin.SpringBootPlugin.BOM_COORDINATES
}
}
引入spring-boot 和依賴管理
apply plugin: "idea"
apply plugin: "java"
apply plugin: "maven"
apply plugin: "org.springframework.boot"
apply plugin: "io.spring.dependency-management"
添加依賴jar
dependencies {
compile "org.springframework.boot:spring-boot-starter-web"
compile "org.springframework.boot:spring-boot-starter-actuator"
// compile "org.springframework.kafka:spring-kafka:2.2.2.RELEASE"
compile "org.springframework.kafka:spring-kafka"
// Use JUnit test framework
testCompile "junit:junit:4.12"
}
配置文件屬性添加
spring.kafka.bootstrap-servers=localhost:9092
#procedure要求leader在考慮完成請求之前收到的確認數,用於控制發送記錄在服務端的持久化,其值可以為如下:
#acks = 0 如果設置為零,則生產者將不會等待來自伺服器的任何確認,該記錄將立即添加到套接字緩衝區並視為已發送。在這種情況下,無法保證伺服器已收到記錄,並且重試配置將不會生效(因為客戶端通常不會知道任何故障),為每條記錄返回的偏移量始終設置為-1。
#acks = 1 這意味著leader會將記錄寫入其本地日誌,但無需等待所有副本伺服器的完全確認即可做出回應,在這種情況下,如果leader在確認記錄後立即失敗,但在將數據複製到所有的副本伺服器之前,則記錄將會丟失。
#acks = all 這意味著leader將等待完整的同步副本集以確認記錄,這保證了只要至少一個同步副本伺服器仍然存活,記錄就不會丟失,這是最強有力的保證,這相當於acks = -1的設置。
#可以設置的值為:all, -1, 0, 1
spring.kafka.producer.acks=1
#關閉自動提交offset
spring.kafka.consumer.enable-auto-commit=false
#enable-auto-commit=false時人工提交offset ack
spring.kafka.listener.ack-mode=manual
說明:關閉消費消息自動提交(spring.kafka.consumer.enable-auto-commit=false),是為了業務處理消息時更靈活。enable-auto-commit=true時,業務拿到消息,spring框架會自動提交消息已消費的ack反饋給kafka,kafka收到回饋認為該條消息已成功消費,然後移動offset到新的位置。但實際情況往往是,業務拿到消息時會有相關邏輯處理,在處理的過程中可能發生種種意外(系統崩潰、發版重啟等),導致消息最終沒有成功消費。如果enable-auto-commit=false時,ack反饋將由業務自行控制,當出現異常報錯時,有問題的消息可以重複消費。保證不會漏消費消息。
spring.kafka.listener.ack-mode=manual設置監聽器的ack反饋模式為人工,該配置只有spring.kafka.consumer.enable-auto-commit=false才生效。
kafka spring 代碼configration添加
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory,
KafkaTemplate<Object, Object> template) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
// 消息消費異常時,嘗試重新消費異常消息3次。若仍不成功,把問題消息放入到死信(dead-letter) topic隊列中,後期延遲處理。
factory.setErrorHandler(new SeekToCurrentErrorHandler(
new DeadLetterPublishingRecoverer(template), 3)); // dead-letter after 3 tries
return factory;
}
}
創建kafka生產者
@Component
public class KafkaProducer {
public static final String TEST_TOPIC = "kafka_msg";
@Autowired
private KafkaTemplate<Object,Object> kafkaTemplate;
/**
* 定時任務
*/
@Scheduled(cron = "00/10 * * * * ?")
public void send(){
System.out.println("start to send kafka msg...");
String message = UUID.randomUUID().toString();
KafkaMessage msg = new KafkaMessage();
msg.setId(message);
msg.setCurrentTime(String.valueOf(System.currentTimeMillis()));
msg.setMessage("content is "+message);
//1. send方法無key時,topic消息按輪詢方式負載到partition上
ListenableFuture future = kafkaTemplate.send(TEST_TOPIC, JSON.toJSONString(msg));
//2. send方法有key時,topic按hash方式負載到partition上
// kafkaTemplate.send(TEST_TOPIC,msg.getId(),JSON.toJSONString(msg));
future.addCallback(o -> System.out.println("send-消息發送成功:" + message), throwable -> System.out.println("消息發送失敗:" + message));
}
@Bean
public NewTopic kafkaMsgTopic() {
//定義topic的分區,複製份數
return new NewTopic(TEST_TOPIC, 3, (short) 1);
}
@Bean
public NewTopic kafkaMsgTopicDLT() {
//dead letter topic 必須與original topic的partition一致,否則exception處理時找不到對應partition,將無法添加exception msg到 {original topic}.DLT 中。
//定義topic的分區,複製份數
return new NewTopic(TEST_TOPIC+".DLT", 3, (short) 1);
}
}
kafka message對象
public class KafkaMessage implements Serializable{
private String id;
private String currentTime;
private String message;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getCurrentTime() {
return currentTime;
}
public void setCurrentTime(String currentTime) {
this.currentTime = currentTime;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
說明:kafkaMsgTopicDLT方法是創建dead-letter(死信隊列)的topic,注意dead-letter的partition要與發生exception的topic初始化信息一致。因為exception的message有多個partition而dead-letter沒有時,spring預置的DeadLetterPublishingRecoverer類(上面KafkaConfig初始化時的定義)將不知道應該把exception message發到哪個dead-letter partition中。
spring默認的dead-letter topic(DeadLetterPublishingRecoverer)是在源topic名稱添加.DLT後綴,所以初始化時也會帶上對應後綴。
創建kafka消費者
@Component
public class KafakaConsumer {
/**
* 消費組testGroup,監聽KafkaProducer.TEST_TOPIC主題消息處理。
* @param record
* @param ack
*/
@KafkaListener(id = "testGroup", topics = {KafkaProducer.TEST_TOPIC})
public void receive(ConsumerRecord<String, String> record, Acknowledgment ack) {
KafkaMessage msg = JSON.parseObject(record.value(), KafkaMessage.class);
System.err.println("Receive:" + msg.getMessage() + ",partion:" + record.partition() + ", offset:" + record.offset());
if(record.partition()!=1){
//消息消費成功,手動反饋成功消費ack到kafka
ack.acknowledge();
}else{
//測試消息消費異常時,消息放入dead-letter死信隊列的情況,未反饋成功消費ack到kafka。
throw new RuntimeException("test consume msg error.");
}
}
/** 消費組testGroup1,監聽相同的KafkaProducer.TEST_TOPIC主題消息,模擬群發消息的監聽。
* 消費組
* @param record
* @param ack
*/
@KafkaListener(id = "testGroup1", topics = {KafkaProducer.TEST_TOPIC})
public void receive1(String record, Acknowledgment ack) {
KafkaMessage msg = JSON.parseObject(record, KafkaMessage.class);
System.err.println("Receive1:" + msg.getMessage());
ack.acknowledge();
}
/**
* 消費組testGroupDLT,監聽KafkaProducer.TEST_TOPIC+".DLT"主題,處理未正常消費的消息。
* @param record
* @param ack
*/
@KafkaListener(id = "testGroupDLT", topics = {KafkaProducer.TEST_TOPIC+".DLT"})
public void receiveDLT(ConsumerRecord<String, String> record, Acknowledgment ack) {
KafkaMessage msg = JSON.parseObject(record.value(), KafkaMessage.class);
System.err.println("Receive DLT topic:" + msg.getMessage() + ",partion:" + record.partition() + ", offset:" + record.offset());
ack.acknowledge();
}
}
spring cloud 入口類
@SpringBootApplication
@EnableDiscoveryClient
@EnableScheduling
@RestController
public class Application {
public static void main(String[] args) {
System.out.println("springcloud kafka demo start...");
SpringApplication.run(Application.class, args);
}
@RequestMapping("/ping")
public String home() {
return "pong";
}
}
測試驗證:
測試環境:window7 單機環境
打開cmd命令行工具,cd到kafka安裝目錄binwindows目錄下,執行zookeeper-server-start.bat ....configzookeeper.properties,運行kafka zookeeper程序。
新開cmd工具,cd到kafka安裝目錄binwindows下,執行kafka-server-start.bat ....configserver.properties,運行kafka broker程序。
idea啟動spring cloud入口Application類。
如上圖所示日誌信息:
send-消息發送成功:e910d522-adbb-4a04-878f-2f92427000d7
消息produce成功發送
Receive:content is 2028370d-3fa2-4496-991b-ab19a98e6187,partion:1, offset:225
消費組testGroup,監聽KafkaProducer.TEST_TOPIC主題消息處理
Receive1:content is e910d522-adbb-4a04-878f-2f92427000d7
消費組testGroup1,監聽相同的KafkaProducer.TEST_TOPIC主題消息,模擬群發消息的監聽。
Receive DLT topic:content is 2028370d-3fa2-4496-991b-ab19a98e6187,partion:1, offset:58
消費組testGroupDLT,監聽KafkaProducer.TEST_TOPIC+".DLT"主題,處理未正常消費的消息。