當前位置:
首頁 > 知識 > Carbondata源碼系列(一)文件生成過程

Carbondata源碼系列(一)文件生成過程

在滴滴的兩年一直在加班,人也變懶了,就很少再寫博客了,最近在進行Carbondata和hive集成方面的工作,於是乎需要對Carbondata進行深入的研究。

於是新開一個系列,記錄自己學習Carbondata的點點滴滴,希望對大家也有所幫助。


1、環境準備

當前版本是1.2.0-SNAPSHOT

git clone https://github.com/apache/carbondata.git

先用IDEA打開carbondata的代碼,點擊上方的View -> Tool Windows -> Maven Projects, 先勾選一下需要的profile和編譯format工程,如下圖所示:

Carbondata源碼系列(一)文件生成過程


2、探尋代碼入口

我們先打開入口類CarbonDataFrameWriter,找到writeToCarbonFile這個方法

private def writeToCarbonFile(parameters: Map[String, String] = Map): Unit = {
val options = new CarbonOption(parameters)
val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
if (options.tempCSV) {
loadTempCSV(options, cc)
} else {
loadDataFrame(options, cc)
}
}

它有兩個方式,loadTempCSV和loadDataFrame。

loadTempCSV是先生成CSV文件,再調用LOAD DATA INPATH...的命令導入數據。

這裡我們之研究loadDataFrame這種直接生成數據的方式。

一路點進去,目標落在carbonTableSchema的LoadTable的run方法里,接著就是洋洋洒洒的二百行的set代碼。它是核心其實是構造一個CarbonLoadModel類。

val carbonLoadModel = new CarbonLoadModel
carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)

val table = relation.tableMeta.carbonTable
carbonLoadModel.setAggTables(table.getAggregateTablesName.asScala.toArray)
carbonLoadModel.setTableName(table.getFactTableName)
val dataLoadSchema = new CarbonDataLoadSchema(table)
// Need to fill dimension relation
carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)

這些代碼為了Load一個文本文件準備的,如果是用dataframe的方式則不需要看了。直接略過,直接調到if (carbonLoadModel.getUseOnePass)這一句。

這個跟字典的生成方式有關,這個值默認是false,先忽略true的過程吧,看主流程就行,下面這哥倆才是我們要找的。

// 生成字典文件
GlobalDictionaryUtil
.generateGlobalDictionary(
sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
dictionaryDataFrame)
// 生成數據文件
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext, carbonLoadModel, relation.tableMeta.storePath, columnar, partitionStatus, None, loadDataFrame, updateModel)

3、欄位生成過程

先看GlobalDictionaryUtil.generateGlobalDictionary方法

if (StringUtils.isEmpty(allDictionaryPath)) {
LOGGER.info("Generate global dictionary from source data files!")
// load data by using dataSource com.databricks.spark.csv
var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
var headers = carbonLoadModel.getCsvHeaderColumns
headers = headers.map(headerName => headerName.trim)
val colDictFilePath = carbonLoadModel.getColDictFilePath
if (colDictFilePath != null) {
// generate predefined dictionary
generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
}
if (headers.length > df.columns.length) {
val msg = "The number of columns in the file header do not match the " +
"number of columns in the data file; Either delimiter " +
"or fileheader provided is not correct"
LOGGER.error(msg)
throw new DataLoadingException(msg)
}
// use fact file to generate global dict
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
headers, df.columns)
if (requireDimension.nonEmpty) {
// select column to push down pruning
df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
requireDimension, storePath, dictfolderPath, false)
// combine distinct value in a block and partition by column
val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
// generate global dictionary files
val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect
// check result status
checkStatus(carbonLoadModel, sqlContext, model, statusList)
} else {
LOGGER.info("No column found for generating global dictionary in source data files")
}
} else {
generateDictionaryFromDictionaryFiles(sqlContext,
carbonLoadModel,
storePath,
carbonTableIdentifier,
dictfolderPath,
dimensions,
allDictionaryPath)
}

View Code

包含了兩種情況:不存在字典文件和已存在欄位文件。

先看不存在的情況

// use fact file to generate global dict
val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
headers, df.columns)
if (requireDimension.nonEmpty) {
// 只選取標記為字典的維度列
df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
requireDimension, storePath, dictfolderPath, false)
// 去重之後按列分區
val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
.partitionBy(new ColumnPartitioner(model.primDimensions.length))
// 生成全局欄位文件
val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect
// check result status
checkStatus(carbonLoadModel, sqlContext, model, statusList)
} else {
LOGGER.info("No column found for generating global dictionary in source data files")
}

先從源文件當中讀取所有維度列,去重之後按列分區,然後輸出,具體輸出的過程請看CarbonGlobalDictionaryGenerateRDD的internalCompute方法。

val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
dictionaryForDistinctValueLookUp,
model.table,
model.columnIdentifier(split.index),
model.hdfsLocation,
model.primDimensions(split.index).getColumnSchema,
model.dictFileExists(split.index)
)
// execute dictionary writer task to get distinct values
val distinctValues = dictWriteTask.execute
val dictWriteTime = System.currentTimeMillis - t3
val t4 = System.currentTimeMillis
// if new data came than rewrite sort index file
if (distinctValues.size > 0) {
val sortIndexWriteTask = new SortIndexWriterTask(model.table,
model.columnIdentifier(split.index),
model.primDimensions(split.index).getDataType,
model.hdfsLocation,
dictionaryForDistinctValueLookUp,
distinctValues)
sortIndexWriteTask.execute
}
val sortIndexWriteTime = System.currentTimeMillis - t4
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime
// After sortIndex writing, update dictionaryMeta
dictWriteTask.updateMetaData

View Code

字典文件在表目錄的下的Metadata目錄下,它需要生成三種文件

1、欄位文件,命令方式為 列ID.dict

2、sort index文件,命令方式為 列ID.sortindex

3、字典列的meta信息,命令方式為 列ID.dictmeta


4、數據生成過程

請打開CarbonDataRDDFactory,找到loadCarbonData這個方法,方法裡面包括了從load命令和從dataframe載入的兩種方式,代碼看起來是有點兒又長又臭的感覺。我們只關注loadDataFrame的方式就好。

def loadDataFrame: Unit = {
try {
val rdd = dataFrame.get.rdd
      // 獲取數據的位置
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.size
// 確保executor數量要和數據的節點數一樣多 val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData, sqlContext.sparkContext) val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct) // 生成數據文件 status = new NewDataFrameLoaderRDD(sqlContext.sparkContext, new DataLoadResultImpl, carbonLoadModel, currentLoadCount, tableCreationTime, schemaLastUpdatedTime, newRdd).collect } catch { case ex: Exception => LOGGER.error(ex, "load data frame failed") throw ex } }

打開NewDataFrameLoaderRDD類,查看internalCompute方法,這個方法的核心是這句話

new DataLoadExecutor.execute(model, loader.storeLocation, recordReaders.toArray)

打開DataLoadExecutor,execute方法裡面的核心是DataLoadProcessBuilder的build方法,根據表不同的參數設置,DataLoadProcessBuilder的build過程會有一些不同

public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String storeLocation,
CarbonIterator[] inputIterators) throws Exception {
CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
if (!configuration.isSortTable || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
// 沒有排序列或者carbon.load.sort.scope設置為NO_SORT的
return buildInternalForNoSort(inputIterators, configuration);
} else if (configuration.getBucketingInfo != null) {
// 設置了Bucket的表
return buildInternalForBucketing(inputIterators, configuration);
} else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
// carbon.load.sort.scope設置為BATCH_SORT
return buildInternalForBatchSort(inputIterators, configuration);
} else {
return buildInternal(inputIterators, configuration);
}
}

下面僅介紹標準的導入過程buildInternal:

private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators,
CarbonDataLoadConfiguration configuration) {
// 1. Reads the data input iterators and parses the data.
AbstractDataLoadProcessorStep inputProcessorStep =
new InputProcessorStepImpl(configuration, inputIterators);
// 2. Converts the data like dictionary or non dictionary or complex objects depends on
// data types and configurations.
AbstractDataLoadProcessorStep converterProcessorStep =
new DataConverterProcessorStepImpl(configuration, inputProcessorStep);
// 3. Sorts the data by SortColumn
AbstractDataLoadProcessorStep sortProcessorStep =
new SortProcessorStepImpl(configuration, converterProcessorStep);
// 4. Writes the sorted data in carbondata format.
return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
}

主要是分4個步驟:

1、讀取數據,並進行格式轉換,這一步驟是讀取csv文件服務的,dataframe的數據格式都已經處理過了

2、根據欄位的數據類型和配置,替換掉字典列的值;非字典列會被替換成byte數組

3、按照Sort列進行排序

4、把數據用Carbondata的格式輸出

下面我們從第二步DataConverterProcessorStepImpl開始說起,在getIterator方法當中,會發現每一個CarbonRowBatch都要經過localConverter的convert方法轉換,localConverter中只有RowConverterImpl一個轉換器。

RowConverterImpl由很多的FieldConverter組成,在initialize方法中可以看到它是由FieldEncoderFactory的createFieldEncoder方法生成的。

public FieldConverter createFieldEncoder(DataField dataField,
Cache cache,
CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
DictionaryClient client, Boolean useOnePass, String storePath, boolean tableInitialize,
Map localCache, boolean isEmptyBadRecord)
throws IOException {
// Converters are only needed for dimensions and measures it return null.
if (dataField.getColumn.isDimension) {
if (dataField.getColumn.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
!dataField.getColumn.isComplex) {
return new DirectDictionaryFieldConverterImpl(dataField, nullFormat, index,
isEmptyBadRecord);
} else if (dataField.getColumn.hasEncoding(Encoding.DICTIONARY) &&
!dataField.getColumn.isComplex) {
return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier, nullFormat,
index, client, useOnePass, storePath, tableInitialize, localCache, isEmptyBadRecord);
} else if (dataField.getColumn.isComplex) {
return new ComplexFieldConverterImpl(
createComplexType(dataField, cache, carbonTableIdentifier,
client, useOnePass, storePath, tableInitialize, localCache), index);
} else {
return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
} else {
return new MeasureFieldConverterImpl(dataField, nullFormat, index, isEmptyBadRecord);
}
}

View Code

從這段代碼當中可以看出來,它是分成了幾種類型的

1、維度類型,編碼方式為Encoding.DIRECT_DICTIONARY的非複雜列,採用DirectDictionaryFieldConverterImpl (主要是TIMESTAMP和DATE類型),換算成值和基準時間的差值

2、維度類型,編碼方式為Encoding.DICTIONARY的非複雜列,採用DictionaryFieldConverterImpl (非高基數的欄位類型),把欄位換成字典中的key(int類型)

3、維度類型,複雜列,採用ComplexFieldConverterImpl (複雜欄位類型,Sturct和Array類型),把欄位轉成二進位

4、維度類型,高基數列,採用NonDictionaryFieldConverterImpl,原封不動,原來是啥樣,現在還是啥樣

5、指標類型,採用MeasureFieldConverterImpl (值類型,float、double、int、bigint、decimal等),原封不動,原來是啥樣,現在還是啥樣

第三步SortProcessorStepImpl,關鍵點在SorterFactory.createSorter是怎麼實現的

public static Sorter createSorter(CarbonDataLoadConfiguration configuration, AtomicLong counter) {
boolean offheapsort = Boolean.parseBoolean(CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
Sorter sorter;
if (offheapsort) {
if (configuration.getBucketingInfo != null) {
sorter = new UnsafeParallelReadMergeSorterWithBucketingImpl(configuration.getDataFields,
configuration.getBucketingInfo);
} else {
sorter = new UnsafeParallelReadMergeSorterImpl(counter);
}
} else {
if (configuration.getBucketingInfo != null) {
sorter =
new ParallelReadMergeSorterWithBucketingImpl(counter, configuration.getBucketingInfo);
} else {
sorter = new ParallelReadMergeSorterImpl(counter);
}
}
if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
if (configuration.getBucketingInfo == null) {
sorter = new UnsafeBatchParallelReadMergeSorterImpl(counter);
} else {
LOGGER.warn(
"Batch sort is not enabled in case of bucketing. Falling back to " + sorter.getClass
.getName);
}
}
return sorter;
}

View Code

居然還可以使用堆外內存sort,設置enable.unsafe.sort為true就可以開啟了。我們看默認的ParallelReadMergeSorterImpl吧。

超過100000條記錄就要把數據排序,然後生成一個文件,文件數超過20個文件之後,就要做一次文件合并。

規則在NewRowComparatorForNormalDims當中,從規則上可以看出來,需要排序的列一定是在所有數據的前N列,而不會是隨機散落的

public int compare(Object[] rowA, Object[] rowB) {
int diff = 0;

for (int i = 0; i < numberOfSortColumns; i++) { int dimFieldA = (int)rowA[i]; int dimFieldB = (int)rowB[i]; diff = dimFieldA - dimFieldB; if (diff != 0) { return diff; } } return diff; }

View Code

相關參數:

carbon.sort.size 100000

carbon.sort.intermediate.files.limit 20

到最後一步了,打開DataWriterProcessorStepImpl類,它是通過CarbonFactHandlerFactory.createCarbonFactHandler生成一個CarbonFactHandler,通過CarbonFactHandler的addDataToStore方法處理CarbonRow

addDataToStore的實現很簡單,當row的數量達到一個blocklet的大小之後,就往線程池裡提交一個非同步的任務Producer進行處理

public void addDataToStore(CarbonRow row) throws CarbonDataWriterException {
dataRows.add(row);
this.entryCount++;
// if entry count reaches to leaf node size then we are ready to write
// this to leaf node file and update the intermediate files
if (this.entryCount == this.blockletSize) {
try {
semaphore.acquire;

producerExecutorServiceTaskList.add(
producerExecutorService.submit(
new Producer(blockletDataHolder, dataRows, ++writerTaskSequenceCounter, false)
)
);
blockletProcessingCount.incrementAndGet;
// set the entry count to zero
processedDataCount += entryCount;
LOGGER.info("Total Number Of records added to store: " + processedDataCount);
dataRows = new ArrayList<>(this.blockletSize);
this.entryCount = 0;
} catch (InterruptedException e) {
LOGGER.error(e, e.getMessage);
throw new CarbonDataWriterException(e.getMessage, e);
}
}
}

View Code

這裡用到了生產者消費者的模式,Producer的處理是多線程的,Consumer是單線程的;Producer主要是負責數據的壓縮,Consumer負責進行輸出,數據的交換通過blockletDataHolder。

相關參數:

carbon.number.of.cores.while.loading 2 (Producer的線程數)

carbon.blocklet.size 120000

文件生成主要包含以上過程,限於文章篇幅,下一章再繼續接著寫Carbondata的數據文件格式細節。

喜歡這篇文章嗎?立刻分享出去讓更多人知道吧!

本站內容充實豐富,博大精深,小編精選每日熱門資訊,隨時更新,點擊「搶先收到最新資訊」瀏覽吧!


請您繼續閱讀更多來自 達人科技 的精彩文章:

直截了當的告訴你什麼是工廠模式
React 實踐項目(三)
包裝類、數組、string類淺析及練習
Hadoop源碼系列(一)FairScheduler申請和分配container的過程
Android系統——輸入系統(十五)實戰 使用GlobalKey一鍵啟動程序

TAG:達人科技 |

您可能感興趣

MapReduce中源碼分析(map端的過程)
Ryan Reynolds 談論《Pokémon: Detective Pikachu》角色 Pikachu 學習過程
SAP Fiori實施之Transactional Apps實施全過程
MongoDB系統CentOS 7.1 crash的排障過程
oracle 中 print_table 存儲過程介紹
Biomaterials:肺上皮再生過程中,Fibrillin-2和Tenascin-C消除了年齡差距的影響
Ultimaker Cura集成了HP 3D掃描軟體,以簡化3D列印過程!
HackTheBox Writeup之拿下Mantis主機許可權過程
HttpURLConnection使用過程中踩的坑
不一樣的Windows,ReactOS系統詳細安裝過程
剖析Hadoop和Spark的Shuffle過程差異
SonarQube代碼審核詳細過程
強化學習 2 Markov Decision Process 馬可夫決策過程
GlobeImposter家族的勒索樣本分析過程
新車檢測過程有瑕疵 Suzuki、Mazda、Yamaha排污數據「全造假」
Facebook也參與了英特爾Cooper Lake CPU的研發過程
Spring源碼閱讀——Bean的載入和獲取過程
圖解 intel Core i7 CPU生產全過程,漲知識
美美滴angelababy 彩鉛手繪過程圖
Kubernetes中Node的網路重啟後,所有Pod無法訪問的調查過程