distributedShell 樣例源碼詳解

簡(jiǎn)介
distributedShell是Yarn自帶的應用程序,和MR類(lèi)似,當前工具可以用來(lái)對Yarn進(jìn)行壓測。
使用示例
參考命令如下:
./bin/hadoop jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar \
-jar ./share/hadoop/yarn/hadoop-yarn-applications-distributedshell-3.4.1.jar -shell_command \
'/bin/date' -num_containers 5
可以提交一個(gè)樣例作業(yè)到Yarn上面。
源碼閱讀
當前樣例的入口類(lèi)是org.apache.hadoop.yarn.applications.distributedshell.Client ,在pom文件里面默認定義了當前類(lèi)為主類(lèi)。所以在提交的時(shí)候可以不用指定主類(lèi)。
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- 省略部分參數 -->
</executions>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.yarn.applications.distributedshell.Client</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
核心流程主要包含下面3個(gè):
- 初始化CLient對象
- 初始化Client
- 提交作業(yè)到yarn
其中前面兩個(gè)主要在客戶(hù)端,第3個(gè)主要是在yarn上面。
客戶(hù)端提交核心代碼
初始化
初始化階段包括下面兩部分:
- 初始化Client對象,主要是創(chuàng )建Yarn的連接以及初始化支持的參數列表
- 初始化Client
下面是初始化Client對象的核心代碼。
Client(String appMasterMainClass, Configuration conf) {
this.conf = conf;
this.conf.setBoolean(
YarnConfiguration.YARN_CLIENT_LOAD_RESOURCETYPES_FROM_SERVER, true);
this.appMasterMainClass = appMasterMainClass;
// 創(chuàng )建和RM的連接
yarnClient = YarnClient.createYarnClient();
yarnClient.init(conf);
opts = new Options();
// 初始化支持的參數列表
stopSignalReceived = new AtomicBoolean(false);
isRunning = new AtomicBoolean(false);
}
初始化Client,在初始化Client階段主要是讀取命令行參數。
// 初始化Client函數入口
boolean doRun = client.init(args);
運行作業(yè)
首先還是建立和Yarn服務(wù)端的連接,為作業(yè)提交做準備。
isRunning.set(true);
yarnClient.start();
在連接建立之后會(huì )查詢(xún)并且在控制臺打印Yarn服務(wù)端的一些信息。主要包含下面內容:
- 當前集群NM的個(gè)數,通過(guò)
yarnClient.getYarnClusterMetrics()查詢(xún)到并且顯示。 - 當前集群中運行中NM的詳細信息,通過(guò)
yarnClient.getNodeReports(NodeState.RUNNING)查詢(xún)到。 - 當前任務(wù)提交的隊列的詳細信息,通過(guò)
yarnClient.getQueueInfo(this.amQueue)查詢(xún)到。 - 當前集群的ACL信息,通過(guò)
yarnClient.getQueueAclsInfo()查詢(xún)。 - 當前集群的ResourceProfile信息,通過(guò)
yarnClient.getResourceProfiles()查詢(xún)。
在打印完集群信息之后才是作業(yè)提交的開(kāi)始。
提交作業(yè)之前,是需要先向RM申請AppId的。AppId可以通過(guò)YarnClientApplication app = yarnClient.createApplication();獲取。作業(yè)提交信息一般都在A(yíng)pplicationSubmissionContext里面,包含下面信息:
-
AM申請資源的請求。通過(guò)
appContext.setAMContainerResourceRequests(amResourceRequests);設置。 -
AM的上下文信息:
- 訪(fǎng)問(wèn)hdfs等所需要的token。當前token會(huì )伴隨著(zhù)整個(gè)作業(yè),直到作業(yè)結束才會(huì )異步銷(xiāo)毀。
- 需要本地話(huà)的文件。
- AM或者Container所需要的環(huán)境變量。
- AM的啟動(dòng)命令,AM啟動(dòng)的類(lèi)也是在這里指定的。類(lèi)似于 java運行jar或者某個(gè)主類(lèi)。
-
App名稱(chēng)。通過(guò)
appContext.setApplicationName(appName);設置。 -
app tag信息。
-
資源標簽信息。
-
作業(yè)的優(yōu)先級。
-
作業(yè)提交的隊列信息。
-
日志聚合相關(guān)配置。主要是和日志歸集的Rolling模式有關(guān)系??梢栽O置需要通過(guò)rolling的方式歸集哪些日志。通過(guò)
appContext.setLogAggregationContext(logAggregationContext);設置。
作業(yè)真正提交的代碼只有一行:
yarnClient.submitApplication(appContext);
當前樣例做到了作業(yè)所需要的信息可配置。是一個(gè)比較適合開(kāi)發(fā)作業(yè)的樣例。
AM核心代碼
AM的核心代碼是在A(yíng)pplicationMaster.java里面的。在啟動(dòng)AM的時(shí)候會(huì )調用到當前函數的main函數。
在構造函數里面和init函數里面,主要是加載配置項以及命令行參數。真正運行的函數是run,核心在run函數里面,
首先需要創(chuàng )建和RM以及NM的連接。
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();
containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
startTimelineClient(conf);
在A(yíng)M啟動(dòng)OK了第一件事就是需要去RM上面注冊,證明當前AM已經(jīng)啟動(dòng)完成了。
RegisterApplicationMasterResponse response = amRMClient
.registerApplicationMaster(appMasterHostname, appMasterRpcPort,
appMasterTrackingUrl, placementConstraintMap);
普通Container的申請是在A(yíng)M里面處理的,類(lèi)似下面代碼,下面代碼是異步申請的。
ContainerRequest containerAsk = setupContainerAskForRM();
amRMClient.addContainerRequest(containerAsk);
當Container申請好之后,可以通過(guò)下面代碼獲取,在樣例中觸發(fā)onContainerAllocated事件。
List<Container> allocated = response.getAllocatedContainers();
if (!allocated.isEmpty()) {
handler.onContainersAllocated(allocated);
}
通過(guò)下面代碼啟動(dòng)Container.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
localResources, myShellEnv, commands, null, allTokens.duplicate(),
null, containerRetryContext);
nmClientAsync.startContainerAsync(container, ctx);
在作業(yè)結束的時(shí)候,AM需要做下面事:
- 停止nmClient。
- 從RM上取消AppMaster
- 停止amClient。
nmClientAsync.stop();
try {
amRMClient.unregisterApplicationMaster(appStatus, message, null);
} catch (YarnException | IOException ex) {
LOG.error("Failed to unregister application", ex);
}
amRMClient.stop();
評論