
Points to Note When Doing Real-Time Computation with Spark Streaming and Kafka

The most important part of stream processing is how messages are consumed.

When we use Spark for near-real-time computation, most scenarios involve talking to Kafka. Here is a summary of the points to watch when consuming Kafka from Spark; the code follows below.


package com.aura.bigdata.spark.scala.streaming.p1

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions

/**
 * Uses ZooKeeper to manage the offsets the Spark driver reads from Kafka.
 * The offsets of each topic are saved under an agreed path:
 *
 *   /xxxxx/offsets/topic/group/partition/
 *       0
 *       1
 *       2
 *
 * ZooKeeper quorum: bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka
 */
object _07SparkKafkaDriverHAZooKeeperOps {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.hadoop").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.project-spark").setLevel(Level.WARN)

    if (args == null || args.length < 4) {
      println(
        """
          |Parameter Errors! Usage: <batchInterval> <zkQuorum> <groupId> <topics>
          |batchInterval : batch interval in seconds
          |zkQuorum      : ZooKeeper quorum URL
          |groupId       : consumer group id
          |topics        : topics to read
        """.stripMargin)
      System.exit(-1)
    }
    // NOTE: zkQuorum is accepted here, but the Curator connect string below is hardcoded.
    val Array(batchInterval, zkQuorum, group, topic) = args

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "auto.offset.reset" -> "smallest"
    )
    val conf = new SparkConf().setMaster("local[2]").setAppName("_07SparkKafkaDriverHAZooKeeperOps")

    def createFunc(): StreamingContext = {
      val ssc = new StreamingContext(conf, Seconds(batchInterval.toLong))
      // Read the data from Kafka.
      val messages = createMessage(ssc, kafkaParams, topic, group)
      // Business logic.
      messages.foreachRDD((rdd, bTime) => {
        if (!rdd.isEmpty()) {
          println("###########################->RDD count: " + rdd.count)
          println("###########################->batch time: " + bTime)
          // All business logic must happen here; it is written exactly like ordinary RDD operations.
        }
        // After processing, save the offsets back to ZooKeeper.
        storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, topic, group)
      })
      ssc
    }

    // High-availability mode: recover the StreamingContext after a failure.
    val ssc = StreamingContext.getActiveOrCreate(createFunc _)
    ssc.start()
    ssc.awaitTermination()
  }

  def storeOffsets(offsetRanges: Array[OffsetRange], topic: String, group: String): Unit = {
    val zkTopicPath = s"/offsets/${topic}/${group}"
    for (range <- offsetRanges) { // each OffsetRange holds what the current RDD consumed from one partition
      val path = s"${zkTopicPath}/${range.partition}"
      ensureZKExists(path)
      client.setData().forPath(path, (range.untilOffset + "").getBytes())
    }
  }

  /*
   * Agreed ZooKeeper layout for the saved offsets:
   *   /xxxxx/offsets/topic/group/partition/
   *       0
   *       1
   *       2
   */
  def createMessage(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, group: String): InputDStream[(String, String)] = {
    // Read the offsets from ZooKeeper; returns fromOffsets plus a flag marking the first run.
    val (fromOffsets, flag) = getFromOffsets(topic, group)
    var message: InputDStream[(String, String)] = null
    if (!flag) {
      // ZooKeeper already holds offsets consumed by this Spark Streaming program,
      // so on every run except the first we read Kafka starting from those offsets.
      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
    } else {
      // First run: no offsets stored yet, start from auto.offset.reset.
      message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
    }
    message
  }

  // Read the Kafka offsets that were previously stored in ZooKeeper.
  def getFromOffsets(topic: String, group: String): (Map[TopicAndPartition, Long], Boolean) = {
    // /xxxxx/offsets/topic/group/partition/
    val zkTopicPath = s"/offsets/${topic}/${group}"
    ensureZKExists(zkTopicPath)
    // If offsets exist, read them directly.
    val offsets = for {
      p <- JavaConversions.asScalaBuffer(client.getChildren.forPath(zkTopicPath))
    } yield {
      // p is the partition id; read the offset stored under it.
      val offset = client.getData.forPath(s"${zkTopicPath}/${p}")
      (TopicAndPartition(topic, p.toInt), new String(offset).toLong)
    }
    if (!offsets.isEmpty) {
      (offsets.toMap, false)
    } else {
      (offsets.toMap, true)
    }
  }

  def ensureZKExists(zkTopicPath: String): Unit = {
    if (client.checkExists().forPath(zkTopicPath) == null) { // nothing has been written to this path yet
      client.create().creatingParentsIfNeeded().forPath(zkTopicPath)
    }
  }

  // Block expression that builds and starts the Curator client
  // (Curator is to raw ZooKeeper roughly what Spring MVC/Struts2 is to servlets).
  val client = {
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka") // the namespace acts as a root directory for all paths
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    client
  }
}
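
To confirm that the driver is actually writing offsets where you expect, it helps to read the agreed path back out of ZooKeeper. The following standalone sketch is not part of the original post; it reuses the same Curator calls shown above, and the topic and group names in it are placeholders you would replace with your own.

import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import scala.collection.JavaConversions._

object InspectStoredOffsets {
  def main(args: Array[String]): Unit = {
    // Placeholder topic/group names for illustration only.
    val topic = "mytopic"
    val group = "mygroup"
    // Same namespace and connect string the driver above writes under.
    val client = CuratorFrameworkFactory.builder()
      .namespace("mykafka")
      .connectString("bigdata01:2181,bigdata02:2181,bigdata03:2181/kafka")
      .retryPolicy(new ExponentialBackoffRetry(1000, 3))
      .build()
    client.start()
    val zkTopicPath = s"/offsets/${topic}/${group}"
    if (client.checkExists().forPath(zkTopicPath) != null) {
      // One child znode per partition, each holding the last saved offset as a string.
      for (partition <- client.getChildren.forPath(zkTopicPath)) {
        val offset = new String(client.getData.forPath(s"${zkTopicPath}/${partition}"))
        println(s"partition ${partition} -> offset ${offset}")
      }
    } else {
      println(s"no offsets stored yet at ${zkTopicPath}")
    }
    client.close()
  }
}

Running this should print one line per partition with the last saved untilOffset, which is exactly the value storeOffsets writes after each batch.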


Summary

When using Spark with Kafka, pay special attention to handling the message offsets.
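
For comparison only (this is not from the original article): with the newer spark-streaming-kafka-0-10 integration, the same at-least-once pattern can store offsets in Kafka itself instead of ZooKeeper, by disabling auto-commit and committing the captured offset ranges after each batch. The broker addresses below are reused from the article; the topic and group names are placeholders. A minimal sketch, assuming the 0-10 connector is on the classpath:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}

object KafkaOffsetSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaOffsetSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",        // placeholder group id
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // commit manually after processing
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("example-topic"), kafkaParams)) // placeholder topic

    stream.foreachRDD { rdd =>
      // Capture the offset ranges before any shuffle changes the partitioning.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      if (!rdd.isEmpty()) {
        println("record count: " + rdd.count())
      }
      // Commit the offsets back to Kafka only after the batch has been processed.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

Either way, the key point is the same as in the ZooKeeper version: capture the offset ranges when the batch arrives and persist them only after the batch has been processed.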

---------------------

Author: 梧桐聽夜雨

Original post: https://blog.csdn.net/qq_16457097/article/details/85341057

Copyright notice: this is an original post by the author; please include a link to the original when reposting.
