DistCp源碼解析
說(shuō)明
DistCp(分布式拷貝)是用于大規模集群內部和集群之間拷貝的工具。
它使用Map/Reduce實(shí)現文件分發(fā),錯誤處理和恢復,以及報告生成。
它把文件和目錄的列表作為map任務(wù)的輸入,每個(gè)任務(wù)會(huì )完成源列表中部分文件的拷貝。
由于使用了Map/Reduce方法,這個(gè)工具在語(yǔ)義和執行上都會(huì )有特殊的地方。 這篇文檔會(huì )為常用DistCp操作提供指南并闡述它的工作模型。
源碼詳解
作業(yè)啟動(dòng)
作業(yè)的啟動(dòng)主要包含初始化和作業(yè)提交,在初始化階段主要是list左右需要拷貝的文件信息,根據文件信息構造split信息。
作業(yè)提交階段就是根據初始化階段構造的split信息,將作業(yè)提交到Yarn上面。
作業(yè)初始化
初始化階段主要是list左右需要拷貝的文件信息,根據文件信息構造split信息。
DistCp的入口函數是main函數,在main函數里面主要做了兩件事:
- 注冊Cleanup。
- 初始化和啟動(dòng)作業(yè),核心處理函數為execute函數里面的createAndSubmitJob
創(chuàng )建Job對象,主要是指定Map的處理類(lèi),InputFormat 和outputFormat 信息:
Job job = Job.getInstance(getConf());
job.setJobName(jobName);
job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), context));
job.setJarByClass(CopyMapper.class);
configureOutputFormat(job);
job.setMapperClass(CopyMapper.class);
job.setOutputFormatClass(CopyOutputFormat.class);
job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false");
根據需要拷貝的目錄獲取所有的文件信息。支持snapshot模式和普通模式。
snapshot 模式
核心函數為SimpleCopyListing.doBuildListingWithSnapshotDiff。主要是通過(guò)DistCpSync.getAllDiffs獲取Snapshot的差異文件。
差異文件主要包含創(chuàng )建、修改、刪除類(lèi)型,將差別的的文件輸出到fileList.seq文件里面。fileList.seq文件在staging目錄下面的的_distcp_隨機的int值。
普通模式
核心函數為SimpleCopyListing.doBuildListing。對于非snapshot模式,核心處理邏輯就是通過(guò)list將所有的文件獲取出來(lái)。添加到fileList.seq里面。
對于XAttrs等權限信息也會(huì )按照-p參數指定的來(lái)獲取。
作業(yè)提交
由于DistCp也是MapReduce作業(yè),所以作業(yè)提交沿用了MapReduce作業(yè)提交的框架,對于Map和Reduce的處理類(lèi),
以及InputFormat和outputFormat都是DistCp自己實(shí)現的。
其中比較常用的是DynamicInputFormat,DynamicInputFormat主要是通過(guò)主要是按照文件數量分配的。
作業(yè)運行
AM運行
在創(chuàng )建作業(yè)的時(shí)候定義了outputFormat,在CopyOutputFormat中定義了getOutputCommitter。
job.setOutputFormatClass(CopyOutputFormat.class);
Distcp的AM結束時(shí)的核心處理類(lèi)是CopyCommitter。結束的時(shí)候會(huì )調用commitJob函數,在commitJob函數里面。
deleteMissing函數
從目標端刪除多余的文件,需要配置-delete參數。
preserveFileAttributesForDirectories函數
當前函數是用于檢查并修改文件屬性的功能。當前是單線(xiàn)程運行,在文件多的時(shí)候可能會(huì )比較慢。同步的權限包含:
- ACL權限。
- 普通權限。
- 副本數。
- XATTR屬性。
- 用戶(hù)以及用戶(hù)組。
Map運行
Map運行的核心類(lèi)是CopyMapper。當前類(lèi)的核心函數為: setup()、 map()、 cleanup()、 run()
setup函數
setup函數主要是讀取配置。setup函數的入參是Context,里面包含從客戶(hù)端傳入的配置文件信息??梢酝ㄟ^(guò)context.getConfiguration()獲取。
map函數
map函數是復制數據的核心類(lèi),map的入參定義如下:
public void map(Text relPath, CopyListingFileStatus sourceFileStatus,
Context context) throws IOException, InterruptedException {
}
- relPath:目標文件路徑。
- sourceFileStatus: 源端文件信息,包含路徑。
在map函數里面主要做了幾件事:
- 獲取目標端文件信息,如果復制的屬性里面包含XATTR,則需要單獨調用getXAttrs接口獲取XATTR信息,當前會(huì )多一次請求,大大的增加復制時(shí)間。
- 檢查當前文件是否需要復制,如果需要copy,則將文件拷貝到目標端。
- 復制文件的屬性到目標端。核心函數如下:
DistCpUtils.preserve(target.getFileSystem(conf), tmpTarget,
sourceCurrStatus, fileAttributes, preserveRawXattrs);
cleanup函數
在當前map結束之后調用,主要是更新統計信息,主要是復制帶寬。如下:
long secs = (System.currentTimeMillis() - startEpoch) / 1000;
incrementCounter(context, Counter.BANDWIDTH_IN_BYTES,
totalBytesCopied / ((secs == 0 ? 1 : secs)));
run函數
主要是MapReduce框架層面的邏輯,控制map的所有流程,在處理完成之后調用cleanup。
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} finally {
cleanup(context);
}
Reduce運行
DistCp作業(yè)沒(méi)有reduce任務(wù),只有map。
評論