Points to Note When Doing Real-Time Computing with Spark Streaming and Kafka
The most important part of stream processing is how messages are consumed.
When we use Spark for near-real-time computing, most scenarios involve talking to Kafka. Below is a summary of what to watch out for when consuming Kafka from Spark, followed by the code.
package com.aura.bigdata.spark.scala.streaming.p1
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.JavaConversions
/**
 * Use ZooKeeper to manage the offsets read by the Spark driver.
 * The offsets of the corresponding Kafka topic are saved under the path below.
 *
 * Convention: offsets are saved under
 *   /xxxxx/offsets/topic/group/partition/
 *     0
 *     1
 *     2
 *
 * ZooKeeper connect string: bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
 */
object _07SparkKafkaDriverHAZooKeeperOps {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)
    if (args == null || args.length < 4) {
      println(
        """
          |Parameter Errors! Usage: <batchInterval> <zkQuorum> <groupId> <topics>
          |batchInterval : batch interval in seconds
          |zkQuorum      : ZooKeeper connection string
          |groupId       : consumer group id
          |topics        : topic(s) to read
        """.stripMargin)
      System.exit(-1)
    }
    // note: zkQuorum is accepted on the command line, but this listing hardcodes
    // the broker list and the ZooKeeper connect string further down
    val Array(batchInterval, zkQuorum, group, topic) = args
    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "auto.offset.reset" -> "smallest"
    )
    val conf = new SparkConf().setMaster("local[2]").setAppName("_07SparkKafkaDriverHAZooKeeperOps")

    def createFunc(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
      // read the data from Kafka
      val messages = createMessage(ssc, kafkaParams, topic, group)
      // business logic
      messages.foreachRDD((rdd, bTime) => {
        if (!rdd.isEmpty()) {
          println("###########################->RDD count: " + rdd.count)
          println("###########################->batch time: " + bTime)
          // all business logic must be done here; it works exactly like ordinary RDD operations
        }
        // after processing, persist the offsets back to ZooKeeper
        storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, topic, group)
      })
      ssc
    }

    // high-availability startup: reuse the active context to recover from a failure
    val ssc = StreamingContext.getActiveOrCreate(createFunc _)
    ssc.start()
    ssc.awaitTermination()
  }
  def storeOffsets(offsetRanges: Array[OffsetRange], topic: String, group: String): Unit = {
    val zkTopicPath = s"/offsets/${topic}/${group}"
    for (range <- offsetRanges) { // each range holds the offsets consumed from one partition of the current RDD
      val path = s"${zkTopicPath}/${range.partition}"
      ensureZKExists(path)
      client.setData().forPath(path, (range.untilOffset + "").getBytes())
    }
  }
  /*
   * Convention: offsets are saved under the following ZooKeeper path
   *   /xxxxx/offsets/topic/group/partition/
   *     0
   *     1
   *     2
   */
  def createMessage(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, group: String): InputDStream[(String, String)] = {
    // read the saved offsets from ZooKeeper; returns fromOffsets plus a flag marking whether this is the first run
    val (fromOffsets, flag) = getFromOffsets(topic, group)
    var message: InputDStream[(String, String)] = null
    if (!flag) {
      // ZooKeeper already holds the offsets this Spark Streaming consumer group saved,
      // so on every run except the first we resume reading Kafka from those offsets
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // first run: no saved offsets yet, start from the configured auto.offset.reset position
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
    }
    message
  }
  // read the Kafka offsets previously saved in ZooKeeper
  def getFromOffsets(topic: String, group: String): (Map[TopicAndPartition, Long], Boolean) = {
    // /xxxxx/offsets/topic/group/partition/
    val zkTopicPath = s"/offsets/${topic}/${group}"
    ensureZKExists(zkTopicPath)
    // if offsets exist, read the saved value for every partition
    val offsets = for {
      p <- JavaConversions.asScalaBuffer(client.getChildren.forPath(zkTopicPath))
    } yield {
      // p is the partition id; the node's data is the saved offset
      val offset = client.getData.forPath(s"${zkTopicPath}/${p}")
      (TopicAndPartition(topic, p.toInt), new String(offset).toLong)
    }
    if (offsets.nonEmpty) {
      (offsets.toMap, false)
    } else {
      (offsets.toMap, true)
    }
  }
  def ensureZKExists(zkTopicPath: String): Unit = {
    if (client.checkExists().forPath(zkTopicPath) == null) { // nothing has been written to this path yet
      client.create().creatingParentsIfNeeded().forPath(zkTopicPath)
    }
  }
  val client = { // block initializer; Curator is to raw ZooKeeper what Spring MVC/Struts2 is to servlets
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka") // the namespace acts as a root directory prefix
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    client
  }
}
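
If you ever need a consumer group to start over from auto.offset.reset, the saved nodes have to be removed from ZooKeeper first. The following is a minimal companion sketch (not part of the original listing) that does this with the same Curator namespace, connect string, and path convention as above; the topic and group names are hypothetical placeholders.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

object _07ResetSavedOffsets {
  def main(args: Array[String]): Unit = {
    // same Curator setup as the listing above
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka")
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    // hypothetical topic and group, following the /offsets/topic/group/partition convention
    val path = "/offsets/mytopic/mygroup"
    if (client.checkExists().forPath(path) != null) {
      // deletingChildrenIfNeeded also removes the per-partition child nodes
      client.delete().deletingChildrenIfNeeded().forPath(path)
    }
    client.close()
  }
}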
Summary
When using Spark with Kafka, pay special attention to managing the message offsets yourself: read the saved offsets on startup and persist them only after each batch has been processed, so a restarted driver can resume from where it left off.
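
As a point of comparison, the newer spark-streaming-kafka-0-10 integration can commit offsets back to Kafka itself instead of storing them in ZooKeeper, while keeping the same pattern of processing the batch first and persisting the offsets afterwards. The sketch below is only an illustration under that assumption; the broker list, topic, group id, and object name are placeholders, not part of the original article.

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object _08SparkKafka010CommitOps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("_08SparkKafka010CommitOps")
    val ssc = new StreamingContext(conf, Seconds(2))

    // auto commit is disabled so that offsets are committed only after the batch is processed
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "demo-group",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Set("demo-topic"), kafkaParams))

    stream.foreachRDD { rdd =>
      // capture the offset ranges of this batch before any further transformation
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        println("batch record count: " + rdd.count())
      }
      // commit back to Kafka only after the business logic has finished
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

The key point is identical in both approaches: automatic committing is switched off, and the offsets are written out only after the batch's business logic has completed, so a restarted driver resumes from the last fully processed batch.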
---------------------
Author: 梧桐聽夜雨
Original post: https://blog.csdn.net/qq_16457097/article/details/85341057
Copyright notice: this is the author's original article; please include a link to the original when reposting.