當前位置:
首頁 > 知識 > Spark實戰之讀寫HBase

Spark實戰之讀寫HBase


1 配置

1.1 開發環境:

  • HBase:hbase-1.0.0-cdh5.4.5.tar.gz
  • Hadoop:hadoop-2.6.0-cdh5.4.5.tar.gz
  • ZooKeeper:zookeeper-3.4.5-cdh5.4.5.tar.gz
  • Spark:spark-2.1.0-bin-hadoop2.6

1.2 Spark的配置

  • Jar包:需要HBase的Jar如下(經過測試,正常運行,但是是否存在冗餘的Jar並未證實,若發現多餘的jar可自行進行刪除)

Spark實戰之讀寫HBase

  • spark-env.sh
    添加以下配置:export SPARK_CLASSPATH=/home/hadoop/data/lib1/*
    註:如果使用spark-shell的yarn模式進行測試的話,那麼最好每個NodeManager節點都有配置jars和hbase-site.xml
  • spark-default.sh

    spark.yarn.historyServer.address=slave11:18080 spark.history.ui.port=18080 spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///tmp/spark/events spark.history.fs.logDirectory=hdfs:///tmp/spark/events spark.driver.memory=1g spark.serializer=org.apache.spark.serializer.KryoSerializer

1.3 數據

2 代碼演示

2.1 準備動作

1)既然是與HBase相關,那麼首先需要使用hbase shell來創建一個表

創建表格:create 『data』,』v』,create 『data1』,』v』

2)使用spark-shell進行操作,命令如下:


bin/spark-shell --master yarn --deploy-mode client --num-executors 5 --executor-memory 1g --executor-cores 2

Spark實戰之讀寫HBase

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64,Bytes}
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
import org.apache.hadoop.hbase.HColumnDescriptor
import org.apache.commons.codec.digest.DigestUtils

2.2 代碼實戰

創建conf和table

val conf= HBaseConfiguration.create
conf.set(TableInputFormat.INPUT_TABLE,"data1")
val table = new HTable(conf,"data1")

2.2.1 數據寫入

格式:

val put = new Put(Bytes.toBytes("rowKey"))
put.add("cf","q","value")

使用for來插入5條數據

for(i <- 1 to 5){ var put= new Put(Bytes.toBytes("row"+i));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes("value"+i));table.put(put)}

到hbase shell中查看結果

2.2.2 數據讀取

val hbaseRdd = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])

1)take

hbaseRdd take 1

2)scan

var scan = new Scan;
scan.addFamily(Bytes.toBytes(「v」));
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray);
conf.set(TableInputFormat.SCAN,scanToString)

val datas = hbaseRdd.map( x=>x._2).map{result => (result.getRow,result.getValue(Bytes.toBytes("v"),Bytes.toBytes("value")))}.map(row => (new String(row._1),new String(row._2))).collect.foreach(r => (println(r._1+":"+r._2)))

2.3 批量插入2.3.1 普通插入

1)代碼

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
val data = rdd.map(_.split("@")).map{x=>(x(0)+x(1),x(2))}
val result = data.foreachPartition{x => {val conf= HBaseConfiguration.create;conf.set(TableInputFormat.INPUT_TABLE,"data");conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");conf.set("hbase.zookeeper.property.clientPort","2181");conf.addResource("/home/hadoop/data/lib/hbase-site.xml");val table = new HTable(conf,"data");table.setAutoFlush(false,false);table.setWriteBufferSize(3*1024*1024); x.foreach{y => {
var put= new Put(Bytes.toBytes(y._1));put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)};table.flushCommits}}}

2)執行時間如下:7.6 min

2.3.2 Bulkload

1) 代碼:

val conf = HBaseConfiguration.create;
val tableName = "data1"
val table = new HTable(conf,tableName)
conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)

lazy val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat.configureIncrementalLoad(job,table)

val rdd = sc.textFile("/data/produce/2015/2015-03-01.log").map(_.split("@")).map{x => (DigestUtils.md5Hex(x(0)+x(1)).substring(0,3)+x(0)+x(1),x(2))}.sortBy(x =>x._1).map{x=>{val kv:KeyValue = new KeyValue(Bytes.toBytes(x._1),Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(x._2+""));(new ImmutableBytesWritable(kv.getKey),kv)}}

rdd.saveAsNewAPIHadoopFile("/tmp/data1",classOf[ImmutableBytesWritable],classOf[KeyValue],classOf[HFileOutputFormat],job.getConfiguration)
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path("/tmp/data1"),table)

2) 執行時間:7s

3)執行結果:

到hbase shell 中查看 list 「data1」

Spark實戰之讀寫HBase

通過對比我們可以發現bulkload批量導入所用時間遠遠少於普通導入,速度提升了60多倍,當然我沒有使用更大的數據量測試,但是我相信導入速度的提升是非常顯著的,強烈建議使用BulkLoad批量導入數據到HBase中。

關於Spark與Hbase之間操作就寫到這裡,如果有什麼地方寫得不對或者運行不了,歡迎指出,謝謝

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 科技優家 的精彩文章:

從零實現一個高性能網路爬蟲(二)應對反爬蟲之前端數據混淆
基於TypeScript的FineUIMvc組件式開發
javaScript 設計模式系列之一:觀察者模式
Swift 中 String 取下標及性能問題
.Net Core中使用ref和Span提高程序性能

TAG:科技優家 |

您可能感興趣

Mysql8.0主從搭建,shardingsphere+springboot+mybatis讀寫分離
SpringBoot 玩轉讀寫分離
python後台架構Django——連接讀寫mysql資料庫
提升Hive操作Amazon S3讀寫數據的性能
用fread和fwrite讀寫文件
Multi Actuator多讀寫臂技術,新的性能突破
mysql+mycat實現主從複製、讀寫分離
文石BOOX Note Pro和Nova Pro電紙書正式發布:讀寫全能帶前光
索尼推出Ultra-Tough系列三防SSD移動硬碟:最高讀寫速度1000MB/s
MapReduce數據序列化讀寫概念淺析!
DuangDuangDuang,黨禺書法藝術叢書《讀寫經典》正式上線了!
Python讀寫Excel表格,就是這麼簡單粗暴又好用
Mushkin推出新款M.2 SSD:主打性價比 連續讀寫500MB/s
希捷提出Multi Actuator技術,以多一組讀寫臂的方式增加存取效率
Realtek首發SD 7.0主控:最大容量128TB、讀寫1GB/s
Realtek首發SD 7.0主控:最大容量128TB、讀寫1GB/s
威剛發布512GB microSD存儲卡:讀寫速度不俗
解決Mac無法寫入U盤問題:無需安裝軟體,讓Mac支持NTFS讀寫
Intel最強消費級SSD將發布,讀寫速度出色
威剛將展示USB-C介面移動SSD:讀寫均破1GB/s