大家好,欢迎来到IT知识分享网。
Flink Yarn-per-job模式提交流程如图所示:
1、启动Yarn客户端
AbstractJobClusterExecutor.java
public CompletableFuture<JobClient> execute(@Nonnull finalPipeline pipeline, @Nonnull final Configuration configuration) throws Exception{
final JobGraph jobGraph =ExecutorUtils.getJobGraph(pipeline, configuration);
// 创建并启动yarn客户端
try (finalClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)){
final ExecutionConfigAccessorconfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
// 获取集群配置参数
final ClusterSpecificationclusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
// 部署集群
final ClusterClientProvider<ClusterID> clusterClientProvider =clusterDescriptor
.deployJobCluster(clusterSpecification,jobGraph, configAccessor.getDetachedMode());
LOG.info("Job has beensubmitted with JobID " + jobGraph.getJobID());
return CompletableFuture.completedFuture(
newClusterClientJobClientAdapter<>(clusterClientProvider,jobGraph.getJobID()));
}
}
IT知识分享网
YarnClusterClientFactory.java
IT知识分享网public YarnClusterDescriptor createClusterDescriptor(Configuration configuration){
... ...
return getClusterDescriptor(configuration);
}
private YarnClusterDescriptor getClusterDescriptor(Configurationconfiguration) {
final YarnClient yarnClient = YarnClient.createYarnClient();
final YarnConfigurationyarnConfiguration = new YarnConfiguration();
yarnClient.init(yarnConfiguration);
yarnClient.start();
return new YarnClusterDescriptor(
configuration,
yarnConfiguration,
yarnClient,
YarnClientYarnClusterInformationRetriever.create(yarnClient),
false);
}
2、获取集群配置参数
AbstractContainerizedClusterClientFactory.java
public ClusterSpecification getClusterSpecification(Configurationconfiguration) {
... ...
final int jobManagerMemoryMB =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
configuration,
JobManagerOptions.TOTAL_PROCESS_MEMORY)
.getTotalProcessMemorySize()
.getMebiBytes();
final int taskManagerMemoryMB = TaskExecutorProcessUtils
.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
configuration,TaskManagerOptions.TOTAL_PROCESS_MEMORY))
.getTotalProcessMemorySize()
.getMebiBytes();
int slotsPerTaskManager =configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
return newClusterSpecification.ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMB)
.setTaskManagerMemoryMB(taskManagerMemoryMB)
.setSlotsPerTaskManager(slotsPerTaskManager)
.createClusterSpecification();
}
3、部署集群
YarnClusterDescriptor.java
IT知识分享网public ClusterClientProvider<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException{
try {
return deployInternal(
clusterSpecification,
"Flink per-jobcluster",
getYarnJobClusterEntrypoint(), //获取YarnJobClusterEntrypoint,启动AM的入口
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException("Couldnot deploy Yarn job cluster.", e);
}
}
上传 jar 包和配置文件到 HDFS
YarnClusterDescriptor.java
private ClusterClientProvider<ApplicationId> deployInternal(
ClusterSpecificationclusterSpecification,
String applicationName,
String yarnClusterEntrypoint,
@Nullable JobGraph jobGraph,
boolean detached) throwsException {
... ...
// 创建应用
final YarnClientApplicationyarnApplication = yarnClient.createApplication();
... ...
ApplicationReport report = startAppMaster(
flinkConfiguration,
applicationName,
yarnClusterEntrypoint,
jobGraph,
yarnClient,
yarnApplication,
validClusterSpecification);
... ...
}
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplicationyarnApplication,
ClusterSpecificationclusterSpecification) throws Exception {
... ...
// 初始化文件系统(HDFS)
final FileSystem fs = FileSystem.get(yarnConfiguration);
... ...
ApplicationSubmissionContextappContext =yarnApplication.getApplicationSubmissionContext();
final List<Path> providedLibDirs = getRemoteSharedPaths(configuration);
// 上传文件的工具类
final YarnApplicationFileUploader fileUploader=YarnApplicationFileUploader.from(
fs,
fs.getHomeDirectory(),
providedLibDirs,
appContext.getApplicationId(),
getFileReplication());
... ...
final ApplicationId appId =appContext.getApplicationId();
... ...
if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)){
// yarn重试次数,默认2
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
activateHighAvailabilitySupport(appContext);
} else {
//不是高可用重试次数为1
appContext.setMaxAppAttempts(
configuration.getInteger(
YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
1));
}
... ...
// 多次调用上传HDFS的方法,分别是:
// => systemShipFiles:日志的配置文件、lib/目录下除了dist的jar包
// => shipOnlyFiles:plugins/目录下的文件
// => userJarFiles:用户代码的jar包
fileUploader.registerMultipleLocalResources (... ...);
... ...
// 上传和配置ApplicationMaster的jar包:flink-dist*.jar
final YarnLocalResourceDescriptorlocalResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);
... ...
//
fileUploader.registerSingleLocalResource(
jobGraphFilename,
newPath(tmpJobGraphFile.toURI()),
"",
true,
false);
... ...
// 上传flink配置文件
String flinkConfigKey ="flink-conf.yaml";
Path remotePathConf = setupSingleLocalResource(
flinkConfigKey,
fs,
appId,
newPath(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
... ...
// 将JobGraph写入tmp文件并添加到本地资源,并上传到HDFS
fileUploader.registerSingleLocalResource(
jobGraphFilename,
newPath(tmpJobGraphFile.toURI()),
"",
true,
false);
... ...
// 上传flink配置文件
String flinkConfigKey = "flink-conf.yaml";
fileUploader.registerSingleLocalResource(
flinkConfigKey,
newPath(tmpConfigurationFile.getAbsolutePath()),
"",
true,
true);
... ...
final JobManagerProcessSpec processSpec =JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
flinkConfiguration,
JobManagerOptions.TOTAL_PROCESS_MEMORY);
//封装启动AM container的Java命令
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasKrb5,
processSpec);
... ...
appContext.setApplicationName(customApplicationName);
appContext.setApplicationType(applicationType!= null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
... ...
yarnClient.submitApplication(appContext);
... ...
}
封装 AM 参数和命令
YarnClusterDescriptor.java
/**
 * Builds the ContainerLaunchContext for the ApplicationMaster.
 *
 * <p>The JVM options, memory parameters, logging options, entry-point class and stdout/stderr
 * redirects are filled into the configured start-command template. The resulting single command
 * is set on the context.
 *
 * @param yarnClusterEntrypoint main class run in the AM container
 * @param hasKrb5 whether a krb5.conf is available as a local resource in the container
 * @param processSpec JobManager process spec from which the JVM memory flags are derived
 * @return the AM container launch context
 */
ContainerLaunchContext setupApplicationMasterContainer(
        String yarnClusterEntrypoint,
        boolean hasKrb5,
        JobManagerProcessSpec processSpec) {
    // respect custom JVM options in the YAML file
    String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
    if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
        javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
    }

    // applicable only for YarnMiniCluster secure test run
    // krb5.conf file will be available as local resource in JM/TM container
    if (hasKrb5) {
        // Leading space keeps the flag separate from any option already in javaOpts.
        javaOpts += " -Djava.security.krb5.conf=krb5.conf";
    }

    // Create the AM's container launch context.
    ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

    final Map<String, String> startCommandValues = new HashMap<>();
    startCommandValues.put("java", "$JAVA_HOME/bin/java");

    String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
    startCommandValues.put("jvmmem", jvmHeapMem);

    startCommandValues.put("jvmopts", javaOpts);
    startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

    startCommandValues.put("class", yarnClusterEntrypoint);
    startCommandValues.put("redirects",
            "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
            "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
    startCommandValues.put("args", "");

    final String commandTemplate = flinkConfiguration
            .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                    ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
    final String amCommand =
            BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

    amContainer.setCommands(Collections.singletonList(amCommand));

    LOG.debug("Application Master start command: " + amCommand);

    return amContainer;
}
封装 AM 参数:
private ApplicationReport startAppMaster(
Configuration configuration,
String applicationName,
String yarnClusterEntrypoint,
JobGraph jobGraph,
YarnClient yarnClient,
YarnClientApplicationyarnApplication,
ClusterSpecificationclusterSpecification) throws Exception {
... ...
final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
yarnClusterEntrypoint,
hasKrb5,
processSpec);
... ...
// 封装AM 的classpath和环境参数
final Map<String,String> appMasterEnv = new HashMap<>();
// set user specified appmaster environment variables
appMasterEnv.putAll(
ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX,configuration));
// set Flink app class path
appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,classPathBuilder.toString());
// set Flink on YARN internalconfiguration values
appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR,localResourceDescFlinkJar.toString());
appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,fileUploader.getHomeDir().toString());
appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));
appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES,fileUploader.getApplicationDir().toUri().toString());
//https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,UserGroupInformation.getCurrentUser().getUserName());
if (localizedKeytabPath !=null) {
appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
String principal =configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
if (remotePathKeytab!= null) {
appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,remotePathKeytab.toString());
}
}
//To support Yarn SecureIntegration Test Scenario
if (remoteYarnSiteXmlPath !=null) {
appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH,remoteYarnSiteXmlPath.toString());
}
if (remoteKrb5Path != null) {
appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
}
// set classpath from YARNconfiguration
Utils.setupYarnClassPath(yarnConfiguration,appMasterEnv);
//设置 AM 参数
amContainer.setEnvironment(appMasterEnv);
... ...
yarnClient.submitApplication(appContext);
... ...
}
4、提交应用
YarnClientImpl.java
public ApplicationId submitApplication(ApplicationSubmissionContextappContext) throws YarnException, IOException {
ApplicationId applicationId =appContext.getApplicationId();
... ...
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
//TODO: YARN-1763:Handle RM failoversduring the submitApplication call.
rmClient.submitApplication(request);
... ...
}
ApplicationClientProtocolPBClientImpl.java
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throwsYarnException,
IOException {
//取出报文
SubmitApplicationRequestProto requestProto =
((SubmitApplicationRequestPBImpl)request).getProto();
//将报文发送发送到服务端,并将返回结果构成response
try {
return newSubmitApplicationResponsePBImpl(proxy.submitApplication(null,
requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
ApplicationClientProtocolPBServiceImpl.java
public SubmitApplicationResponseProto submitApplication(RpcController arg0,
SubmitApplicationRequestProto proto) throws ServiceException {
//服务端重新构建报文
SubmitApplicationRequestPBImpl request = newSubmitApplicationRequestPBImpl(proto);
......
SubmitApplicationResponse response = real.submitApplication(request);
return((SubmitApplicationResponsePBImpl)response).getProto();
......
}
ClientRMService.java
public SubmitApplicationResponse submitApplication(SubmitApplicationRequestrequest) throws YarnException {
... ...
//将应用请求提交到Yarn上的RMAppManager去提交任务
this.rmAppManager.submitApplication(submissionContext,System.currentTimeMillis(), user);
... ...
}
5、创建Dispatcher、ResourceManager
Per-job模式的AM container加载运行入口是YarnJobClusterEntrypoint中的main()方法
YarnJobClusterEntrypoint.java
public staticvoid main(String[] args) {
... ...
Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory,env);
YarnJobClusterEntrypoint yarnJobClusterEntrypoint = newYarnJobClusterEntrypoint(configuration);
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}
ClusterEntrypoint.java
private void runCluster(Configuration configuration,PluginManager pluginManager) throws Exception {
synchronized (lock) {
initializeServices(configuration,pluginManager);
... ...
//1、创建dispatcher、ResourceManager对象的工厂类
// 其中有从本地重新创建JobGraph的过程
finalDispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory= createDispatcherResourceManagerComponentFactory(configuration);
//2、通过工厂类创建dispatcher、ResourceManager对象
// Entry 启动RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore等
clusterComponent = dispatcherResourceManagerComponentFactory.create(
configuration,
ioExecutor,
commonRpcService,
haServices,
blobServer,
heartbeatServices,
metricRegistry,
archivedExecutionGraphStore,
newRpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
this);
... ...
}
}
DefaultDispatcherResourceManagerComponentFactory.java
/**
 * Creates and starts the REST endpoint, ResourceManager and Dispatcher components of the cluster.
 *
 * <p>Visible order:
 * <ol>
 *   <li>set up the leader and gateway retrievers;</li>
 *   <li>create and start the WebMonitorEndpoint;</li>
 *   <li>create the ResourceManager and the DispatcherRunner;</li>
 *   <li>start the ResourceManager and both leader retrieval services.</li>
 * </ol>
 *
 * @return the component bundling dispatcher runner, ResourceManager, leader retrieval services
 *     and REST endpoint
 * @throws Exception if a component cannot be created or started
 */
public DispatcherResourceManagerComponent create(
        Configuration configuration,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) throws Exception {

    // Declared before the try block, presumably so the (elided) failure handling can clean up.
    LeaderRetrievalService dispatcherLeaderRetrievalService = null;
    LeaderRetrievalService resourceManagerRetrievalService = null;
    WebMonitorEndpoint<?> webMonitorEndpoint = null;
    ResourceManager<?> resourceManager = null;
    ResourceManagerMetricGroup resourceManagerMetricGroup = null;
    DispatcherRunner dispatcherRunner = null;

    try {
        dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();

        resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();

        final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                DispatcherGateway.class,
                DispatcherId::fromUuid,
                10,
                Time.milliseconds(50L));

        final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                rpcService,
                ResourceManagerGateway.class,
                ResourceManagerId::fromUuid,
                10,
                Time.milliseconds(50L));
        // ... (elided in this excerpt)

        // Create the endpoint that serves the front end's REST requests.
        webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);

        log.debug("Starting Dispatcher REST endpoint.");
        webMonitorEndpoint.start();
        // ... (elided in this excerpt)

        // Create the ResourceManager; on YARN this returns a new YarnResourceManager.
        // Call path: AbstractDispatcherResourceManagerComponentFactory
        //   -> ActiveResourceManagerFactory
        //   -> YarnResourceManagerFactory
        resourceManager = resourceManagerFactory.createResourceManager(
                configuration,
                ResourceID.generate(),
                rpcService,
                highAvailabilityServices,
                heartbeatServices,
                fatalErrorHandler,
                new ClusterInformation(hostname, blobServer.getPort()),
                webMonitorEndpoint.getRestBaseUrl(),
                resourceManagerMetricGroup);
        // ... (elided in this excerpt)

        // Create the DispatcherRunner; creating it also starts the Dispatcher's leader election.
        log.debug("Starting Dispatcher.");
        dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
                highAvailabilityServices.getDispatcherLeaderElectionService(),
                fatalErrorHandler,
                new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                ioExecutor,
                rpcService,
                partialDispatcherServices);

        // Start the ResourceManager.
        log.debug("Starting ResourceManager.");
        resourceManager.start();

        resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
        dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);

        return new DispatcherResourceManagerComponent(
                dispatcherRunner,
                resourceManager,
                dispatcherLeaderRetrievalService,
                resourceManagerRetrievalService,
                webMonitorEndpoint);
    }
    // ... (elided in this excerpt: the catch clause handling failures)
}
创建 YarnResourceManager
ResourceManagerFactory.java
public ResourceManager<T> createResourceManager(
Configuration configuration,
ResourceID resourceId,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
FatalErrorHandler fatalErrorHandler,
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
MetricRegistry metricRegistry,
String hostname) throwsException {
final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry,hostname);
final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry,hostname);
final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
configuration, rpcService,highAvailabilityServices, slotManagerMetricGroup);
return createResourceManager(
configuration,
resourceId,
rpcService,
highAvailabilityServices,
heartbeatServices,
fatalErrorHandler,
clusterInformation,
webInterfaceUrl,
resourceManagerMetricGroup,
resourceManagerRuntimeServices);
}
YarnResourceManagerFactory.java
/**
 * YARN implementation of the factory hook: builds a YarnResourceManager.
 *
 * <p>The SlotManager and JobLeaderIdService come from the prepared runtime services. The process
 * environment (System.getenv()) is passed to the YarnResourceManager as well.
 */
public ResourceManager<YarnWorkerNode> createResourceManager(
        Configuration configuration,
        ResourceID resourceId,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        FatalErrorHandler fatalErrorHandler,
        ClusterInformation clusterInformation,
        @Nullable String webInterfaceUrl,
        ResourceManagerMetricGroup resourceManagerMetricGroup,
        ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
    final YarnResourceManager yarnResourceManager = new YarnResourceManager(
            rpcService, resourceId, configuration, System.getenv(),
            highAvailabilityServices, heartbeatServices,
            resourceManagerRuntimeServices.getSlotManager(),
            ResourceManagerPartitionTrackerImpl::new,
            resourceManagerRuntimeServices.getJobLeaderIdService(),
            clusterInformation, fatalErrorHandler, webInterfaceUrl,
            resourceManagerMetricGroup);
    return yarnResourceManager;
}
创建YarnResourceManager时,创建了SlotManager
ResourceManagerFactory.java
/**
 * Builds the ResourceManager runtime services (SlotManager, JobLeaderIdService). The services
 * are driven by the RPC service's scheduled executor.
 *
 * @throws ConfigurationException if the runtime-services configuration is invalid
 */
private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
    return ResourceManagerRuntimeServices.fromConfiguration(
            createResourceManagerRuntimeServicesConfiguration(configuration),
            highAvailabilityServices,
            rpcService.getScheduledExecutor(),
            slotManagerMetricGroup);
}
ResourceManagerRuntimeServices.java
/**
 * Creates the SlotManager and then the JobLeaderIdService, and bundles both into a
 * ResourceManagerRuntimeServices.
 *
 * <p>The JobLeaderIdService uses the configured job timeout.
 */
public static ResourceManagerRuntimeServices fromConfiguration(
        ResourceManagerRuntimeServicesConfiguration configuration,
        HighAvailabilityServices highAvailabilityServices,
        ScheduledExecutor scheduledExecutor,
        SlotManagerMetricGroup slotManagerMetricGroup) {
    // The SlotManager is created before the JobLeaderIdService, as before.
    final SlotManager createdSlotManager =
            createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);

    return new ResourceManagerRuntimeServices(
            createdSlotManager,
            new JobLeaderIdService(highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()));
}
创建并启动 Dispatcher
DefaultDispatcherRunnerFactory.java
/**
 * Creates a DispatcherRunner whose leader processes are produced by a factory. The factory is
 * built from the given job graph store factory, executor, RPC service and dispatcher services.
 *
 * @throws Exception if the runner cannot be created
 */
public DispatcherRunner createDispatcherRunner(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        JobGraphStoreFactory jobGraphStoreFactory,
        Executor ioExecutor,
        RpcService rpcService,
        PartialDispatcherServices partialDispatcherServices) throws Exception {
    return DefaultDispatcherRunner.create(
            leaderElectionService,
            fatalErrorHandler,
            dispatcherLeaderProcessFactoryFactory.createFactory(
                    jobGraphStoreFactory,
                    ioExecutor,
                    rpcService,
                    partialDispatcherServices,
                    fatalErrorHandler));
}
DefaultDispatcherRunner.java
/**
 * Creates a DefaultDispatcherRunner and wraps it with
 * DispatcherRunnerLeaderElectionLifecycleManager. Constructing the wrapper starts the runner's
 * leader election.
 *
 * @throws Exception if the leader election cannot be started
 */
public static DispatcherRunner create(
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler,
        DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
    return DispatcherRunnerLeaderElectionLifecycleManager.createFor(
            new DefaultDispatcherRunner(leaderElectionService, fatalErrorHandler, dispatcherLeaderProcessFactory),
            leaderElectionService);
}
DispatcherRunnerLeaderElectionLifecycleManager.java
/**
 * Wraps a dispatcher runner that is also a leader contender. The wrapper's constructor starts
 * the leader election immediately.
 *
 * @param dispatcherRunner runner acting as the leader contender
 * @param leaderElectionService election service to start for the runner
 * @return the lifecycle manager wrapping the runner
 * @throws Exception if the leader election cannot be started
 */
public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}

private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
    this.dispatcherRunner = dispatcherRunner;
    this.leaderElectionService = leaderElectionService;

    // Start the dispatcher's leader election with the runner as contender.
    leaderElectionService.start(dispatcherRunner);
}
StandaloneLeaderElectionService.java
/**
 * Starts standalone leader election for the given contender.
 *
 * <p>In the part shown, leadership is granted directly to the contender with the fixed
 * HighAvailabilityServices.DEFAULT_LEADER_ID.
 */
public void start(LeaderContender newContender) throws Exception {
// Elided; presumably stores newContender in the `contender` field used below — confirm.
... ...
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
DefaultDispatcherRunner.java
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
private void startNewDispatcherLeaderProcess(UUIDleaderSessionID) {
... ... previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}
AbstractDispatcherLeaderProcess.java
/**
 * Starts this leader process. Per runIfStateIs, this only happens while the state is CREATED.
 */
public final void start() {
    runIfStateIs(State.CREATED, () -> startInternal());
}

/** Logs the start, switches the state to RUNNING and invokes the subclass hook onStart(). */
private void startInternal() {
    final String processName = getClass().getSimpleName();
    log.info("Start {}.", processName);
    state = State.RUNNING;
    onStart();
}
JobDispatcherLeaderProcess.java
protected void onStart() {
final DispatcherGatewayServicedispatcherService = dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()),
Collections.singleton(jobGraph),
ThrowingJobGraphWriter.INSTANCE);
completeDispatcherSetup(dispatcherService);
}
/**
 * Gateway-service factory method, invoked as dispatcherGatewayServiceFactory.create(...) from
 * JobDispatcherLeaderProcess#onStart.
 *
 * <p>NOTE(review): the article gives no file name for this method; presumably it lives in the
 * default dispatcher gateway service factory — confirm. In the part shown, it starts the
 * Dispatcher.
 *
 * @param fencingToken dispatcher id derived from the leader session id
 * @param recoveredJobs jobs handed to the dispatcher (in per-job mode, the single JobGraph)
 * @param jobGraphWriter writer for job graphs
 */
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter){
... ...
// Start the dispatcher.
dispatcher.start();
... ...
}
启动 ResourceManager
DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create(
Configuration configuration,
Executor ioExecutor,
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
ArchivedExecutionGraphStore archivedExecutionGraphStore,
MetricQueryServiceRetriever metricQueryServiceRetriever,
FatalErrorHandler fatalErrorHandler) throws Exception {
... ...
// 启动ResourceManager
log.debug("StartingResourceManager.");
resourceManager.start();
... ...
}
ResourceManager.java
/**
 * ResourceManager start hook. In the part shown, it delegates to startResourceManagerServices().
 */
public void onStart() throws Exception {
... ...
startResourceManagerServices();
... ...
}
private void startResourceManagerServices() throwsException {
try {
leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();
initialize();
leaderElectionService.start(this);
jobLeaderIdService.start(new JobLeaderIdActionsImpl());
registerTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
6、Dispatcher启动JobManager
Dispatcher.java
/**
 * Dispatcher start hook. In the part shown, it starts the dispatcher's services and then the
 * recovered jobs.
 *
 * <p>In per-job mode the recovered jobs are presumably the single JobGraph passed in by
 * JobDispatcherLeaderProcess#onStart — confirm.
 */
public void onStart() throws Exception {
try {
// Start the Dispatcher's services.
startDispatcherServices();
}
... ...
// Start the jobs.
startRecoveredJobs();
... ...
}
Dispatcher.java
/**
 * Asynchronously creates the JobManagerRunner for the given JobGraph and starts it. Starting the
 * runner begins its leader election.
 *
 * @param jobGraph the job to run
 * @param initializationTimestamp timestamp passed to the runner factory
 * @return future completing with the started runner
 */
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
            () -> {
                try {
                    JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                            jobGraph,
                            configuration,
                            rpcService,
                            highAvailabilityServices,
                            heartbeatServices,
                            jobManagerSharedServices,
                            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                            fatalErrorHandler,
                            initializationTimestamp);
                    // Start the JobManagerRunner.
                    runner.start();
                    return runner;
                }
                // ...... (elided in this excerpt: the catch clause, the end of the lambda and
                //        the remaining arguments of supplyAsync)
}
JobManagerRunnerImpl.java
/**
 * Starts the JobManagerRunner by starting its leader election, with this runner as contender.
 *
 * @throws Exception if the leader election service cannot be started
 */
public void start() throws Exception {
    try {
        leaderElectionService.start(this);
    } catch (Exception e) {
        log.error("Could not start the JobManager because the leader election service did not start.", e);
        throw new Exception("Could not start the leader election service.", e);
    }
}
StandaloneLeaderElectionService.java
public void start(LeaderContendernewContender) throws Exception {
... ...
contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
JobManagerRunnerImpl.java
/**
 * Leader-election callback of the JobManagerRunner.
 *
 * <p>Unless the runner is shut down, it chains a new leadership operation after the previous
 * one. The new operation checks the job's scheduling status and starts the JobMaster.
 *
 * @param leaderSessionID session id of the granted leadership
 */
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        // thenCompose makes this operation run after the previous leadership operation completes.
        leadershipOperation = leadershipOperation.thenCompose(
                (ignored) -> {
                    synchronized (lock) {
                        // Check the job's scheduling status, then start the job manager.
                        return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                    }
                });

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUIDleaderSessionId) {
final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
return jobSchedulingStatusFuture.thenCompose(
jobSchedulingStatus -> {
if(jobSchedulingStatus == JobSchedulingStatus.DONE) {
returnjobAlreadyDone();
} else {
return startJobMaster(leaderSessionId);
}
});
}
private CompletionStage<Void> startJobMaster(UUIDleaderSessionId) {
... ...
startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
... ...
}
JobMaster.java
/**
 * Starts the JobMaster RPC endpoint, then runs startJobExecution asynchronously without fencing.
 *
 * @param newJobMasterId fencing token for this JobMaster
 * @return future acknowledging the start of job execution
 * @throws Exception if the endpoint cannot be started
 */
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();

    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId)throws Exception {
... ...
// 启动JobMaster
startJobMasterServices();
log.info("Starting execution ofjob {} ({}) under job master id {}.", jobGraph.getName(),jobGraph.getJobID(), newJobMasterId);
// 重置开始调度
resetAndStartScheduler();
... ...
}
7、ResourceManager启动SlotManager
ResourceManager.java
/**
 * ResourceManager start hook. In the part shown, it delegates to startResourceManagerServices().
 * That call creates the YARN clients via initialize() and starts leader election; on gaining
 * leadership, the SlotManager is started.
 */
public final void onStart() throws Exception {
... ...
startResourceManagerServices();
... ...
}
private void startResourceManagerServices() throwsException {
try {
leaderElectionService =highAvailabilityServices.getResourceManagerLeaderElectionService();
initialize();
leaderElectionService.start(this);
jobLeaderIdService.start(newJobLeaderIdActionsImpl());
registerTaskExecutorMetrics();
} catch (Exception e) {
handleStartResourceManagerServicesException(e);
}
}
创建 Yarn 的 RM 和 NM 客户端
ActiveResourceManager.java
/**
 * Initializes the resource manager driver. This ResourceManager is passed as the event
 * handler, along with a main-thread executor and the I/O executor.
 *
 * @throws ResourceManagerException wrapping any driver initialization failure
 */
protected void initialize() throws ResourceManagerException {
    try {
        resourceManagerDriver.initialize(
                this,
                new GatewayMainThreadExecutor(),
                ioExecutor);
    } catch (Exception e) {
        throw new ResourceManagerException("Cannot initialize resource provider.", e);
    }
}
AbstractResourceManagerDriver.java
public final void initialize(
ResourceEventHandler<WorkerType> resourceEventHandler,
ScheduledExecutor mainThreadExecutor,
Executor ioExecutor) throwsException {
this.resourceEventHandler =Preconditions.checkNotNull(resourceEventHandler);
this.mainThreadExecutor =Preconditions.checkNotNull(mainThreadExecutor);
this.ioExecutor =Preconditions.checkNotNull(ioExecutor);
initializeInternal();
}
YarnResourceManagerDriver.java
/**
 * Sets up the driver's YARN clients:
 * <ol>
 *   <li>create, initialize and start the YARN ResourceManager client;</li>
 *   <li>register the ApplicationMaster;</li>
 *   <li>handle the containers reported in the registration response;</li>
 *   <li>set up the resource/priority adapter;</li>
 *   <li>create, initialize and start the YARN NodeManager client.</li>
 * </ol>
 *
 * @throws ResourceManagerException if the ResourceManager client part fails
 */
protected void initializeInternal() throws Exception {
    final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
    try {
        // Create and start the YARN ResourceManager client.
        resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
                yarnHeartbeatIntervalMillis,
                yarnContainerEventHandler);
        resourceManagerClient.init(yarnConfig);
        resourceManagerClient.start();

        final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
        getContainersFromPreviousAttempts(registerApplicationMasterResponse);
        taskExecutorProcessSpecContainerResourcePriorityAdapter =
                new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                        registerApplicationMasterResponse.getMaximumResourceCapability(),
                        ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
    } catch (Exception e) {
        throw new ResourceManagerException("Could not start resource manager client.", e);
    }

    // Create and start the YARN NodeManager client.
    nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
    nodeManagerClient.init(yarnConfig);
    nodeManagerClient.start();
}
启动 SlotManager
ResourceManager.java
/**
 * Starts the services tied to leadership (per the method's name). The order is: heartbeat
 * services, then the SlotManager with this ResourceManager's fencing token and main-thread
 * executor, then the onLeadership() hook.
 */
private void startServicesOnLeadership() {
    this.startHeartbeatServices();

    slotManager.start(
            this.getFencingToken(),
            this.getMainThreadExecutor(),
            new ResourceActionsImpl());

    this.onLeadership();
}
SlotManagerImpl.java
/**
 * Starts the SlotManager.
 *
 * <p>It stores the ResourceManager id, main-thread executor and resource actions, and marks
 * itself started. It then schedules two periodic checks (TaskManager timeouts/redundancy and
 * slot request timeouts) and registers its metrics.
 *
 * @param newResourceManagerId fencing token of the owning ResourceManager (non-null)
 * @param newMainThreadExecutor executor on which the periodic checks run (non-null)
 * @param newResourceActions callbacks for allocating/releasing resources (non-null)
 */
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
    LOG.info("Starting the SlotManager.");

    this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
    mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
    resourceActions = Preconditions.checkNotNull(newResourceActions);

    started = true;

    // Both checks are triggered by scheduledExecutor (initial delay 0, fixed delay) but the work
    // itself is handed to mainThreadExecutor.
    taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                    () -> checkTaskManagerTimeoutsAndRedundancy()),
            0L,
            taskManagerTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

    slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
            () -> mainThreadExecutor.execute(
                    () -> checkSlotRequestTimeouts()),
            0L,
            slotRequestTimeout.toMilliseconds(),
            TimeUnit.MILLISECONDS);

    registerSlotManagerMetrics();
}
void checkTaskManagerTimeoutsAndRedundancy() {
if (!taskManagerRegistrations.isEmpty()){
long currentTime =System.currentTimeMillis();
ArrayList<TaskManagerRegistration>timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
// first retrieve the timedout TaskManagers
for (TaskManagerRegistrationtaskManagerRegistration : taskManagerRegistrations.values()) {
if (currentTime -taskManagerRegistration.getIdleSince() >=taskManagerTimeout.toMilliseconds()) {
// we collectthe instance ids first in order to avoid concurrent modifications by the
//ResourceActions.releaseResource call
timedOutTaskManagers.add(taskManagerRegistration);
}
}
int slotsDiff =redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
if (freeSlots.size() == slots.size()){
// No need tokeep redundant taskManagers if no job is running.
// 如果没有job在运行,释放taskmanager
releaseTaskExecutors(timedOutTaskManagers,timedOutTaskManagers.size());
} else if (slotsDiff > 0) {
// Keep enoughredundant taskManagers from time to time.
// 保证随时有足够的taskmanager
intrequiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
allocateRedundantTaskManagers(requiredTaskManagers);
} else {
// second we triggerthe release resource callback which can decide upon the resource release
int maxReleaseNum =(-slotsDiff) / numSlotsPerWorker;
releaseTaskExecutors(timedOutTaskManagers,Math.min(maxReleaseNum, timedOutTaskManagers.size()));
}
}
}
8、JobManager申请Slot
启动 SlotPool
接6,JobMaster启动时,启动SlotPool,向ResourceManager注册
private void startJobMasterServices() throwsException {
// 启动心跳服务
startHeartbeatServices();
// 启动slotPool
slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
// 连接到之前已知的ResourceManager
reconnectToResourceManager(newFlinkException("Starting JobMaster component."));
// 启动后slotpool开始向slot manager请求slot
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
向 ResourceManager 注册
经过下面层层调用:
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> JobMaster的notifyOfNewResourceManagerLeader()
-> reconnectToResourceManager()
-> tryConnectToResourceManager()
-> connectToResourceManager()
// Creates a new ResourceManagerConnection for this job (target: the current ResourceManager
// leader address and id) and starts it; start() kicks off the registration at the ResourceManager.
private void connectToResourceManager() {
... ...
resourceManagerConnection = new ResourceManagerConnection(
log,
jobGraph.getJobID(),
resourceId,
getAddress(),
getFencingToken(),
resourceManagerAddress.getAddress(),
resourceManagerAddress.getResourceManagerId(),
scheduledExecutorService);
resourceManagerConnection.start();
}
RegisteredRpcConnection.java
// Starts the connection: creates a new retrying registration and starts it, but only if no
// registration has been installed yet. A concurrent start that loses the race cancels its own
// registration.
public void start() {
... ...
final RetryingRegistration<F, G,S> newRegistration = createNewRegistration();
// atomically install the new registration only if the field is still null
if (REGISTRATION_UPDATER.compareAndSet(this,null, newRegistration)) {
newRegistration.startRegistration();
} else {
// concurrent start operation
newRegistration.cancel();
}
}
/**
 * Creates the registration to run for this connection via {@code generateRegistration()},
 * rejecting a {@code null} result (remaining logic elided in this excerpt).
 */
private RetryingRegistration<F, G, S> createNewRegistration() {
    RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
    ... ...
}
JobMaster.java的内部类ResourceManagerConnection
protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway,JobMasterRegistrationSuccess> generateRegistration() {
return newRetryingRegistration<ResourceManagerId, ResourceManagerGateway,JobMasterRegistrationSuccess>(
log,
getRpcService(),
"ResourceManager",
ResourceManagerGateway.class,
getTargetAddress(),
getTargetLeaderId(),
jobMasterConfiguration.getRetryingRegistrationConfiguration()){
@Override
protectedCompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGatewaygateway, ResourceManagerId fencingToken, long timeoutMillis) {
Time timeout =Time.milliseconds(timeoutMillis);
return gateway.registerJobManager(
jobMasterId,
jobManagerResourceID,
jobManagerRpcAddress,
jobID,
timeout);
}
};
}
SlotPool 申请 slot
注册成功调用onRegistrationSuccess(),向ResourceManager进行slot的申请:
JobMaster.java的内部类ResourceManagerConnection
/**
 * Called once the registration at the ResourceManager succeeded. Via {@code runAsync}, it
 * establishes the ResourceManager connection, but only if this connection is still the current one.
 *
 * @param success registration response returned by the ResourceManager
 */
protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
    runAsync(() -> {
        // filter out outdated connections
        //noinspection ObjectEquality
        if (this == resourceManagerConnection) {
            establishResourceManagerConnection(success);
        }
    });
}
/**
 * Completes the JobMaster-to-ResourceManager connection. The visible step connects the SlotPool to
 * the ResourceManager so that its waiting slot requests are sent (rest elided in this excerpt).
 *
 * @param success registration response returned by the ResourceManager
 */
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
    ... ...
    slotPool.connectToResourceManager(resourceManagerGateway);
    ... ...
}
SlotPoolImpl.java
public void connectToResourceManager(@NonnullResourceManagerGateway resourceManagerGateway) {
this.resourceManagerGateway =checkNotNull(resourceManagerGateway);
// work on all slots waiting for thisconnection
for (PendingRequest pendingRequest :waitingForResourceManager.values()) {
// 向ResourceManager申请slot
requestSlotFromResourceManager(resourceManagerGateway,pendingRequest);
}
// all sent off
waitingForResourceManager.clear();
}
/**
 * Sends one pending slot request to the ResourceManager as a {@code SlotRequest} carrying the job
 * id, allocation id, requested resource profile and this JobManager's address (the surrounding
 * logic, including where {@code allocationId} comes from, is elided in this excerpt).
 */
private void requestSlotFromResourceManager(
        final ResourceManagerGateway resourceManagerGateway,
        final PendingRequest pendingRequest) {
    ... ...
    CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
            jobMasterId,
            new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
            rpcTimeout);
    ... ...
}
ResourceManager.java:由ResourceManager里的SlotManager处理请求
// ResourceManager RPC endpoint for a JobMaster's slot request: the request is handed to the
// SlotManager (checks, error handling and the returned future are elided in this excerpt).
public CompletableFuture<Acknowledge> requestSlot(
JobMasterId jobMasterId,
SlotRequest slotRequest,
final Time timeout) {
... ...
try {
// the SlotManager processes the slot request
slotManager.registerSlotRequest(slotRequest);
}
... ...
}
SlotManagerImpl.java
// Registers a slot request at the SlotManager: the request is stored as pending under its
// allocation id and internalRequestSlot then tries to serve it (other paths elided in this excerpt).
public boolean registerSlotRequest(SlotRequest slotRequest)throws ResourceManagerException {
checkInit();
... ...
PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
pendingSlotRequests.put(slotRequest.getAllocationId(),pendingSlotRequest);
try {
internalRequestSlot(pendingSlotRequest);
}
... ...
}
private void internalRequestSlot(PendingSlotRequestpendingSlotRequest) throws ResourceManagerException {
final ResourceProfile resourceProfile =pendingSlotRequest.getResourceProfile();
OptionalConsumer.of(findMatchingSlot(resourceProfile))
.ifPresent(taskManagerSlot-> allocateSlot(taskManagerSlot, pendingSlotRequest))
.ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
/**
 * Serves a slot request from a pending TaskManager slot. If none is available, it asks for a new
 * resource via {@code allocateResource} (surrounding logic elided in this excerpt).
 *
 * @throws ResourceManagerException if the request cannot be fulfilled
 */
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
    ... ...
    if (!pendingTaskManagerSlotOptional.isPresent()) {
        pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
    }
    ... ...
}
9、ResourceManager申请资源
ResourceManager.java
/**
 * Requests a new worker matching the given spec, after checking (validateRunsInMainThread) that
 * the call happens in the main thread.
 *
 * @param workerResourceSpec resources the new worker should have
 * @return the result reported by {@code startNewWorker}
 */
public boolean allocateResource(WorkerResourceSpec workerResourceSpec) {
    validateRunsInMainThread();
    final boolean workerRequested = startNewWorker(workerResourceSpec);
    return workerRequested;
}
ActiveResourceManager.java
public boolean startNewWorker(WorkerResourceSpecworkerResourceSpec) {
requestNewWorker(workerResourceSpec);
return true;
}
/**
 * Derives the TaskExecutor process spec for the worker and requests a matching resource from the
 * ResourceManagerDriver (bookkeeping and future handling elided in this excerpt).
 */
private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
    // derive the TaskExecutor process spec from the Flink configuration and the worker spec
    final TaskExecutorProcessSpec taskExecutorProcessSpec =
            TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
    ... ...
    // request the resource
    CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
    ... ...
}
YarnResourceManagerDriver.java
public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
checkInitialized();
final CompletableFuture<YarnWorkerNode> requestResourceFuture = newCompletableFuture<>();
final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource>priorityAndResourceOpt =
taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
if(!priorityAndResourceOpt.isPresent()) {
requestResourceFuture.completeExceptionally(
newResourceManagerException(
String.format("Couldnot compute the container Resource from the given TaskExecutorProcessSpec %s." +
"Thisusually indicates the requested resource is larger than Yarn's max containerresource limit.",
taskExecutorProcessSpec)));
} else {
final Priority priority =priorityAndResourceOpt.get().getPriority();
final Resource resource =priorityAndResourceOpt.get().getResource();
resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
// make sure we transmit therequest fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec,ignore -> new LinkedList<>()).add(requestResourceFuture);
log.info("Requesting newTaskExecutor container with resource {}, priority {}.",taskExecutorProcessSpec, priority);
}
return requestResourceFuture;
}
10、TaskManager启动
YarnTaskExecutorRunner.java
// Entry point of the YARN TaskExecutor runner: logs environment information, installs the
// signal handler and JVM shutdown safeguard, then runs the TaskManager.
public static void main(String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG,"YARN TaskExecutor runner", args);
SignalHandler.register(LOG);
JvmShutdownSafeguard.installAsShutdownHook(LOG);
runTaskManagerSecurely(args);
}
private static void runTaskManagerSecurely(String[] args) {
try {
LOG.debug("Allenvironment variables: {}", ENV);
final String currDir =ENV.get(Environment.PWD.key());
LOG.info("Current workingDirectory: {}", currDir);
final Configurationconfiguration = TaskManagerRunner.loadConfiguration(args);
setupAndModifyConfiguration(configuration,currDir, ENV);
TaskManagerRunner.runTaskManagerSecurely(configuration);
}
catch (Throwable t) {
final ThrowablestrippedThrowable = ExceptionUtils.stripException(t,UndeclaredThrowableException.class);
// make sure that everythingwhatever ends up in the log
LOG.error("YARN TaskManagerinitialization failed.", strippedThrowable);
System.exit(INIT_ERROR_EXIT_CODE);
}
}
TaskManagerRunner.java
/**
 * Starts the TaskManager by starting its TaskExecutor service.
 *
 * @throws Exception if the service fails to start
 */
public void start() throws Exception {
    this.taskExecutorService.start();
}
TaskExecutorToServiceAdapter.java
/** Starts the adapted TaskExecutor. */
public void start() {
    this.taskExecutor.start();
}
TaskExecutor.java
public void onStart() throws Exception {
try {
startTaskExecutorServices();
} catch (Throwable t) {
final TaskManagerExceptionexception = new TaskManagerException(String.format("Could not start theTaskExecutor %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
startRegistrationTimeout();
}
11、向ResourceManager注册
TaskExecutor.java
private void startTaskExecutorServices() throwsException {
try {
// start by connecting to theResourceManager
resourceManagerLeaderRetriever.start(newResourceManagerLeaderListener());
// tell the task slot tablewho's responsible for the task slot actions
taskSlotTable.start(newSlotActionsImpl(), getMainThreadExecutor());
// start the job leaderservice
jobLeaderService.start(getAddress(),getRpcService(), haServices, new JobLeaderListenerImpl());
fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(),blobCacheService.getPermanentBlobService());
} catch (Exception e) {
handleStartTaskExecutorServicesException(e);
}
}
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
-> notifyOfNewResourceManagerLeader()
-> TaskExecutor的notifyOfNewResourceManagerLeader()
-> TaskExecutor的reconnectToResourceManager()
-> TaskExecutor的tryConnectToResourceManager()
-> TaskExecutor的connectToResourceManager()
-> TaskExecutor的resourceManagerConnection.start()
执行 createNewRegistration()->generateRegistration()
TaskExecutorToResourceManagerConnection.java
/**
 * Creates the retrying registration that registers this TaskExecutor at the target
 * ResourceManager using the stored {@code taskExecutorRegistration} data.
 */
protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
    return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
            log,
            rpcService,
            getTargetAddress(),
            getTargetLeaderId(),
            retryingRegistrationConfiguration,
            taskExecutorRegistration);
}
开始注册:newRegistration.startRegistration()会调用invokeRegistration():
TaskExecutorToResourceManagerConnection.java的内部类ResourceManagerRegistration
private static class ResourceManagerRegistration
extendsRetryingRegistration<ResourceManagerId, ResourceManagerGateway,TaskExecutorRegistrationSuccess> {
private final TaskExecutorRegistration taskExecutorRegistration;
ResourceManagerRegistration(
Logger log,
RpcService rpcService,
String targetAddress,
ResourceManagerIdresourceManagerId,
RetryingRegistrationConfigurationretryingRegistrationConfiguration,
TaskExecutorRegistrationtaskExecutorRegistration) {
super(log, rpcService,"ResourceManager", ResourceManagerGateway.class, targetAddress,resourceManagerId, retryingRegistrationConfiguration);
this.taskExecutorRegistration= taskExecutorRegistration;
}
@Override
protectedCompletableFuture<RegistrationResponse> invokeRegistration(
ResourceManagerGatewayresourceManager, ResourceManagerId fencingToken, long timeoutMillis) throwsException {
Time timeout =Time.milliseconds(timeoutMillis);
return resourceManager.registerTaskExecutor(
taskExecutorRegistration,
timeout);
}
}
注册成功调用onRegistrationSuccess
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccesssuccess) {
log.info("Successful registrationat resource manager {} under registration id {}.",
getTargetAddress(),success.getRegistrationId());
registrationListener.onRegistrationSuccess(this, success);
}
TaskExecutor.java的内部类ResourceManagerRegistrationListener
public void onRegistrationSuccess(TaskExecutorToResourceManagerConnectionconnection, TaskExecutorRegistrationSuccess success) {
final ResourceID resourceManagerId =success.getResourceManagerId();
final InstanceIDtaskExecutorRegistrationId = success.getRegistrationId();
final ClusterInformationclusterInformation = success.getClusterInformation();
final ResourceManagerGatewayresourceManagerGateway = connection.getTargetGateway();
runAsync(
() -> {
// filter out outdatedconnections
//noinspectionObjectEquality
if(resourceManagerConnection == connection) {
try {
establishResourceManagerConnection(
resourceManagerGateway,
resourceManagerId,
taskExecutorRegistrationId,
clusterInformation);
} catch(Throwable t) {
log.error("EstablishingResource Manager connection in Task Executor failed", t);
}
}
});
}
/**
 * Completes the TaskExecutor-to-ResourceManager connection. The visible step sends this
 * TaskExecutor's slot report to the ResourceManager (remaining logic elided in this excerpt).
 */
private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {
    // register this TaskExecutor's slots at the ResourceManager by sending a slot report
    final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
            getResourceID(),
            taskExecutorRegistrationId,
            taskSlotTable.createSlotReport(getResourceID()),
            taskManagerConfiguration.getTimeout());
    ... ...
}
ResourceManager.java
public CompletableFuture<Acknowledge> sendSlotReport(ResourceIDtaskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReportslotReport, Time timeout) {
final WorkerRegistration<WorkerType> workerTypeWorkerRegistration =taskExecutors.get(taskManagerResourceId);
if(workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)){
if (slotManager.registerTaskManager(workerTypeWorkerRegistration,slotReport)) {
onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
}
returnCompletableFuture.completedFuture(Acknowledge.get());
} else {
return FutureUtils.completedExceptionally(newResourceManagerException(String.format("Unknown TaskManager registrationid %s.", taskManagerRegistrationId)));
}
}
SlotManagerImpl.java
public boolean registerTaskManager(final TaskExecutorConnectiontaskExecutorConnection, SlotReport initialSlotReport) {
checkInit();
LOG.debug("Registering TaskManager{} under {} at the SlotManager.",taskExecutorConnection.getResourceID().getStringWithMetadata(),taskExecutorConnection.getInstanceID());
// we identify task managers by theirinstance id
// 通过实例id判断某个taskmanager是否已经注册过
if(taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())){
// 报告已注册过的taskmanager的slot分配情况,更新slot情况
reportSlotStatus(taskExecutorConnection.getInstanceID(),initialSlotReport);
return false;
} else {
if(isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
LOG.info("Thetotal number of slots exceeds the max limitation {}, release the excess resource.",maxSlotNum);
resourceActions.releaseResource(taskExecutorConnection.getInstanceID(),new FlinkException("The total number of slots exceeds the maxlimitation."));
return false;
}
// first register theTaskManager
ArrayList<SlotID> reportedSlots= new ArrayList<>();
for (SlotStatus slotStatus :initialSlotReport) {
reportedSlots.add(slotStatus.getSlotID());
}
TaskManagerRegistrationtaskManagerRegistration = new TaskManagerRegistration(
taskExecutorConnection,
reportedSlots);
taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(),taskManagerRegistration);
// next register the new slots
for (SlotStatus slotStatus :initialSlotReport) {
// 注册新的slot,根据slot请求进行分配
registerSlot(
slotStatus.getSlotID(),
slotStatus.getAllocationID(),
slotStatus.getJobID(),
slotStatus.getResourceProfile(),
taskExecutorConnection);
}
return true;
}
}
12、ResourceManager分配Slot
SlotManagerImpl.java
private void registerSlot(
SlotID slotId,
AllocationID allocationId,
JobID jobId,
ResourceProfileresourceProfile,
TaskExecutorConnectiontaskManagerConnection) {
if (slots.containsKey(slotId)) {
// remove the old slot first
// 移除旧slot
removeSlot(
slotId,
newSlotManagerException(
String.format(
"Re-registrationof slot %s. This indicates that the TaskExecutor has re-connected.",
slotId)));
}
// 创建和注册TaskManager的slot
final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId,resourceProfile, taskManagerConnection);
final PendingTaskManagerSlotpendingTaskManagerSlot;
if (allocationId == null) {
pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
} else {
pendingTaskManagerSlot = null;
}
if (pendingTaskManagerSlot == null) {
updateSlot(slotId,allocationId, jobId);
} else {
pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
final PendingSlotRequestassignedPendingSlotRequest =pendingTaskManagerSlot.getAssignedPendingSlotRequest();
// 分配slot给请求
if (assignedPendingSlotRequest== null) {
handleFreeSlot(slot);
} else {
assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
allocateSlot(slot,assignedPendingSlotRequest);
}
}
}
/**
 * Allocates a TaskManager slot for a pending slot request. It marks the TaskManager as used and
 * requests the slot from the TaskExecutor via RPC (surrounding logic elided in this excerpt).
 */
private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
    ... ...
    taskManagerRegistration.markUsed();
    // RPC call to the task manager
    CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
            slotId,
            pendingSlotRequest.getJobId(),
            allocationId,
            pendingSlotRequest.getResourceProfile(),
            pendingSlotRequest.getTargetAddress(),
            resourceManagerId,
            taskManagerRequestTimeout);
    ... ...
}
13、TaskManager提供Slot
TaskExecutor.java
/**
 * Handles a slot request from the ResourceManager. It allocates the slot locally and registers the
 * job at this TaskExecutor (creating its services if needed). If the job's JobManager is already
 * connected, the slots are offered to it.
 *
 * @return an acknowledged future. The future fails exceptionally if either the slot allocation
 *     fails, or the job cannot be created (the slot is then freed again).
 */
public CompletableFuture<Acknowledge> requestSlot(
        final SlotID slotId,
        final JobID jobId,
        final AllocationID allocationId,
        final ResourceProfile resourceProfile,
        final String targetAddress,
        final ResourceManagerId resourceManagerId,
        final Time timeout) {
    ... ...
    try {
        // allocate the slot on this TaskManager
        allocateSlot(
                slotId,
                jobId,
                allocationId,
                resourceProfile);
    } catch (SlotAllocationException sae) {
        // must return here: continuing would register the job for a slot that was never allocated
        return FutureUtils.completedExceptionally(sae);
    }
    final JobTable.Job job;
    try {
        job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
    } catch (Exception e) {
        // free the allocated slot
        try {
            taskSlotTable.freeSlot(allocationId);
        } catch (SlotNotFoundException slotNotFoundException) {
            // slot no longer existent, this should actually never happen, because we've
            // just allocated the slot. So let's fail hard in this case!
            onFatalError(slotNotFoundException);
        }
        // release local state under the allocation id.
        localStateStoresManager.releaseLocalStateForAllocationId(allocationId);
        // sanity check
        if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
            onFatalError(new Exception("Could not free slot " + slotId));
        }
        return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
    }
    if (job.isConnected()) {
        // the job's JobManager is connected: offer the slots to it
        offerSlotsToJobManager(jobId);
    }
    return CompletableFuture.completedFuture(Acknowledge.get());
}
private void internalOfferSlotsToJobManager(JobTable.ConnectionjobManagerConnection) {
final JobID jobId =jobManagerConnection.getJobId();
if(taskSlotTable.hasAllocatedSlots(jobId)) {
log.info("Offer reservedslots to the leader of job {}.", jobId);
final JobMasterGatewayjobMasterGateway = jobManagerConnection.getJobManagerGateway();
finalIterator<TaskSlot<Task>> reservedSlotsIterator =taskSlotTable.getAllocatedSlots(jobId);
final JobMasterId jobMasterId= jobManagerConnection.getJobMasterId();
finalCollection<SlotOffer> reservedSlots = new HashSet<>(2);
while(reservedSlotsIterator.hasNext()) {
SlotOffer offer =reservedSlotsIterator.next().generateSlotOffer();
reservedSlots.add(offer);
}
CompletableFuture<Collection<SlotOffer>>acceptedSlotsFuture = jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
taskManagerConfiguration.getTimeout());
acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(jobId,jobMasterGateway, jobMasterId, reservedSlots),
getMainThreadExecutor());
} else {
log.debug("There are nounassigned slots for the job {}.", jobId);
}
}
JobMaster.java
public CompletableFuture<Collection<SlotOffer>> offerSlots(
final ResourceIDtaskManagerId,
final Collection<SlotOffer>slots,
final Time timeout) {
Tuple2<TaskManagerLocation,TaskExecutorGateway> taskManager =registeredTaskManagers.get(taskManagerId);
if (taskManager == null) {
returnFutureUtils.completedExceptionally(new Exception("Unknown TaskManager" + taskManagerId));
}
final TaskManagerLocationtaskManagerLocation = taskManager.f0;
final TaskExecutorGatewaytaskExecutorGateway = taskManager.f1;
final RpcTaskManagerGatewayrpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway,getFencingToken());
return CompletableFuture.completedFuture(
slotPool.offerSlots(
taskManagerLocation,
rpcTaskManagerGateway,
slots));
}
SlotPoolImpl.java
public Collection<SlotOffer> offerSlots(
TaskManagerLocation taskManagerLocation,
TaskManagerGateway taskManagerGateway,
Collection<SlotOffer>offers) {
ArrayList<SlotOffer> result = newArrayList<>(offers.size());
for (SlotOffer offer : offers) {
if (offerSlot(
taskManagerLocation,
taskManagerGateway,
offer)) {
result.add(offer);
}
}
return result;
}
/**
 * Offers a single slot to the pool. On the visible path the slot is used to fulfill pending
 * requests in request order, or is otherwise made available, and the offer is accepted. Earlier
 * logic, which may contain other return paths, is elided in this excerpt.
 *
 * @return {@code true} on the visible path
 */
boolean offerSlot(
        final TaskManagerLocation taskManagerLocation,
        final TaskManagerGateway taskManagerGateway,
        final SlotOffer slotOffer) {
    ... ...
    // use the slot to fulfill pending requests, in requested order
    tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);
    // we accepted the request in any case. slot will be released after it idled for
    // too long and timed out
    return true;
}
往期精彩内容:
用flink能替代spark的批处理功能吗
Flink进阶之滑动窗口统计实时热门商品
Flink进阶之使用CEP实现恶意登陆检测
重磅!Flink源码解析环境准备及提交流程之环境准备
大咖分享 | 通过制作一个迷你版Flink来学习Flink源码
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/6309.html