
Flume+Kafka+Storm+HBase+HDFS+POI Integration

Requirements:

For a website, we want to log users' behaviour and analyse the data that is useful to us.

Example: on the site www.hongten.com (a fictional e-commerce site of mine), a user can perform many familiar actions: register, log in, view, click, double-click, buy, add to the shopping cart, add a record, edit a record, delete a record, comment, log out, and so on. All of these operations are recorded in the logs, and it is these logs we want to analyse.

In this article we analyse two of those actions, buying and adding to the shopping cart, and generate a report from them. The report shows at what times users tend to buy and at what times they tend to add items to the cart, so that we can act at the right moment: encourage users to buy, and recommend products for them to add to the cart (users who add to the cart are potential buyers).

After all, making the site profitable is what we are really after, right?

1. Abstracting user actions

// user actions
public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };

2. Log format definition


115.19.62.102 海南 2018-12-20 1545286960749 1735787074662918890 www.hongten.com Edit
27.177.45.84 新疆 2018-12-20 1545286962255 6667636903937987930 www.hongten.com Delete
176.54.120.96 寧夏 2018-12-20 1545286962256 6988408478348165495 www.hongten.com Comment
175.117.33.187 遼寧 2018-12-20 1545286962257 8411202446705338969 www.hongten.com Shopping_Car
17.67.62.213 天津 2018-12-20 1545286962258 7787584752786413943 www.hongten.com Add
137.81.41.9 海南 2018-12-20 1545286962259 6218367085234099455 www.hongten.com Shopping_Car
125.187.107.57 山東 2018-12-20 1545286962260 3358658811146151155 www.hongten.com Double_Click
104.167.205.87 內蒙 2018-12-20 1545286962261 2303468282544965471 www.hongten.com Shopping_Car
64.106.149.83 河南 2018-12-20 1545286962262 8422202443986582525 www.hongten.com Delete
138.22.156.183 浙江 2018-12-20 1545286962263 7649154147863130337 www.hongten.com Shopping_Car
41.216.103.31 河北 2018-12-20 1545286962264 6785302169446728008 www.hongten.com Shopping_Car
132.144.93.20 廣東 2018-12-20 1545286962265 6444575166009004406 www.hongten.com Add

Log format:

// log format
String log = ip + " " + address + " " + d + " " + timestamp + " " + userid + " " + Common.WEB_SITE + " " + action;
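
For reference, here is a minimal sketch (not part of the original code) of how a consumer could split such a line back into its fields, assuming the single-space separators shown above; the LogRecord class and parse helper are illustrative only:

// Illustrative only: parse one space-separated log line of the format above.
public class LogRecord {
    public String ip, address, date, website, action;
    public long timestamp, userId;

    public static LogRecord parse(String line) {
        String[] f = line.split(" ");
        LogRecord r = new LogRecord();
        r.ip = f[0];
        r.address = f[1];
        r.date = f[2];                      // yyyy-MM-dd
        r.timestamp = Long.parseLong(f[3]); // epoch milliseconds
        r.userId = Long.parseLong(f[4]);
        r.website = f[5];                   // www.hongten.com
        r.action = f[6];                    // e.g. Buy, Shopping_Car
        return r;
    }
}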

3. System architecture

[Architecture diagram]

Note: step 9 in the diagram reads the HBase data and generates the Excel report with POI.

4. Report layout

Since the data is randomly generated, the results we see grow roughly linearly.

Here I only implemented a one-hour report, but you could just as well build reports for a day, a quarter, a year, three years or five years, depending on the actual requirements.
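
As a rough, illustrative sketch only (not part of the original code): the counters written to HBase in section 6.10 are cumulative within a day and reset at midnight, so the values read back for a report are running totals. A day-level figure is therefore just the last snapshot of that day, and a weekly or yearly report boils down to one such value per day:

import java.util.List;

// Illustrative helper, assuming the cumulative-per-day counters used later in this article.
public class ReportAggregation {
    // snapshots: the cumulative counts read back for one day, in time order
    public static int dailyTotal(List<Integer> snapshots) {
        return snapshots.isEmpty() ? 0 : snapshots.get(snapshots.size() - 1);
    }
}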

[Sample one-hour report]

5. Component distribution

I set up four nodes in total: node1, node2, node3 and node4 (note: all four nodes need a JDK).

ZooKeeper is installed on node1, node2 and node3.

The Hadoop cluster runs on node1, node2, node3 and node4.

The HBase cluster runs on node1, node2, node3 and node4.

Flume is installed on node2.

Kafka is installed on node1, node2 and node3.

Storm is installed on node1, node2 and node3.


6. Implementation

6.1. Configure Flume

-- on node2
cd flumedir
vi flume2kafka
-- node2 configuration:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node2
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = all_my_log
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
:wq

6.2. Start ZooKeeper

-- stop the firewall on node1, node2, node3, node4
service iptables stop
-- start ZooKeeper on node1, node2, node3
zkServer.sh start

6.3. Start Kafka

-- start Kafka
-- on each of node1, node2, node3
cd /root/kafka/kafka_2.10-0.8.2.2
./start-kafka.sh

6.4. Start the Flume agent

-- on node2, start the agent
cd /root/flumedir
flume-ng agent -n a1 -c conf -f flume2kafka -Dflume.root.logger=DEBUG,console

6.5. Generate log data and send it to Flume

Run the Java code below to generate log entries and send them to the Flume server.

package com.b510.big.data.flume.client;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;
/**
* @author Hongten
*
* Purpose: simulate user log entries and send them to Flume
*/
public class FlumeClient {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new GenerateDataAndSend2Flume());
exec.shutdown();
}
}
class GenerateDataAndSend2Flume implements Runnable {
FlumeRPCClient flumeRPCClient;
static Random random = new Random();
GenerateDataAndSend2Flume() {
// initialize the RPC client
flumeRPCClient = new FlumeRPCClient();
flumeRPCClient.init(Common.FLUME_HOST_NAME, Common.FLUME_PORT);
}
@Override
public void run() {
while (true) {
Date date = new Date();
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMM);
String d = simpleDateFormat.format(date);
Long timestamp = new Date().getTime();
// generate a random IP address
String ip = random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER) + "." + random.nextInt(Common.MAX_IP_NUMBER);
// address for the IP (test data only; no real IP-to-address lookup is done)
String address = Common.ADDRESS[random.nextInt(Common.ADDRESS.length)];
Long userid = Math.abs(random.nextLong());
String action = Common.USER_ACTION[random.nextInt(Common.USER_ACTION.length)];
// build the log line
// example : 199.80.45.117 雲南 2018-12-20 1545285957720 3086250439781555145 www.hongten.com Buy
String data = ip + " " + address + " " + d + " " + timestamp + " " + userid + " " + Common.WEB_SITE + " " + action;
//System.out.println(data);
// send the data to Flume
flumeRPCClient.sendData2Flume(data);
try {
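// pause for up to ~1 millisecond between log lines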
TimeUnit.MICROSECONDS.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
flumeRPCClient.cleanUp();
System.out.println("interrupted exception : " + e);
}
}
}
}
class FlumeRPCClient {
private RpcClient client;
private String hostname;
private int port;
public void init(String hostname, int port) {
this.hostname = hostname;
this.port = port;
this.client = getRpcClient(hostname, port);
}
public void sendData2Flume(String data) {
Event event = EventBuilder.withBody(data, Charset.forName(Common.CHAR_FORMAT));
try {
client.append(event);
} catch (EventDeliveryException e) {
cleanUp();
client = null;
client = getRpcClient(hostname, port);
}
}
public RpcClient getRpcClient(String hostname, int port) {
return RpcClientFactory.getDefaultInstance(hostname, port);
}
public void cleanUp() {
// Close the RPC connection
client.close();
}
}
// all constant definitions
class Common {
public static final String CHAR_FORMAT = "UTF-8";
public static final String DATE_FORMAT_YYYYDDMM = "yyyy-MM-dd";
// this is a test web site
public static final String WEB_SITE = "www.hongten.com";
// user actions
public static final String[] USER_ACTION = { "Register", "Login", "View", "Click", "Double_Click", "Buy", "Shopping_Car", "Add", "Edit", "Delete", "Comment", "Logout" };
public static final int MAX_IP_NUMBER = 224;
// addresses the generated IPs map to
public static String[] ADDRESS = { "北京", "天津", "上海", "廣東", "重慶", "河北", "山東", "河南", "雲南", "山西", "甘肅", "安徽", "福建", "黑龍江", "海南", "四川", "貴州", "寧夏", "新疆", "湖北", "湖南", "山西", "遼寧", "吉林", "江蘇", "浙江", "青海", "江西", "西藏", "內蒙", "廣西", "香港", "澳門", "台灣", };
// Flume conf
public static final String FLUME_HOST_NAME = "node2";
public static final int FLUME_PORT = 41414;
}


6.6. Monitor Kafka

-- on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic all_my_log

Sample output:

168.208.193.207 安徽 2018-12-20 1545287646527 5462770148222682599 www.hongten.com Login
103.143.79.127 新疆 2018-12-20 1545287646529 3389475301916412717 www.hongten.com Login
111.208.80.39 山東 2018-12-20 1545287646531 535601622597096753 www.hongten.com Shopping_Car
105.30.86.46 四川 2018-12-20 1545287646532 7825340079790811845 www.hongten.com Login
205.55.33.74 新疆 2018-12-20 1545287646533 4228838365367235561 www.hongten.com Logout
34.44.60.134 安徽 2018-12-20 1545287646536 702584874247456732 www.hongten.com Double_Click
154.169.15.145 廣東 2018-12-20 1545287646537 1683351753576425036 www.hongten.com View
126.28.192.28 湖南 2018-12-20 1545287646538 8319814684518483148 www.hongten.com Edit
5.140.156.73 台灣 2018-12-20 1545287646539 7432409906375230025 www.hongten.com Logout
72.175.210.95 西藏 2018-12-20 1545287646540 5233707593244910849 www.hongten.com View
121.25.190.25 廣西 2018-12-20 1545287646541 268200251881841673 www.hongten.com Buy

6.7. Create a Kafka topic

-- on node1, create a topic named filtered_log
-- with 3 partitions
-- and replication-factor 3
./kafka-topics.sh --zookeeper node1,node2,node3 --create --topic filtered_log --partitions 3 --replication-factor 3

6.8. Cleanse the data with Storm

  • Storm consumes the data from Kafka
  • Storm filters the data (Buy = completed purchase, Shopping_Car = potential purchase)
  • Storm writes the filtered data back to Kafka

package com.b510.big.data.storm.process;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LogFilterTopology {
public static void main(String[] args) {
ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
// the spout consumes from the "all_my_log" topic
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.ALL_MY_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
List<String> zkServers = new ArrayList<>();
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
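// start reading from the beginning of the topic instead of resuming from a previously saved offset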
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// create the KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
// Storm consumes the data from Kafka
builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
// filter the data (Buy = completed purchase, Shopping_Car = potential purchase)
builder.setBolt(Common.FILTER_BOLT, new FilterBolt(), 8).shuffleGrouping(Common.KAFKA_SPOUT);
// create the KafkaBolt
@SuppressWarnings({ "unchecked", "rawtypes" })
KafkaBolt kafkaBolt = new KafkaBolt().withTopicSelector(new DefaultTopicSelector(Common.FILTERED_LOG_TOPIC)).withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
// write the filtered data back to Kafka
builder.setBolt(Common.KAFKA_BOLT, kafkaBolt, 2).shuffleGrouping(Common.FILTER_BOLT);
Properties props = new Properties();
props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
Config conf = new Config();
conf.put("kafka.broker.properties", props);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
if (args == null || args.length == 0) {
// run in local mode
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
} else {
// run on the cluster
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException e) {
System.out.println("error : " + e);
}
}
}
}
class FilterBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String logStr = input.getString(0);
// only keep the lines we are interested in
// here that means log lines containing "Buy" or "Shopping_Car"
if (logStr.contains(Common.KEY_WORD_BUY) || logStr.contains(Common.KEY_WORD_SHOPPING_CAR)) {
collector.emit(new Values(logStr));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
}
}
class Common {
public static final String ALL_MY_LOG_TOPIC = "all_my_log";
public static final String FILTERED_LOG_TOPIC = "filtered_log";

public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
public static final String DATE_FORMAT_HHMMSS = "HHmmss";
public static final String DATE_FORMAT_HHMMSS_DEFAULT_VALUE = "000001";
public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
public static final int ZOOKEEPER_PORT = 2181;
public static final String ZOOKEEPER_QUORUM = "node1:" + ZOOKEEPER_PORT + ",node2:" + ZOOKEEPER_PORT + ",node3:" + ZOOKEEPER_PORT + "";
public static final String ZOOKEEPER_ROOT = "/MyKafka";
public static final String ZOOKEEPER_ID = "MyTrack";
public static final String KAFKA_SPOUT = "kafkaSpout";
public static final String FILTER_BOLT = "filterBolt";
public static final String PROCESS_BOLT = "processBolt";
public static final String HBASE_BOLT = "hbaseBolt";
public static final String KAFKA_BOLT = "kafkaBolt";
// Storm Conf
public static final String STORM_METADATA_BROKER_LIST = "node1:9092,node2:9092,node3:9092";
public static final String STORM_REQUEST_REQUIRED_ACKS = "1";
public static final String STORM_SERILIZER_CLASS = "kafka.serializer.StringEncoder";
// key word
public static final String KEY_WORD_BUY = "Buy";
public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";

//hbase
public static final String TABLE_USER_ACTION = "t_user_actions";
public static final String COLUMN_FAMILY = "cf";
// how often (in seconds) to write a record to HBase
public static final int WRITE_RECORD_TO_TABLE_PER_SECOND = 1;
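// (60 / WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24 = 86,400: enough versions for one write per second over a full day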
public static final int TABLE_MAX_VERSION = (60/WRITE_RECORD_TO_TABLE_PER_SECOND) * 60 * 24;
}

6.9. Monitor Kafka

-- on node3, start a Kafka console consumer
cd /home/kafka-2.10/bin
./kafka-console-consumer.sh --zookeeper node1,node2,node3 --from-beginning --topic filtered_log

Sample output:


87.26.135.185 黑龍江 2018-12-20 1545290594658 7290881731606227972 www.hongten.com Shopping_Car
60.96.96.38 青海 2018-12-20 1545290594687 6935901257286057015 www.hongten.com Shopping_Car
43.159.110.193 江蘇 2018-12-20 1545290594727 7096698224110515553 www.hongten.com Shopping_Car
21.103.139.11 山西 2018-12-20 1545290594693 7805867078876194442 www.hongten.com Shopping_Car
139.51.213.184 廣東 2018-12-20 1545290594729 8048796865619113514 www.hongten.com Buy
58.213.148.89 河北 2018-12-20 1545290594708 5176551342435592748 www.hongten.com Buy
36.205.221.116 湖南 2018-12-20 1545290594715 4484717918039766421 www.hongten.com Shopping_Car
135.194.103.53 北京 2018-12-20 1545290594769 4833011508087432349 www.hongten.com Shopping_Car
180.21.100.66 貴州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
167.71.65.70 山西 2018-12-20 1545290594790 275898530145861990 www.hongten.com Buy
125.51.21.199 寧夏 2018-12-20 1545290594814 3613499600574777198 www.hongten.com Buy

6.10. Storm consumes the Kafka data again, aggregates it and saves the results to HBase

  • Storm consumes the data from Kafka again
  • Storm aggregates the data (Buy = number of purchases, Shopping_Car = number of potential purchases)
  • Storm writes the results to HBase

package com.b510.big.data.storm.process;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
public class LogProcessTopology {
public static void main(String[] args) {
ZkHosts zkHosts = new ZkHosts(Common.ZOOKEEPER_QUORUM);
// the spout consumes from the "filtered_log" topic
SpoutConfig spoutConfig = new SpoutConfig(zkHosts, Common.FILTERED_LOG_TOPIC, Common.ZOOKEEPER_ROOT, Common.ZOOKEEPER_ID);
List<String> zkServers = new ArrayList<>();
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = Common.ZOOKEEPER_PORT;
spoutConfig.forceFromStart = true;
spoutConfig.socketTimeoutMs = 60 * 60 * 1000;
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
// create the KafkaSpout
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = new TopologyBuilder();
// Storm consumes the data from Kafka again
builder.setSpout(Common.KAFKA_SPOUT, kafkaSpout, 3);
// aggregate the data (Buy = purchase count, Shopping_Car = potential-purchase count)
builder.setBolt(Common.PROCESS_BOLT, new ProcessBolt(), 3).shuffleGrouping(Common.KAFKA_SPOUT);
// write the results to HBase
builder.setBolt(Common.HBASE_BOLT, new HbaseBolt(), 3).shuffleGrouping(Common.PROCESS_BOLT);
Properties props = new Properties();
props.put("metadata.broker.list", Common.STORM_METADATA_BROKER_LIST);
props.put("request.required.acks", Common.STORM_REQUEST_REQUIRED_ACKS);
props.put("serializer.class", Common.STORM_SERILIZER_CLASS);
Config conf = new Config();
conf.put("kafka.broker.properties", props);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
if (args == null || args.length == 0) {
// run in local mode
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("storm-kafka-topology", conf, builder.createTopology());
} else {
// run on the cluster
conf.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (AlreadyAliveException | InvalidTopologyException e) {
System.out.println("error : " + e);
}
}

}
}
class ProcessBolt extends BaseBasicBolt {
private static final long serialVersionUID = 1L;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String logStr = input.getString(0);
if (logStr != null) {
// the log line is space-separated, for example:
// 180.21.100.66 貴州 2018-12-20 1545290594752 5270357330431599426 www.hongten.com Buy
String infos[] = logStr.split(" ");
collector.emit(new Values(infos[2], infos[6]));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("date", "user_action"));
}
}
class HbaseBolt implements IBasicBolt {
private static final long serialVersionUID = 1L;
HBaseDAO hBaseDAO = null;

SimpleDateFormat simpleDateFormat = null;
SimpleDateFormat simpleDateFormatHHMMSS = null;

int userBuyCount = 0;
int userShoopingCarCount = 0;

// throttle here so we do not write to HBase too frequently
int writeToHbaseMaxNum = Common.WRITE_RECORD_TO_TABLE_PER_SECOND * 1000;
long begin = System.currentTimeMillis();
long end = 0;

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map map, TopologyContext context) {
hBaseDAO = new HBaseDAOImpl();
simpleDateFormat = new SimpleDateFormat(Common.DATE_FORMAT_YYYYDDMMHHMMSS);
simpleDateFormatHHMMSS = new SimpleDateFormat(Common.DATE_FORMAT_HHMMSS);
hBaseDAO.createTable(Common.TABLE_USER_ACTION, new String[]{Common.COLUMN_FAMILY}, Common.TABLE_MAX_VERSION);
}
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// at the first second of a new day, reset the counters
// (this check is not precise: at that moment some of the previous day's data may still be in flight)
if (simpleDateFormatHHMMSS.format(new Date()).equals(Common.DATE_FORMAT_HHMMSS_DEFAULT_VALUE)) {
userBuyCount = 0;
userShoopingCarCount = 0;
}

if (input != null) {
// based on ProcessBolt.declareOutputFields()
String date = input.getString(0);
String userAction = input.getString(1);
if (userAction.equals(Common.KEY_WORD_BUY)) {
// the same user may repeat the "Buy" action within one day
userBuyCount++;
}
if (userAction.equals(Common.KEY_WORD_SHOPPING_CAR)) {
userShoopingCarCount++;
}
end = System.currentTimeMillis();
if ((end - begin) > writeToHbaseMaxNum) {
System.out.println("hbase_key: " + Common.KEY_WORD_BUY + "_" + date + " , userBuyCount: " + userBuyCount + ", userShoopingCarCount :" + userShoopingCarCount);

// write the counts to HBase
String quailifer = simpleDateFormat.format(new Date());
hBaseDAO.insert(Common.TABLE_USER_ACTION ,
Common.KEY_WORD_BUY + "_" + date,
Common.COLUMN_FAMILY,
new String[] { quailifer },
new String[] { "{user_buy_count:" + userBuyCount + "}" }
);
hBaseDAO.insert(Common.TABLE_USER_ACTION ,
Common.KEY_WORD_SHOPPING_CAR + "_" + date,
Common.COLUMN_FAMILY,
new String[] { quailifer },
new String[] { "{user_shopping_car_count:" + userShoopingCarCount + "}" }
);
begin = System.currentTimeMillis();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void cleanup() {
}
}
interface HBaseDAO {
public void createTable(String tableName, String[] columnFamilys, int maxVersion);
public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
}
class HBaseDAOImpl implements HBaseDAO {
HConnection hConnection = null;
static Configuration conf = null;
public HBaseDAOImpl() {
conf = new Configuration();
conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
try {
hConnection = HConnectionManager.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}

public void createTable(String tableName, String[] columnFamilys, int maxVersion) {
try {
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName)) {
System.err.println("table existing in hbase.");
} else {
HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tableName));
for (String columnFamily : columnFamilys) {
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(columnFamily);
hColumnDescriptor.setMaxVersions(maxVersion);
tableDesc.addFamily(hColumnDescriptor);
}
admin.createTable(tableDesc);
System.err.println("table is created.");
}
admin.close();
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]) {
HTableInterface table = null;
try {
table = hConnection.getTable(tableName);
Put put = new Put(rowKey.getBytes());
for (int i = 0; i < quailifer.length; i++) {
String col = quailifer[i];
String val = value[i];
put.add(family.getBytes(), col.getBytes(), val.getBytes());
}
table.put(put);
System.err.println("save record successfuly.");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

Storm processing logic:

1. Write the counts to HBase every second.

2. Reset the counters at midnight.

If we keep the program above running, the system keeps writing to HBase, and we accumulate the data needed to generate our reports.
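
Before building the report, it can help to spot-check what has landed in HBase. A minimal sketch (illustrative only, reusing the same HBase 0.9x client API and the Common constants from the code above; the row key and date are examples):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;

// Illustrative spot-check: read back one day's "Buy" row and print its cells.
// Assumes the Common class from the code above is on the classpath.
public class HBaseSpotCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
        HConnection conn = HConnectionManager.createConnection(conf);
        HTableInterface table = conn.getTable(Common.TABLE_USER_ACTION);
        Get get = new Get("Buy_2018-12-20".getBytes());  // example row key: action_yyyy-MM-dd
        Result result = table.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println(new String(CellUtil.cloneQualifier(cell))  // yyyyMMddHHmmss
                    + " -> " + new String(CellUtil.cloneValue(cell)));    // e.g. {user_buy_count:N}
        }
        table.close();
        conn.close();
    }
}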

With the data in place, here is the report implementation.

6.11. Read the HBase data and generate the Excel report with POI

  • Read the data from HBase
  • Generate the Excel report with POI

package com.b510.big.data.poi;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Result;
import org.apache.poi.xssf.usermodel.XSSFCell;
import org.apache.poi.xssf.usermodel.XSSFSheet;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;
public class ReportUtil {
public static void main(String[] args) throws Exception {
String year = "2018";
String month = "12";
String day = "21";
String hour = "14";
generateReport(year, month, day, hour);
}
private static void generateReport(String year, String month, String day, String hour) {
HBaseDAO hBaseDAO = new HBaseDAOImpl();
// format: yyyyMMddHH
String begin = year + month + day + hour;
String[] split = generateQuailifers(begin);
List<Integer> userBuyCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_BUY);
List<Integer> userShoppingCarCountList = getData(hBaseDAO, year, month, day, split, Common.KEY_WORD_SHOPPING_CAR);
//System.err.println(userBuyCountList.size());
//System.err.println(userShoppingCarCountList.size());
writeExcel(year, month, day, hour, userBuyCountList, userShoppingCarCountList);
}
private static void writeExcel(String year, String month, String day, String hour, List<Integer> userBuyCountList, List<Integer> userShoppingCarCountList) {
try {
File file = new File(Common.REPORT_TEMPLATE);
InputStream in = new FileInputStream(file);
XSSFWorkbook wb = new XSSFWorkbook(in);
XSSFSheet sheet = wb.getSheetAt(0);
if (sheet != null) {
XSSFCell cell = null;
cell = sheet.getRow(0).getCell(0);
cell.setCellValue("One Hour Report-" + year + "-" + month + "-" + day + " From " + hour + ":00 To " + hour + ":59");
putData(userBuyCountList, sheet, 3);
putData(userShoppingCarCountList, sheet, 7);
FileOutputStream out = new FileOutputStream(Common.REPORT_ONE_HOUR);
wb.write(out);
out.close();
System.err.println("done.");
}
} catch (Exception e) {
System.err.println("Exception" + e);
}
}
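// fills one row of the template with the counts, starting at column index 1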
private static void putData(List<Integer> userBuyCountList, XSSFSheet sheet, int rowNum) {
XSSFCell cell;
if (userBuyCountList != null && userBuyCountList.size() > 0) {
for (int i = 0; i < userBuyCountList.size(); i++) {
cell = sheet.getRow(rowNum).getCell(i + 1);
cell.setCellValue(userBuyCountList.get(i));
}
}
}
private static List<Integer> getData(HBaseDAO hBaseDAO, String year, String month, String day, String[] split, String preKey) {
List<Integer> list = new ArrayList<Integer>();
Result rs = hBaseDAO.getOneRowAndMultiColumn(Common.TABLE_USER_ACTION, preKey + "_" + year + "-" + month + "-" + day, split);
for (Cell cell : rs.rawCells()) {
String value = new String(CellUtil.cloneValue(cell)).split(":")[1].trim();
value = value.substring(0, value.length() - 1);
list.add(Integer.valueOf(value));
}
return list;
}
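// builds the column qualifiers to read: one yyyyMMddHHmmss value every 5 minutes within the hour, plus HH5959 for the end of the hour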
private static String[] generateQuailifers(String begin) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 60;) {
if (i == 0 || i == 5) {
sb.append(begin).append("0").append(i).append("00").append(",");
} else {
sb.append(begin).append(i).append("00").append(",");
}
i = i + 5;
}
sb.append(begin).append("5959");
String sbStr = sb.toString();
String[] split = sbStr.split(",");
return split;
}
}
interface HBaseDAO {
Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols);
}
class HBaseDAOImpl implements HBaseDAO {
HConnection hConnection = null;
static Configuration conf = null;
public HBaseDAOImpl() {
conf = new Configuration();
conf.set("hbase.zookeeper.quorum", Common.HBASE_ZOOKEEPER_LIST);
try {
hConnection = HConnectionManager.createConnection(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public Result getOneRowAndMultiColumn(String tableName, String rowKey, String[] cols) {
HTableInterface table = null;
Result rsResult = null;
try {
table = hConnection.getTable(tableName);
Get get = new Get(rowKey.getBytes());
for (int i = 0; i < cols.length; i++) {
get.addColumn(Common.COLUMN_FAMILY.getBytes(), cols[i].getBytes());
}
rsResult = table.get(get);
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
table.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return rsResult;
}
}
class Common {
// report
public static final String REPORT_TEMPLATE = "./resources/report.xlsx";
public static final String REPORT_ONE_HOUR = "./resources/one_report.xlsx";
public static final String DATE_FORMAT_YYYYDDMMHHMMSS = "yyyyMMddHHmmss";
public static final String HBASE_ZOOKEEPER_LIST = "node1:2888,node2:2888,node3:2888";
// key word
public static final String KEY_WORD_BUY = "Buy";
public static final String KEY_WORD_SHOPPING_CAR = "Shopping_Car";
// hbase
public static final String TABLE_USER_ACTION = "t_user_actions";
public static final String COLUMN_FAMILY = "cf";
}

7. Source code download

Source Code: Flume_Kafka_Storm_Hbase_Hdfs_Poi_src.zip

The corresponding jar files are too large to include here; add them yourself based on the import statements.

8. Summary

I have been learning Big Data for a while now, and building the application I had in mind through my own study and experimentation is quite satisfying... and stepping on a few landmines along the way is part of the experience too. :)

Author: Hongten

Original article: https://www.cnblogs.com/hongten/p/hongten_flume_kafka_storm_hbase_hdfs_poi.html
