Submitting Flink jobs from Java: Flink job submission parameters


The submission flow of Flink's YARN per-job mode is shown in the figure:

[Figure: Flink YARN per-job submission flow]

1. Start the YARN client

AbstractJobClusterExecutor.java

public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
         final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
         // Create and start the YARN client
         try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
                  final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
                  // Obtain the cluster configuration parameters
                  final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
                  // Deploy the cluster
                  final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                                   .deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
                  LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
 
                  return CompletableFuture.completedFuture(
                                   new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
         }
}


YarnClusterClientFactory.java

public YarnClusterDescriptor createClusterDescriptor(Configuration configuration) {
... ...
         return getClusterDescriptor(configuration);
}
 
private YarnClusterDescriptor getClusterDescriptor(Configuration configuration) {
         final YarnClient yarnClient = YarnClient.createYarnClient();
         final YarnConfiguration yarnConfiguration = new YarnConfiguration();
 
         yarnClient.init(yarnConfiguration);
         yarnClient.start();
 
         return new YarnClusterDescriptor(
                          configuration,
                          yarnConfiguration,
                          yarnClient,
                          YarnClientYarnClusterInformationRetriever.create(yarnClient),
                          false);
}

2. Obtain the cluster configuration parameters

AbstractContainerizedClusterClientFactory.java

public ClusterSpecification getClusterSpecification(Configuration configuration) {
... ...
         final int jobManagerMemoryMB = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                  configuration,
                  JobManagerOptions.TOTAL_PROCESS_MEMORY)
         .getTotalProcessMemorySize()
         .getMebiBytes();
 
         final int taskManagerMemoryMB = TaskExecutorProcessUtils
                  .processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(
                          configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY))
                  .getTotalProcessMemorySize()
                  .getMebiBytes();
 
         int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
 
         return new ClusterSpecification.ClusterSpecificationBuilder()
                  .setMasterMemoryMB(jobManagerMemoryMB)
                  .setTaskManagerMemoryMB(taskManagerMemoryMB)
                  .setSlotsPerTaskManager(slotsPerTaskManager)
                  .createClusterSpecification();
}
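
The three values gathered above (JobManager memory, TaskManager memory, slots per TaskManager) are collected into one immutable object through a builder. The sketch below illustrates that pattern in plain Java with no Flink dependency. `ClusterSpecSketch` and its `Builder` are hypothetical stand-ins, not Flink's `ClusterSpecification`, and the numbers used in `main` are made-up sample values rather than defaults from the source.

```java
public class ClusterSpecSketch {
    private final int masterMemoryMB;
    private final int taskManagerMemoryMB;
    private final int slotsPerTaskManager;

    private ClusterSpecSketch(int masterMemoryMB, int taskManagerMemoryMB, int slotsPerTaskManager) {
        this.masterMemoryMB = masterMemoryMB;
        this.taskManagerMemoryMB = taskManagerMemoryMB;
        this.slotsPerTaskManager = slotsPerTaskManager;
    }

    public int getMasterMemoryMB() { return masterMemoryMB; }
    public int getTaskManagerMemoryMB() { return taskManagerMemoryMB; }
    public int getSlotsPerTaskManager() { return slotsPerTaskManager; }

    /** Converts a byte count to whole mebibytes, rounding down. */
    public static int toMebiBytes(long bytes) {
        return (int) (bytes >> 20);
    }

    @Override
    public String toString() {
        return "ClusterSpecSketch{masterMemoryMB=" + masterMemoryMB
                + ", taskManagerMemoryMB=" + taskManagerMemoryMB
                + ", slotsPerTaskManager=" + slotsPerTaskManager + "}";
    }

    /** Hypothetical builder: collects the values, validates them, then creates the spec. */
    public static class Builder {
        private int masterMemoryMB;
        private int taskManagerMemoryMB;
        private int slotsPerTaskManager = 1;

        public Builder setMasterMemoryMB(int value) { masterMemoryMB = value; return this; }
        public Builder setTaskManagerMemoryMB(int value) { taskManagerMemoryMB = value; return this; }
        public Builder setSlotsPerTaskManager(int value) { slotsPerTaskManager = value; return this; }

        public ClusterSpecSketch build() {
            if (masterMemoryMB <= 0 || taskManagerMemoryMB <= 0 || slotsPerTaskManager <= 0) {
                throw new IllegalArgumentException("memory sizes and slot count must be positive");
            }
            return new ClusterSpecSketch(masterMemoryMB, taskManagerMemoryMB, slotsPerTaskManager);
        }
    }

    public static void main(String[] args) {
        // Sample values only; a real client derives them from the Flink configuration.
        ClusterSpecSketch spec = new Builder()
                .setMasterMemoryMB(toMebiBytes(1600L * 1024 * 1024))
                .setTaskManagerMemoryMB(toMebiBytes(1728L * 1024 * 1024))
                .setSlotsPerTaskManager(2)
                .build();
        System.out.println(spec);
    }
}
```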

3. Deploy the cluster

YarnClusterDescriptor.java

public ClusterClientProvider<ApplicationId> deployJobCluster(
         ClusterSpecification clusterSpecification,
         JobGraph jobGraph,
         boolean detached) throws ClusterDeploymentException {
         try {
                  return deployInternal(
                          clusterSpecification,
                          "Flink per-job cluster",
                          getYarnJobClusterEntrypoint(),  // Get YarnJobClusterEntrypoint, the entry point used to start the AM
                          jobGraph,
                          detached);
         } catch (Exception e) {
                  throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
         }
}

Upload the JAR files and configuration files to HDFS

YarnClusterDescriptor.java

private ClusterClientProvider<ApplicationId> deployInternal(
                  ClusterSpecification clusterSpecification,
                  String applicationName,
                  String yarnClusterEntrypoint,
                  @Nullable JobGraph jobGraph,
                  boolean detached) throws Exception {
... ...
         // Create the application
         final YarnClientApplication yarnApplication = yarnClient.createApplication();
... ...
         ApplicationReport report = startAppMaster(
                          flinkConfiguration,
                          applicationName,
                          yarnClusterEntrypoint,
                          jobGraph,
                          yarnClient,
                          yarnApplication,
                          validClusterSpecification);
... ...
}
private ApplicationReport startAppMaster(
                  Configuration configuration,
                  String applicationName,
                  String yarnClusterEntrypoint,
                  JobGraph jobGraph,
                  YarnClient yarnClient,
                  YarnClientApplication yarnApplication,
                  ClusterSpecification clusterSpecification) throws Exception {
... ...
         // Initialize the file system (HDFS)
         final FileSystem fs = FileSystem.get(yarnConfiguration);
... ...
         ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
 
         final List<Path> providedLibDirs = getRemoteSharedPaths(configuration);
         // Helper class for uploading files
         final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(
                  fs,
                  fs.getHomeDirectory(),
                  providedLibDirs,
                  appContext.getApplicationId(),
                  getFileReplication());
... ...
         final ApplicationId appId = appContext.getApplicationId();
... ...
         if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
                  // Number of YARN application attempts; defaults to 2
                  appContext.setMaxAppAttempts(
                                   configuration.getInteger(
                                                     YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                                                     YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
 
                  activateHighAvailabilitySupport(appContext);
         } else {
                  // Without high availability, the number of attempts defaults to 1
                  appContext.setMaxAppAttempts(
                                   configuration.getInteger(
                                                     YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                                                     1));
         }
... ...
        
         // The HDFS upload method is called several times, for:
         // => systemShipFiles: the logging configuration files and the JARs under lib/ other than the dist JAR
         // => shipOnlyFiles: the files under plugins/
         // => userJarFiles: the JARs containing the user code
         fileUploader.registerMultipleLocalResources(... ...);
... ...
         // Upload and configure the ApplicationMaster JAR: flink-dist*.jar
         final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);
... ...
         // Upload the Flink configuration file
         String flinkConfigKey = "flink-conf.yaml";
         Path remotePathConf = setupSingleLocalResource(
                  flinkConfigKey,
                  fs,
                  appId,
                  new Path(tmpConfigurationFile.getAbsolutePath()),
                  localResources,
                  homeDir,
                  "");
... ...
         // Write the JobGraph to a temporary file, add it to the local resources, and upload it to HDFS
         fileUploader.registerSingleLocalResource(
                  jobGraphFilename,
                  new Path(tmpJobGraphFile.toURI()),
                  "",
                  true,
                  false);
... ...
         // Upload the Flink configuration file
         String flinkConfigKey = "flink-conf.yaml";
         fileUploader.registerSingleLocalResource(
                  flinkConfigKey,
                  new Path(tmpConfigurationFile.getAbsolutePath()),
                  "",
                  true,
                  true);
... ...
         final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
                          flinkConfiguration,
                          JobManagerOptions.TOTAL_PROCESS_MEMORY);
         // Assemble the Java command that starts the AM container
         final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
                          yarnClusterEntrypoint,
                          hasKrb5,
                          processSpec);
... ...
         appContext.setApplicationName(customApplicationName);
         appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
         appContext.setAMContainerSpec(amContainer);
         appContext.setResource(capability);
... ...
         yarnClient.submitApplication(appContext);
... ... 
}
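
The attempt-count branch in `startAppMaster` comes down to one rule: an explicitly configured value always wins, and only the fallback differs between HA and non-HA mode. Below is a minimal sketch of that rule in plain Java. The class and method names are hypothetical. The key string `yarn.application-attempts` and the YARN default of 2 are assumptions about what `YarnConfigOptions.APPLICATION_ATTEMPTS.key()` and `YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS` resolve to (the source comment only states that the default is 2).

```java
import java.util.HashMap;
import java.util.Map;

public class AppAttemptsSketch {
    // Assumed key behind YarnConfigOptions.APPLICATION_ATTEMPTS.
    static final String APPLICATION_ATTEMPTS_KEY = "yarn.application-attempts";
    // Assumed value of YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.
    static final int YARN_DEFAULT_MAX_ATTEMPTS = 2;

    /**
     * Returns the configured attempt count if present; otherwise falls back
     * to the YARN default when HA is active and to 1 when it is not.
     */
    static int resolveMaxAppAttempts(Map<String, String> conf, boolean highAvailability) {
        int fallback = highAvailability ? YARN_DEFAULT_MAX_ATTEMPTS : 1;
        String configured = conf.get(APPLICATION_ATTEMPTS_KEY);
        return configured == null ? fallback : Integer.parseInt(configured.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println("HA, nothing configured:     " + resolveMaxAppAttempts(conf, true));
        System.out.println("non-HA, nothing configured: " + resolveMaxAppAttempts(conf, false));

        conf.put(APPLICATION_ATTEMPTS_KEY, "5");
        System.out.println("non-HA, configured to 5:    " + resolveMaxAppAttempts(conf, false));
    }
}
```

With a single attempt, a failed ApplicationMaster is not restarted by YARN, which fits a setup without HA because there is no persisted state to recover from.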

Assemble the AM parameters and command

YarnClusterDescriptor.java

ContainerLaunchContext setupApplicationMasterContainer(
                  String yarnClusterEntrypoint,
                  boolean hasKrb5,
                  JobManagerProcessSpec processSpec) {
        
         // respect custom JVM options in the YAML file
         String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
         if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
                  javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
         }
         // applicable only for YarnMiniCluster secure test run
         // krb5.conf file will be available as local resource in JM/TM container
         if (hasKrb5) {
                  javaOpts += " -Djava.security.krb5.conf=krb5.conf";
         }
 
         // Create the container launch context for the AM
         ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
 
         final Map<String, String> startCommandValues = new HashMap<>();
         startCommandValues.put("java", "$JAVA_HOME/bin/java");
 
         String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
         startCommandValues.put("jvmmem", jvmHeapMem);
 
         startCommandValues.put("jvmopts", javaOpts);
         startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));
 
         startCommandValues.put("class", yarnClusterEntrypoint);
         startCommandValues.put("redirects",
                  "1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
                  "2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
         startCommandValues.put("args", "");
 
         final String commandTemplate = flinkConfiguration
                          .getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
                                            ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
         final String amCommand =
                  BootstrapTools.getStartCommand(commandTemplate, startCommandValues);
 
         amContainer.setCommands(Collections.singletonList(amCommand));
 
         LOG.debug("Application Master start command: " + amCommand);
 
         return amContainer;
}
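
The method above fills a map of named values and hands it, together with a command template, to `BootstrapTools.getStartCommand`. A rough sketch of how such a placeholder substitution can work is shown below. It is not Flink's implementation: `StartCommandSketch` is a hypothetical class, the `%name%` placeholder syntax and the template string are assumptions about the default template, and all values in `main` (including `-Xmx1g` and the `<LOG_DIR>` marker) are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StartCommandSketch {

    /**
     * Replaces every %key% placeholder in the template with its value.
     * Placeholders without a matching key are left untouched. Values are
     * substituted one after another, so a value that itself contains a
     * later placeholder would also be expanded.
     */
    static String getStartCommand(String template, Map<String, String> values) {
        String command = template;
        for (Map.Entry<String, String> entry : values.entrySet()) {
            command = command.replace("%" + entry.getKey() + "%", entry.getValue());
        }
        return command;
    }

    public static void main(String[] args) {
        // Assumed template layout, used here for illustration only.
        String template = "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%";

        Map<String, String> values = new LinkedHashMap<>();
        values.put("java", "$JAVA_HOME/bin/java");
        values.put("jvmmem", "-Xmx1g");
        values.put("jvmopts", "");
        values.put("logging", "");
        values.put("class", "org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint");
        values.put("args", "");
        values.put("redirects", "1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err");

        System.out.println(getStartCommand(template, values));
    }
}
```

Keeping the command as a template means the order of the pieces can be changed through configuration without touching the code that computes each piece.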

Assemble the AM parameters:

private ApplicationReport startAppMaster(
                  Configuration configuration,
                  String applicationName,
                  String yarnClusterEntrypoint,
                  JobGraph jobGraph,
                  YarnClient yarnClient,
                  YarnClientApplication yarnApplication,
                  ClusterSpecification clusterSpecification) throws Exception {
 
                  ... ...
                  final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
                                   yarnClusterEntrypoint,
                                   hasKrb5,
                                   processSpec);
                  ... ...
                  // Assemble the AM classpath and environment variables
                  final Map<String, String> appMasterEnv = new HashMap<>();
                  // set user specified app master environment variables
                  appMasterEnv.putAll(
                  ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
                  // set Flink app class path
                  appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());
 
                  // set Flink on YARN internal configuration values
                  appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
                  appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
                  appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
                  appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));
                  appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
                  appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString());
 
                  // https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
                  appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());
 
                  if (localizedKeytabPath != null) {
                          appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
                          String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
                          appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
                          if (remotePathKeytab != null) {
                                   appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
                          }
                  }
 
                  // To support Yarn Secure Integration Test Scenario
                  if (remoteYarnSiteXmlPath != null) {
                          appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
                  }
                  if (remoteKrb5Path != null) {
                          appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
                  }
 
                  // set classpath from YARN configuration
                  Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);
                  // Set the AM environment
                  amContainer.setEnvironment(appMasterEnv);
                  ... ...
                  yarnClient.submitApplication(appContext);
... ...  
}

4. Submit the application

YarnClientImpl.java

public ApplicationId submitApplication(ApplicationSubmissionContext appContext) throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    ... ...
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
 
    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);
    ... ...
}

ApplicationClientProtocolPBClientImpl.java

public SubmitApplicationResponse submitApplication(
    SubmitApplicationRequest request) throws YarnException,
    IOException {
    // Extract the protobuf message
    SubmitApplicationRequestProto requestProto =
        ((SubmitApplicationRequestPBImpl) request).getProto();
    // Send the message to the server and wrap the returned result in a response
    try {
      return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null,
        requestProto));
    } catch (ServiceException e) {
      RPCUtil.unwrapAndThrowException(e);
      return null;
    }
}

ApplicationClientProtocolPBServiceImpl.java

public SubmitApplicationResponseProto submitApplication(RpcController arg0,
    SubmitApplicationRequestProto proto) throws ServiceException {
    // The server side rebuilds the message
    SubmitApplicationRequestPBImpl request = new SubmitApplicationRequestPBImpl(proto);
    ......
 
    SubmitApplicationResponse response = real.submitApplication(request);
    return ((SubmitApplicationResponsePBImpl) response).getProto();
    ......
}

ClientRMService.java

public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request) throws YarnException {
         ... ...
         // Hand the application request to YARN's RMAppManager, which submits the job
         this.rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user);
         ... ...
}

5. Create the Dispatcher and ResourceManager

In per-job mode, the entry point that the AM container loads and runs is the main() method of YarnJobClusterEntrypoint.

YarnJobClusterEntrypoint.java

public static void main(String[] args) {
         ... ...
         Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, env);
 
         YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);
 
         ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
}

ClusterEntrypoint.java

private void runCluster(Configuration configuration, PluginManager pluginManager) throws Exception {
         synchronized (lock) {
                  initializeServices(configuration, pluginManager);
                  ... ...
                  // 1. Create the factory for the Dispatcher and ResourceManager objects;
                  //    this includes rebuilding the JobGraph from the local file
                  final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory = createDispatcherResourceManagerComponentFactory(configuration);
                  // 2. Use the factory to create the Dispatcher and ResourceManager objects
                  //    The entrypoint starts RpcService, HAService, BlobServer, HeartbeatServices, MetricRegistry, ExecutionGraphStore, etc.
                  clusterComponent = dispatcherResourceManagerComponentFactory.create(
                          configuration,
                          ioExecutor,
                          commonRpcService,
                          haServices,
                          blobServer,
                          heartbeatServices,
                          metricRegistry,
                          archivedExecutionGraphStore,
                          new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                          this);
                  ... ...
         }
}

DefaultDispatcherResourceManagerComponentFactory.java

public DispatcherResourceManagerComponent create(
                  Configuration configuration,
                  Executor ioExecutor,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  BlobServer blobServer,
                  HeartbeatServices heartbeatServices,
                  MetricRegistry metricRegistry,
                  ArchivedExecutionGraphStore archivedExecutionGraphStore,
                  MetricQueryServiceRetriever metricQueryServiceRetriever,
                  FatalErrorHandler fatalErrorHandler) throws Exception {
 
         LeaderRetrievalService dispatcherLeaderRetrievalService = null;
         LeaderRetrievalService resourceManagerRetrievalService = null;
         WebMonitorEndpoint<?> webMonitorEndpoint = null;
         ResourceManager<?> resourceManager = null;
         ResourceManagerMetricGroup resourceManagerMetricGroup = null;
         DispatcherRunner dispatcherRunner = null;
 
         try {
                  dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
 
                  resourceManagerRetrievalService = highAvailabilityServices.getResourceManagerLeaderRetriever();
 
                  final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                          rpcService,
                          DispatcherGateway.class,
                          DispatcherId::fromUuid,
                          10,
                          Time.milliseconds(50L));
 
                  final LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                          rpcService,
                          ResourceManagerGateway.class,
                          ResourceManagerId::fromUuid,
                          10,
                          Time.milliseconds(50L));
 
                  ... ...
                 
                  // Create the endpoint that receives REST requests from the front end
                  webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                          configuration,
                          dispatcherGatewayRetriever,
                          resourceManagerGatewayRetriever,
                          blobServer,
                          executor,
                          metricFetcher,
                          highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                          fatalErrorHandler);
 
                  log.debug("Starting Dispatcher REST endpoint.");
                  webMonitorEndpoint.start();
 
                  ... ...
                  // Create the ResourceManager object; what is returned is a new YarnResourceManager
                  // Call chain: AbstractDispatcherResourceManagerComponentFactory
                  //                 -> ActiveResourceManagerFactory
                  //                 -> YarnResourceManagerFactory
                  resourceManager = resourceManagerFactory.createResourceManager(
                          configuration,
                          ResourceID.generate(),
                          rpcService,
                          highAvailabilityServices,
                          heartbeatServices,
                          fatalErrorHandler,
                          new ClusterInformation(hostname, blobServer.getPort()),
                          webMonitorEndpoint.getRestBaseUrl(),
                          resourceManagerMetricGroup);
 
                  ... ...
 
                  // Create the dispatcherRunner object and start it
                  log.debug("Starting Dispatcher.");
                  dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
                          highAvailabilityServices.getDispatcherLeaderElectionService(),
                          fatalErrorHandler,
                          new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                          ioExecutor,
                          rpcService,
                          partialDispatcherServices);
 
                  // Start the ResourceManager
                  log.debug("Starting ResourceManager.");
                  resourceManager.start();
 
                  resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);
                  dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
 
                  return new DispatcherResourceManagerComponent(
                          dispatcherRunner,
                          resourceManager,
                          dispatcherLeaderRetrievalService,
                          resourceManagerRetrievalService,
                          webMonitorEndpoint);
 
         }
         ... ...
}

Create the YarnResourceManager

ResourceManagerFactory.java

public ResourceManager<T> createResourceManager(
                  Configuration configuration,
                  ResourceID resourceId,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  HeartbeatServices heartbeatServices,
                  FatalErrorHandler fatalErrorHandler,
                  ClusterInformation clusterInformation,
                  @Nullable String webInterfaceUrl,
                  MetricRegistry metricRegistry,
                  String hostname) throws Exception {
 
         final ResourceManagerMetricGroup resourceManagerMetricGroup = ResourceManagerMetricGroup.create(metricRegistry, hostname);
         final SlotManagerMetricGroup slotManagerMetricGroup = SlotManagerMetricGroup.create(metricRegistry, hostname);
 
         final ResourceManagerRuntimeServices resourceManagerRuntimeServices = createResourceManagerRuntimeServices(
                  configuration, rpcService, highAvailabilityServices, slotManagerMetricGroup);
 
         return createResourceManager(
                  configuration,
                  resourceId,
                  rpcService,
                  highAvailabilityServices,
                  heartbeatServices,
                  fatalErrorHandler,
                  clusterInformation,
                  webInterfaceUrl,
                  resourceManagerMetricGroup,
                  resourceManagerRuntimeServices);
}

YarnResourceManagerFactory.java

public ResourceManager<YarnWorkerNode> createResourceManager(
                  Configuration configuration,
                  ResourceID resourceId,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  HeartbeatServices heartbeatServices,
                  FatalErrorHandler fatalErrorHandler,
                  ClusterInformation clusterInformation,
                  @Nullable String webInterfaceUrl,
                  ResourceManagerMetricGroup resourceManagerMetricGroup,
                  ResourceManagerRuntimeServices resourceManagerRuntimeServices) {
 
         return new YarnResourceManager(
                  rpcService,
                  resourceId,
                  configuration,
                  System.getenv(),
                  highAvailabilityServices,
                  heartbeatServices,
                  resourceManagerRuntimeServices.getSlotManager(),
                  ResourceManagerPartitionTrackerImpl::new,
                  resourceManagerRuntimeServices.getJobLeaderIdService(),
                  clusterInformation,
                  fatalErrorHandler,
                  webInterfaceUrl,
                  resourceManagerMetricGroup);
}

The SlotManager is created as part of creating the YarnResourceManager.

ResourceManagerFactory.java

private ResourceManagerRuntimeServices createResourceManagerRuntimeServices(
                  Configuration configuration,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  SlotManagerMetricGroup slotManagerMetricGroup) throws ConfigurationException {
 
         return ResourceManagerRuntimeServices.fromConfiguration(
                  createResourceManagerRuntimeServicesConfiguration(configuration),
                  highAvailabilityServices,
                  rpcService.getScheduledExecutor(),
                  slotManagerMetricGroup);
}

ResourceManagerRuntimeServices.java

public static ResourceManagerRuntimeServices fromConfiguration(
                  ResourceManagerRuntimeServicesConfiguration configuration,
                  HighAvailabilityServices highAvailabilityServices,
                  ScheduledExecutor scheduledExecutor,
                  SlotManagerMetricGroup slotManagerMetricGroup) {
 
         final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor, slotManagerMetricGroup);
 
         final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
                  highAvailabilityServices,
                  scheduledExecutor,
                  configuration.getJobTimeout());
 
         return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}

Create and start the Dispatcher

DefaultDispatcherRunnerFactory.java

public DispatcherRunner createDispatcherRunner(
                  LeaderElectionService leaderElectionService,
                  FatalErrorHandler fatalErrorHandler,
                  JobGraphStoreFactory jobGraphStoreFactory,
                  Executor ioExecutor,
                  RpcService rpcService,
                  PartialDispatcherServices partialDispatcherServices) throws Exception {
 
         final DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory = dispatcherLeaderProcessFactoryFactory.createFactory(
                  jobGraphStoreFactory,
                  ioExecutor,
                  rpcService,
                  partialDispatcherServices,
                  fatalErrorHandler);
 
         return DefaultDispatcherRunner.create(
                  leaderElectionService,
                  fatalErrorHandler,
                  dispatcherLeaderProcessFactory);
}

DefaultDispatcherRunner.java

public static DispatcherRunner create(
                  LeaderElectionService leaderElectionService,
                  FatalErrorHandler fatalErrorHandler,
                  DispatcherLeaderProcessFactory dispatcherLeaderProcessFactory) throws Exception {
         final DefaultDispatcherRunner dispatcherRunner = new DefaultDispatcherRunner(
                  leaderElectionService,
                  fatalErrorHandler,
                  dispatcherLeaderProcessFactory);
         return DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService);
}

DispatcherRunnerLeaderElectionLifecycleManager.java

public static <T extends DispatcherRunner & LeaderContender> DispatcherRunner createFor(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
         return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService);
}
 
private DispatcherRunnerLeaderElectionLifecycleManager(T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
         this.dispatcherRunner = dispatcherRunner;
         this.leaderElectionService = leaderElectionService;
         // Start the leader election for the Dispatcher
         leaderElectionService.start(dispatcherRunner);
}

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {
         ... ...
         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
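
In the `start` method above there is no real election: the standalone service grants leadership to its single contender immediately, and that is what triggers `grantLeadership` on the dispatcher runner. The following plain-Java sketch illustrates that callback pattern. All types in it are hypothetical simplifications rather than Flink's interfaces, and the all-zero UUID is an assumption about the value of `HighAvailabilityServices.DEFAULT_LEADER_ID`.

```java
import java.util.Objects;
import java.util.UUID;

public class StandaloneElectionSketch {

    /** Simplified stand-in for a leader contender. */
    public interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
    }

    // Assumed fixed session id used when no real election takes place.
    public static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    /** Election "service" for the non-HA case: the only contender wins at once. */
    public static class StandaloneElection {
        private LeaderContender contender;

        public void start(LeaderContender newContender) {
            if (contender != null) {
                throw new IllegalStateException("Leader election was already started");
            }
            contender = Objects.requireNonNull(newContender);
            // No competition: grant leadership synchronously.
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    /** Contender that only records the session id it was granted. */
    public static class RecordingRunner implements LeaderContender {
        public UUID grantedSession;

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            grantedSession = leaderSessionId;
        }
    }

    public static void main(String[] args) {
        RecordingRunner runner = new RecordingRunner();
        new StandaloneElection().start(runner);
        System.out.println("Granted session id: " + runner.grantedSession);
    }
}
```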

DefaultDispatcherRunner.java

public void grantLeadership(UUID leaderSessionID) {
         runActionIfRunning(() -> startNewDispatcherLeaderProcess(leaderSessionID));
}
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
         ... ...
         previousDispatcherLeaderProcessTerminationFuture.thenRun(newDispatcherLeaderProcess::start));
}

AbstractDispatcherLeaderProcess.java

public final void start() {
         runIfStateIs(
                  State.CREATED,
                  this::startInternal);
}
private void startInternal() {
         log.info("Start {}.", getClass().getSimpleName());
         state = State.RUNNING;
         onStart();
}
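
The `runIfStateIs(State.CREATED, ...)` guard above makes `start()` safe to call more than once, because the start logic only runs while the process is still in its initial state. A minimal sketch of that guard follows. The class is hypothetical, and both the lock and the `STOPPED` state are assumptions added for illustration, since the excerpt does not show how the real class synchronizes or which other states it has.

```java
public class LeaderProcessStateSketch {

    public enum State { CREATED, RUNNING, STOPPED }

    private final Object lock = new Object();
    private State state = State.CREATED;
    private int onStartCalls = 0;

    /** Runs the start logic only if the process has not been started yet. */
    public void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        state = State.RUNNING;
        onStartCalls++; // stands in for the real onStart() work
    }

    /** Executes the action only when the current state matches the expected one. */
    private void runIfStateIs(State expected, Runnable action) {
        synchronized (lock) {
            if (state == expected) {
                action.run();
            }
        }
    }

    public State getState() {
        synchronized (lock) {
            return state;
        }
    }

    public int getOnStartCalls() {
        synchronized (lock) {
            return onStartCalls;
        }
    }

    public static void main(String[] args) {
        LeaderProcessStateSketch process = new LeaderProcessStateSketch();
        process.start();
        process.start(); // ignored: the state is no longer CREATED
        System.out.println(process.getState() + ", onStart calls: " + process.getOnStartCalls());
    }
}
```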

JobDispatcherLeaderProcess.java

protected void onStart() {
         final DispatcherGatewayService dispatcherService = dispatcherGatewayServiceFactory.create(
                  DispatcherId.fromUuid(getLeaderSessionId()),
                  Collections.singleton(jobGraph),
                  ThrowingJobGraphWriter.INSTANCE);
 
         completeDispatcherSetup(dispatcherService);
}
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
                  DispatcherId fencingToken,
                  Collection<JobGraph> recoveredJobs,
                  JobGraphWriter jobGraphWriter) {
         ... ...
         // Start the Dispatcher
         dispatcher.start();
         ... ...
}

Start the ResourceManager

DefaultDispatcherResourceManagerComponentFactory.java

public DispatcherResourceManagerComponent create(
                  Configuration configuration,
                  Executor ioExecutor,
                  RpcService rpcService,
                  HighAvailabilityServices highAvailabilityServices,
                  BlobServer blobServer,
                  HeartbeatServices heartbeatServices,
                  MetricRegistry metricRegistry,
                  ArchivedExecutionGraphStore archivedExecutionGraphStore,
                  MetricQueryServiceRetriever metricQueryServiceRetriever,
                  FatalErrorHandler fatalErrorHandler) throws Exception {
         ... ...
                  // Start the ResourceManager
                  log.debug("Starting ResourceManager.");
                  resourceManager.start();
         ... ...
}

ResourceManager.java

public void onStart() throws Exception {
         ... ...
                  startResourceManagerServices();
         ... ...
}
private void startResourceManagerServices() throws Exception {
         try {
                  leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
                  initialize();
 
                  leaderElectionService.start(this);
                  jobLeaderIdService.start(new JobLeaderIdActionsImpl());
 
                  registerTaskExecutorMetrics();
         } catch (Exception e) {
                  handleStartResourceManagerServicesException(e);
         }
}

6. The Dispatcher starts the JobManager

Dispatcher.java

public void onStart() throws Exception {
         try {
                  // Start the Dispatcher services
                  startDispatcherServices();
         }
         ... ...
         // Start the recovered jobs
         startRecoveredJobs();
         ... ...
}

Dispatcher.java

CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
         final RpcService rpcService = getRpcService();
         return CompletableFuture.supplyAsync(
                  () -> {
                          try {
                                   JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                                            jobGraph,
                                            configuration,
                                            rpcService,
                                            highAvailabilityServices,
                                            heartbeatServices,
                                            jobManagerSharedServices,
                                            new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                                            fatalErrorHandler,
                                            initializationTimestamp);
                                   // Start the JobManagerRunner
                                   runner.start();
                                   return runner;
                          }
                  ......
}

JobManagerRunnerImpl.java

public void start() throws Exception {
         try {
                  leaderElectionService.start(this);
         } catch (Exception e) {
                  log.error("Could not start the JobManager because the leader election service did not start.", e);
                  throw new Exception("Could not start the leader election service.", e);
         }
}

StandaloneLeaderElectionService.java

public void start(LeaderContender newContender) throws Exception {
         ... ...
         contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
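
In this non-HA setup no real election takes place: `StandaloneLeaderElectionService.start()` hands leadership to its single contender right away, using a fixed default leader ID. The sketch below illustrates that behaviour. The `Contender` interface is hypothetical, and the all-zero UUID is an assumed placeholder for `HighAvailabilityServices.DEFAULT_LEADER_ID`.

```java
import java.util.UUID;

// Hypothetical sketch of a "standalone" election service; not Flink's class.
class StandaloneElectionSketch {
    // Assumed placeholder value for HighAvailabilityServices.DEFAULT_LEADER_ID.
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    // No election: the only contender becomes leader synchronously.
    static void start(Contender contender) {
        if (contender == null) {
            throw new IllegalArgumentException("contender must not be null");
        }
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }

    public static void main(String[] args) {
        start(id -> System.out.println("Granted leadership with session id " + id));
    }
}
```

This is why, in the walkthrough, calling `leaderElectionService.start(this)` leads straight into the contender's `grantLeadership(...)` method.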

JobManagerRunnerImpl.java

public void grantLeadership(final UUID leaderSessionID){
         synchronized (lock) {
                  if (shutdown) {
                          log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
                          return;
                  }
 
                  leadershipOperation = leadershipOperation.thenCompose(
                          (ignored) -> {
                                   synchronized (lock) {
                                            // Verify the job's scheduling status, then start the JobManager
                                            return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                                   }
                          });
 
                  handleException(leadershipOperation, "Could not start the job manager.");
         }
}
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
         final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
 
         return jobSchedulingStatusFuture.thenCompose(
                  jobSchedulingStatus -> {
                          if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                                   return jobAlreadyDone();
                          } else {
                                   return startJobMaster(leaderSessionId);
                          }
                  });
}
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
         ... ...
                  startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
         ... ...
}
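
`grantLeadership` does not start the JobMaster directly. It appends the work to `leadershipOperation` with `thenCompose`, so successive leadership changes are processed one after another, and a job that is already `DONE` is never started again. The following is a rough sketch of that chaining pattern. Class, enum, and method names are hypothetical simplifications, and the status is passed in directly rather than looked up asynchronously.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of serialising leadership operations on one future chain.
class LeadershipChainSketch {
    enum Status { PENDING, DONE }

    private final Object lock = new Object();
    private CompletableFuture<Void> leadershipOperation = CompletableFuture.completedFuture(null);
    final List<String> events = new ArrayList<>();

    // Each grant is appended to the chain and runs only after the previous one completed.
    CompletableFuture<Void> grantLeadership(String sessionId, Status status) {
        synchronized (lock) {
            leadershipOperation = leadershipOperation.thenCompose(ignored -> {
                synchronized (lock) {
                    return verifyAndStart(sessionId, status);
                }
            });
            return leadershipOperation;
        }
    }

    private CompletableFuture<Void> verifyAndStart(String sessionId, Status status) {
        if (status == Status.DONE) {
            events.add("already done: " + sessionId);
        } else {
            events.add("start job master: " + sessionId);
        }
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        LeadershipChainSketch runner = new LeadershipChainSketch();
        runner.grantLeadership("session-1", Status.PENDING);
        runner.grantLeadership("session-2", Status.DONE).join();
        System.out.println(runner.events);
    }
}
```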

JobMaster.java

public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
         // make sure we receive RPC and async calls
         start();
 
         return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
         ... ...
         // Start the JobMaster services
         startJobMasterServices();
 
         log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);
         // Reset the scheduler and start scheduling
         resetAndStartScheduler();
         ... ...
}

7. The ResourceManager starts the SlotManager

ResourceManager.java

public final void onStart() throws Exception {
         ... ...
                  startResourceManagerServices();
         ... ...
}
private void startResourceManagerServices() throws Exception {
         try {
                  leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 
                  initialize();
 
                  leaderElectionService.start(this);
                  jobLeaderIdService.start(new JobLeaderIdActionsImpl());
 
                  registerTaskExecutorMetrics();
         } catch (Exception e) {
                  handleStartResourceManagerServicesException(e);
         }
}

Creating the YARN RM and NM clients

ActiveResourceManager.java

protected void initialize() throws ResourceManagerException {
         try {
                  resourceManagerDriver.initialize(
                                   this,
                                   new GatewayMainThreadExecutor(),
                                   ioExecutor);
         } catch (Exception e) {
                  throw new ResourceManagerException("Cannot initialize resource provider.", e);
         }
}

AbstractResourceManagerDriver.java

public final void initialize(
                  ResourceEventHandler<WorkerType> resourceEventHandler,
                  ScheduledExecutor mainThreadExecutor,
                  Executor ioExecutor) throws Exception {
         this.resourceEventHandler = Preconditions.checkNotNull(resourceEventHandler);
         this.mainThreadExecutor = Preconditions.checkNotNull(mainThreadExecutor);
         this.ioExecutor = Preconditions.checkNotNull(ioExecutor);
 
         initializeInternal();
}

YarnResourceManagerDriver.java

protected void initializeInternal() throws Exception {
         final YarnContainerEventHandler yarnContainerEventHandler = new YarnContainerEventHandler();
         try {
                  // Create and start the YARN ResourceManager client
                  resourceManagerClient = yarnResourceManagerClientFactory.createResourceManagerClient(
                          yarnHeartbeatIntervalMillis,
                          yarnContainerEventHandler);
                  resourceManagerClient.init(yarnConfig);
                  resourceManagerClient.start();
 
                  final RegisterApplicationMasterResponse registerApplicationMasterResponse = registerApplicationMaster();
                  getContainersFromPreviousAttempts(registerApplicationMasterResponse);
                  taskExecutorProcessSpecContainerResourcePriorityAdapter =
                          new TaskExecutorProcessSpecContainerResourcePriorityAdapter(
                                   registerApplicationMasterResponse.getMaximumResourceCapability(),
                                   ExternalResourceUtils.getExternalResources(flinkConfig, YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
         } catch (Exception e) {
                  throw new ResourceManagerException("Could not start resource manager client.", e);
         }
 
         // Create and start the YARN NodeManager client
         nodeManagerClient = yarnNodeManagerClientFactory.createNodeManagerClient(yarnContainerEventHandler);
         nodeManagerClient.init(yarnConfig);
         nodeManagerClient.start();
}

Starting the SlotManager

ResourceManager.java

private void startServicesOnLeadership() {
         startHeartbeatServices();
 
         slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
 
         onLeadership();
}

SlotManagerImpl.java

public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) {
         LOG.info("Starting the SlotManager.");
 
         this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
         mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
         resourceActions = Preconditions.checkNotNull(newResourceActions);
 
         started = true;
 
         taskManagerTimeoutsAndRedundancyCheck = scheduledExecutor.scheduleWithFixedDelay(
                  () -> mainThreadExecutor.execute(
                          () -> checkTaskManagerTimeoutsAndRedundancy()),
                  0L,
                  taskManagerTimeout.toMilliseconds(),
                  TimeUnit.MILLISECONDS);
 
         slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
                  () -> mainThreadExecutor.execute(
                          () -> checkSlotRequestTimeouts()),
                  0L,
                  slotRequestTimeout.toMilliseconds(),
                  TimeUnit.MILLISECONDS);
 
         registerSlotManagerMetrics();
}
void checkTaskManagerTimeoutsAndRedundancy() {
         if (!taskManagerRegistrations.isEmpty()) {
                  long currentTime = System.currentTimeMillis();
 
                  ArrayList<TaskManagerRegistration> timedOutTaskManagers = new ArrayList<>(taskManagerRegistrations.size());
 
                  // first retrieve the timed out TaskManagers
                  for (TaskManagerRegistration taskManagerRegistration : taskManagerRegistrations.values()) {
                          if (currentTime - taskManagerRegistration.getIdleSince() >= taskManagerTimeout.toMilliseconds()) {
                                   // we collect the instance ids first in order to avoid concurrent modifications by the
                                   // ResourceActions.releaseResource call
                                   timedOutTaskManagers.add(taskManagerRegistration);
                          }
                  }
 
                  int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots.size();
                  if (freeSlots.size() == slots.size()) {
                          // No need to keep redundant taskManagers if no job is running:
                          // release every timed-out TaskManager.
                          releaseTaskExecutors(timedOutTaskManagers, timedOutTaskManagers.size());
                  } else if (slotsDiff > 0) {
                          // Keep enough redundant taskManagers from time to time,
                          // so that spare TaskManagers are always available.
                          int requiredTaskManagers = MathUtils.divideRoundUp(slotsDiff, numSlotsPerWorker);
                          allocateRedundantTaskManagers(requiredTaskManagers);
                  } else {
                          // second we trigger the release resource callback which can decide upon the resource release
                          int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
                          releaseTaskExecutors(timedOutTaskManagers, Math.min(maxReleaseNum, timedOutTaskManagers.size()));
                  }
         }
}
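
The three branches of `checkTaskManagerTimeoutsAndRedundancy()` come down to a little integer arithmetic on `slotsDiff`. The sketch below isolates that arithmetic as a pure function so the cases can be tried with concrete numbers. The `Decision` type and the `decide` signature are invented for illustration, and the outer check that at least one TaskManager is registered is left out.

```java
// Hypothetical, side-effect-free model of the redundancy decision.
class RedundancyCheckSketch {
    enum Action { RELEASE_ALL_TIMED_OUT, ALLOCATE, RELEASE_SOME }

    static final class Decision {
        final Action action;
        final int count; // number of TaskManagers to allocate or release

        Decision(Action action, int count) {
            this.action = action;
            this.count = count;
        }

        @Override
        public String toString() {
            return action + "(" + count + ")";
        }
    }

    // Integer division rounding up, for non-negative dividends and positive divisors.
    static int divideRoundUp(int dividend, int divisor) {
        return (dividend + divisor - 1) / divisor;
    }

    static Decision decide(int redundantTaskManagerNum, int numSlotsPerWorker,
                           int freeSlots, int totalSlots, int timedOutTaskManagers) {
        int slotsDiff = redundantTaskManagerNum * numSlotsPerWorker - freeSlots;
        if (freeSlots == totalSlots) {
            // Nothing is running: every timed-out TaskManager can be released.
            return new Decision(Action.RELEASE_ALL_TIMED_OUT, timedOutTaskManagers);
        } else if (slotsDiff > 0) {
            // Fewer free slots than the redundancy target: top up with whole TaskManagers.
            return new Decision(Action.ALLOCATE, divideRoundUp(slotsDiff, numSlotsPerWorker));
        } else {
            // Surplus free slots: release at most the surplus, and only timed-out TaskManagers.
            int maxReleaseNum = (-slotsDiff) / numSlotsPerWorker;
            return new Decision(Action.RELEASE_SOME, Math.min(maxReleaseNum, timedOutTaskManagers));
        }
    }

    public static void main(String[] args) {
        System.out.println(decide(2, 4, 3, 16, 0));  // 8 slots wanted, 3 free
        System.out.println(decide(1, 4, 10, 16, 5)); // 4 slots wanted, 10 free
        System.out.println(decide(1, 4, 8, 8, 2));   // all slots free
    }
}
```

With 2 redundant TaskManagers of 4 slots each and only 3 free slots, 5 slots are missing, which rounds up to 2 new TaskManagers. With 10 free slots against a target of 4, the surplus of 6 slots allows at most 1 whole TaskManager to be released.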

8. The JobManager requests slots

Starting the SlotPool

Continuing from step 6: when the JobMaster starts, it starts the SlotPool and registers with the ResourceManager.

private void startJobMasterServices() throws Exception {
         // Start the heartbeat services
         startHeartbeatServices();
 
         // Start the SlotPool
         slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
 
         // Connect to the previously known ResourceManager
         reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
 
         // Once started, the SlotPool begins requesting slots from the SlotManager
         resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}

Registering with the ResourceManager

The call then passes through the following chain:

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

-> ResourceManagerLeaderListener.notifyLeaderAddress()

-> notifyOfNewResourceManagerLeader()

-> reconnectToResourceManager()

-> tryConnectToResourceManager()

-> connectToResourceManager()

private void connectToResourceManager() {
         ... ...
 
         resourceManagerConnection = new ResourceManagerConnection(
                  log,
                  jobGraph.getJobID(),
                  resourceId,
                  getAddress(),
                  getFencingToken(),
                  resourceManagerAddress.getAddress(),
                  resourceManagerAddress.getResourceManagerId(),
                  scheduledExecutorService);
 
         resourceManagerConnection.start();
}

RegisteredRpcConnection.java

public void start() {
         ... ...
 
         final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
 
         if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
                  newRegistration.startRegistration();
         } else {
                  // concurrent start operation
                  newRegistration.cancel();
         }
}
private RetryingRegistration<F, G, S> createNewRegistration() {
         RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
 
         ... ...
}
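
`RegisteredRpcConnection.start()` relies on a compare-and-set so that, if `start()` is called more than once, only one registration is actually started and any later one is cancelled. Here is a small sketch of that guard using `AtomicReferenceFieldUpdater` from the JDK. The `Registration` class is a hypothetical stand-in for `RetryingRegistration`.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical sketch of the "only the first start wins" guard.
class ConnectionStartSketch {
    // Stand-in for RetryingRegistration; it just records what happened to it.
    static final class Registration {
        boolean started;
        boolean cancelled;

        void startRegistration() {
            started = true;
        }

        void cancel() {
            cancelled = true;
        }
    }

    private static final AtomicReferenceFieldUpdater<ConnectionStartSketch, Registration> REGISTRATION_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(
                    ConnectionStartSketch.class, Registration.class, "pendingRegistration");

    // The field must be volatile for the field updater to work.
    private volatile Registration pendingRegistration;

    Registration start() {
        Registration newRegistration = new Registration();
        if (REGISTRATION_UPDATER.compareAndSet(this, null, newRegistration)) {
            // We installed our registration first, so we may start it.
            newRegistration.startRegistration();
        } else {
            // Another start() got there before us.
            newRegistration.cancel();
        }
        return newRegistration;
    }

    public static void main(String[] args) {
        ConnectionStartSketch connection = new ConnectionStartSketch();
        Registration first = connection.start();
        Registration second = connection.start();
        System.out.println("first started=" + first.started + ", second cancelled=" + second.cancelled);
    }
}
```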

Inner class ResourceManagerConnection of JobMaster.java

protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
         return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>(
                  log,
                  getRpcService(),
                  "ResourceManager",
                  ResourceManagerGateway.class,
                  getTargetAddress(),
                  getTargetLeaderId(),
                  jobMasterConfiguration.getRetryingRegistrationConfiguration()) {
 
                  @Override
                  protected CompletableFuture<RegistrationResponse> invokeRegistration(
                                   ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) {
                          Time timeout = Time.milliseconds(timeoutMillis);
 
                          return gateway.registerJobManager(
                                   jobMasterId,
                                   jobManagerResourceID,
                                   jobManagerRpcAddress,
                                   jobID,
                                   timeout);
                  }
         };
}

The SlotPool requests slots

When registration succeeds, onRegistrationSuccess() is called and slots are requested from the ResourceManager:

Inner class ResourceManagerConnection of JobMaster.java

protected void onRegistrationSuccess(final JobMasterRegistrationSuccess success) {
         runAsync(() -> {
                  // filter out outdated connections
                  //noinspection ObjectEquality
                  if (this == resourceManagerConnection) {
                          establishResourceManagerConnection(success);
                  }
         });
}
private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) {
         ... ...
         slotPool.connectToResourceManager(resourceManagerGateway);
         ... ...
}

SlotPoolImpl.java

public void connectToResourceManager(@Nonnull ResourceManagerGateway resourceManagerGateway) {
         this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
 
         // work on all slots waiting for this connection
         for (PendingRequest pendingRequest : waitingForResourceManager.values()) {
                  // Request a slot from the ResourceManager
                  requestSlotFromResourceManager(resourceManagerGateway, pendingRequest);
         }
 
         // all sent off
         waitingForResourceManager.clear();
}
private void requestSlotFromResourceManager(
                  final ResourceManagerGateway resourceManagerGateway,
                  final PendingRequest pendingRequest) {
         ... ...
         CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
                  jobMasterId,
                  new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress),
                  rpcTimeout);
 
         ... ...
}
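
`connectToResourceManager` is essentially a flush: slot requests that arrived before a ResourceManager gateway was known sit in `waitingForResourceManager`, and are sent in one go as soon as the connection exists. The sketch below models that buffering. The `Gateway` interface and the `requestNewSlot` method are hypothetical, and the buffering done before the connection exists is an assumption about code not shown in this walkthrough.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of buffering slot requests until a gateway is available.
class PendingRequestBufferSketch {
    interface Gateway {
        void requestSlot(String allocationId, String resourceProfile);
    }

    // allocationId -> resource profile, kept in arrival order
    private final Map<String, String> waitingForResourceManager = new LinkedHashMap<>();
    private Gateway gateway; // null until connected

    // Assumed behaviour: stash the request while no gateway is known.
    void requestNewSlot(String allocationId, String resourceProfile) {
        if (gateway == null) {
            waitingForResourceManager.put(allocationId, resourceProfile);
        } else {
            gateway.requestSlot(allocationId, resourceProfile);
        }
    }

    void connectToResourceManager(Gateway newGateway) {
        this.gateway = Objects.requireNonNull(newGateway);
        // Work on all requests that were waiting for this connection.
        for (Map.Entry<String, String> pending : waitingForResourceManager.entrySet()) {
            gateway.requestSlot(pending.getKey(), pending.getValue());
        }
        // All sent off.
        waitingForResourceManager.clear();
    }

    int waiting() {
        return waitingForResourceManager.size();
    }

    public static void main(String[] args) {
        PendingRequestBufferSketch pool = new PendingRequestBufferSketch();
        List<String> sent = new ArrayList<>();
        pool.requestNewSlot("alloc-1", "default");
        pool.requestNewSlot("alloc-2", "default");
        System.out.println("buffered before connect: " + pool.waiting());
        pool.connectToResourceManager((id, profile) -> sent.add(id));
        System.out.println("sent after connect: " + sent);
    }
}
```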

ResourceManager.java: the request is handled by the SlotManager inside the ResourceManager

public CompletableFuture<Acknowledge> requestSlot(
                  JobMasterId jobMasterId,
                  SlotRequest slotRequest,
                  final Time timeout) {
 
         ... ...
                          try {
                                   // The SlotManager handles the slot request
                                   slotManager.registerSlotRequest(slotRequest);
                          }
... ...
}

SlotManagerImpl.java

public boolean registerSlotRequest(SlotRequest slotRequest) throws ResourceManagerException {
         checkInit();
 
         ... ...
 
                  PendingSlotRequest pendingSlotRequest = new PendingSlotRequest(slotRequest);
 
                  pendingSlotRequests.put(slotRequest.getAllocationId(),pendingSlotRequest);
 
                  try {
                          internalRequestSlot(pendingSlotRequest);
                  }
         ... ...
}
private void internalRequestSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
         final ResourceProfile resourceProfile = pendingSlotRequest.getResourceProfile();
 
         OptionalConsumer.of(findMatchingSlot(resourceProfile))
                  .ifPresent(taskManagerSlot -> allocateSlot(taskManagerSlot, pendingSlotRequest))
                  .ifNotPresent(() -> fulfillPendingSlotRequestWithPendingTaskManagerSlot(pendingSlotRequest));
}
private void fulfillPendingSlotRequestWithPendingTaskManagerSlot(PendingSlotRequest pendingSlotRequest) throws ResourceManagerException {
         ... ...
         if (!pendingTaskManagerSlotOptional.isPresent()) {
                  pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
         }
 
         ... ...
}
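
Taken together, `internalRequestSlot` and `fulfillPendingSlotRequestWithPendingTaskManagerSlot` form a three-step fallback: use a free registered slot if one matches, otherwise a slot of a TaskManager that is still starting up, and only then ask for a new worker. Here is a heavily simplified sketch of that order. Slots are plain strings, resource profiles are ignored, and `allocateResource` is assumed to always succeed, none of which matches the real SlotManager.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

// Hypothetical, heavily simplified model of the slot request fallback chain.
class SlotRequestFallbackSketch {
    final Deque<String> freeSlots = new ArrayDeque<>();
    final Deque<String> pendingTaskManagerSlots = new ArrayDeque<>();
    int workersRequested = 0;

    String internalRequestSlot(String requestId) {
        // 1. Prefer a free slot on an already registered TaskManager.
        Optional<String> free = Optional.ofNullable(freeSlots.poll());
        if (free.isPresent()) {
            return requestId + " -> allocated " + free.get();
        }
        // 2. Otherwise take a slot of a TaskManager that has been requested but not registered yet.
        Optional<String> pending = Optional.ofNullable(pendingTaskManagerSlots.poll());
        // 3. As a last resort, request a new worker.
        if (!pending.isPresent()) {
            pending = allocateResource();
        }
        return pending
                .map(slot -> requestId + " -> waiting for " + slot)
                .orElse(requestId + " -> unfulfilled");
    }

    // Assumption: allocation always succeeds and yields exactly one pending slot.
    private Optional<String> allocateResource() {
        workersRequested++;
        return Optional.of("pending-slot-of-new-worker-" + workersRequested);
    }

    public static void main(String[] args) {
        SlotRequestFallbackSketch slotManager = new SlotRequestFallbackSketch();
        slotManager.freeSlots.add("slot-1");
        System.out.println(slotManager.internalRequestSlot("req-1"));
        System.out.println(slotManager.internalRequestSlot("req-2"));
    }
}
```

In a fresh per-job cluster no TaskManager exists yet, so the first request always ends in step 3, which is what triggers the container request described in the next step.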

9. The ResourceManager requests resources

ResourceManager.java

public boolean allocateResource(WorkerResourceSpec workerResourceSpec){
         validateRunsInMainThread();
         return startNewWorker(workerResourceSpec);
}

ActiveResourceManager.java

public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
         requestNewWorker(workerResourceSpec);
         return true;
}
private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
         // Derive the TaskExecutor process spec from the configuration
         final TaskExecutorProcessSpec taskExecutorProcessSpec =
                          TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, workerResourceSpec);
         ... ...
         // Request the resource
         CompletableFuture<WorkerType> requestResourceFuture = resourceManagerDriver.requestResource(taskExecutorProcessSpec);
         ... ...
}

YarnResourceManagerDriver.java

public CompletableFuture<YarnWorkerNode> requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
         checkInitialized();
 
         final CompletableFuture<YarnWorkerNode> requestResourceFuture = new CompletableFuture<>();
 
         final Optional<TaskExecutorProcessSpecContainerResourcePriorityAdapter.PriorityAndResource> priorityAndResourceOpt =
                  taskExecutorProcessSpecContainerResourcePriorityAdapter.getPriorityAndResource(taskExecutorProcessSpec);
 
         if (!priorityAndResourceOpt.isPresent()) {
                  requestResourceFuture.completeExceptionally(
                          new ResourceManagerException(
                                   String.format("Could not compute the container Resource from the given TaskExecutorProcessSpec %s. " +
                                                     "This usually indicates the requested resource is larger than Yarn's max container resource limit.",
                                            taskExecutorProcessSpec)));
         } else {
                  final Priority priority = priorityAndResourceOpt.get().getPriority();
                  final Resource resource = priorityAndResourceOpt.get().getResource();
                  resourceManagerClient.addContainerRequest(getContainerRequest(resource, priority));
 
                  // make sure we transmit the request fast and receive fast news of granted allocations
                  resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
 
                  requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new LinkedList<>()).add(requestResourceFuture);
 
                  log.info("Requesting new TaskExecutor container with resource {}, priority {}.", taskExecutorProcessSpec, priority);
         }
 
         return requestResourceFuture;
}
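
`requestResource` does not wait for YARN. It files a future under the requested spec in `requestResourceFutures` and returns immediately, and the future is completed later when a matching container arrives. The sketch below shows that bookkeeping in isolation. Specs and containers are plain strings, and `onContainerAllocated` is a hypothetical callback standing in for the allocation handling, which this walkthrough does not show.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of matching allocated containers to outstanding requests.
class ResourceRequestQueueSketch {
    private final Map<String, Queue<CompletableFuture<String>>> requestResourceFutures = new HashMap<>();

    // Returns immediately; the future completes once a container for this spec arrives.
    CompletableFuture<String> requestResource(String spec) {
        CompletableFuture<String> requestResourceFuture = new CompletableFuture<>();
        requestResourceFutures
                .computeIfAbsent(spec, ignore -> new LinkedList<>())
                .add(requestResourceFuture);
        return requestResourceFuture;
    }

    // Hypothetical callback: completes the oldest outstanding request for the spec.
    // Returns false if nobody is waiting, i.e. the container is excess.
    boolean onContainerAllocated(String spec, String containerId) {
        Queue<CompletableFuture<String>> pending = requestResourceFutures.get(spec);
        if (pending == null || pending.isEmpty()) {
            return false;
        }
        pending.poll().complete(containerId);
        if (pending.isEmpty()) {
            requestResourceFutures.remove(spec);
        }
        return true;
    }

    public static void main(String[] args) {
        ResourceRequestQueueSketch driver = new ResourceRequestQueueSketch();
        CompletableFuture<String> first = driver.requestResource("4g-2cores");
        CompletableFuture<String> second = driver.requestResource("4g-2cores");
        driver.onContainerAllocated("4g-2cores", "container-1");
        System.out.println("first done=" + first.isDone() + ", second done=" + second.isDone());
    }
}
```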

10. Starting the TaskManager

YarnTaskExecutorRunner.java

public static void main(String[] args) {
         EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args);
         SignalHandler.register(LOG);
         JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
         runTaskManagerSecurely(args);
}
private static void runTaskManagerSecurely(String[] args) {
         try {
                  LOG.debug("All environment variables: {}", ENV);
 
                  final String currDir = ENV.get(Environment.PWD.key());
                  LOG.info("Current working Directory: {}", currDir);
 
                  final Configuration configuration = TaskManagerRunner.loadConfiguration(args);
                  setupAndModifyConfiguration(configuration, currDir, ENV);
 
                  TaskManagerRunner.runTaskManagerSecurely(configuration);
         }
         catch (Throwable t) {
                  final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
                  // make sure that everything whatever ends up in the log
                  LOG.error("YARN TaskManager initialization failed.", strippedThrowable);
                  System.exit(INIT_ERROR_EXIT_CODE);
         }
}

TaskManagerRunner.java

public void start() throws Exception {
         taskExecutorService.start();
}

TaskExecutorToServiceAdapter.java

public void start() {
         taskExecutor.start();
}

TaskExecutor.java

public void onStart() throws Exception {
         try {
                  startTaskExecutorServices();
         } catch (Throwable t) {
                  final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), t);
                  onFatalError(exception);
                  throw exception;
         }
 
         startRegistrationTimeout();
}

11. Registering with the ResourceManager

TaskExecutor.java

private void startTaskExecutorServices() throws Exception {
         try {
                  // start by connecting to the ResourceManager
                  resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 
                  // tell the task slot table who's responsible for the task slot actions
                  taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
 
                  // start the job leader service
                  jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
 
                  fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
         } catch (Exception e) {
                  handleStartTaskExecutorServicesException(e);
         }
}

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

-> ResourceManagerLeaderListener.notifyLeaderAddress()

-> TaskExecutor.notifyOfNewResourceManagerLeader()

-> TaskExecutor.reconnectToResourceManager()

-> TaskExecutor.tryConnectToResourceManager()

-> TaskExecutor.connectToResourceManager()

-> TaskExecutor.resourceManagerConnection.start()

This runs createNewRegistration() -> generateRegistration()

TaskExecutorToResourceManagerConnection.java

protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() {
         return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
                  log,
                  rpcService,
                  getTargetAddress(),
                  getTargetLeaderId(),
                  retryingRegistrationConfiguration,
                  taskExecutorRegistration);
}

Registration then begins: newRegistration.startRegistration() calls invokeRegistration():

Inner class ResourceManagerRegistration of TaskExecutorToResourceManagerConnection.java

private static class ResourceManagerRegistration
                  extends RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
         private final TaskExecutorRegistration taskExecutorRegistration;
 
         ResourceManagerRegistration(
                          Logger log,
                          RpcService rpcService,
                          String targetAddress,
                          ResourceManagerId resourceManagerId,
                          RetryingRegistrationConfiguration retryingRegistrationConfiguration,
                          TaskExecutorRegistration taskExecutorRegistration) {
 
                  super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId, retryingRegistrationConfiguration);
                  this.taskExecutorRegistration = taskExecutorRegistration;
         }
 
         @Override
         protected CompletableFuture<RegistrationResponse> invokeRegistration(
                          ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
 
                  Time timeout = Time.milliseconds(timeoutMillis);
                  return resourceManager.registerTaskExecutor(
                          taskExecutorRegistration,
                          timeout);
         }
}

When registration succeeds, onRegistrationSuccess is called

protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
         log.info("Successful registration at resource manager {} under registration id {}.",
                  getTargetAddress(), success.getRegistrationId());
 
         registrationListener.onRegistrationSuccess(this, success);
}

Inner class ResourceManagerRegistrationListener of TaskExecutor.java

public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
         final ResourceID resourceManagerId = success.getResourceManagerId();
         final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
         final ClusterInformation clusterInformation = success.getClusterInformation();
         final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
 
         runAsync(
                  () -> {
                          // filter out outdated connections
                          //noinspection ObjectEquality
                          if (resourceManagerConnection == connection) {
                                   try {
                                            establishResourceManagerConnection(
                                                     resourceManagerGateway,
                                                     resourceManagerId,
                                                     taskExecutorRegistrationId,
                                                     clusterInformation);
                                   } catch (Throwable t) {
                                            log.error("Establishing Resource Manager connection in Task Executor failed", t);
                                   }
                          }
                  });
}
private void establishResourceManagerConnection(
                  ResourceManagerGateway resourceManagerGateway,
                  ResourceID resourceManagerResourceId,
                  InstanceID taskExecutorRegistrationId,
                  ClusterInformation clusterInformation) {
        
         // Report the slots to the ResourceManager
         final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
                  getResourceID(),
                  taskExecutorRegistrationId,
                  taskSlotTable.createSlotReport(getResourceID()),
                  taskManagerConfiguration.getTimeout());
 
         ... ...
}
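
Both the JobMaster and the TaskExecutor guard their success callbacks with a reference comparison ("filter out outdated connections"). The callback runs asynchronously on the main thread, so by the time it executes the endpoint may already have switched to a newer connection, and a late success from the old one must be ignored. A small sketch of that pattern follows, with a plain queue standing in for the RPC main thread. All names in it are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of dropping callbacks that belong to a superseded connection.
class StaleConnectionFilterSketch {
    static final class Connection {
        final String target;

        Connection(String target) {
            this.target = target;
        }
    }

    // Stand-in for the endpoint's main thread: callbacks are queued and run later.
    private final Queue<Runnable> mainThread = new ArrayDeque<>();
    private Connection currentConnection;
    String established; // target of the connection that was actually established

    Connection connect(String target) {
        currentConnection = new Connection(target);
        return currentConnection;
    }

    void onRegistrationSuccess(Connection connection) {
        mainThread.add(() -> {
            // Identity comparison on purpose: only the current connection object counts.
            if (currentConnection == connection) {
                established = connection.target;
            }
        });
    }

    void runPendingCallbacks() {
        Runnable callback;
        while ((callback = mainThread.poll()) != null) {
            callback.run();
        }
    }

    public static void main(String[] args) {
        StaleConnectionFilterSketch endpoint = new StaleConnectionFilterSketch();
        Connection old = endpoint.connect("rm-1");
        endpoint.onRegistrationSuccess(old);
        Connection current = endpoint.connect("rm-2"); // leader changed before the callback ran
        endpoint.onRegistrationSuccess(current);
        endpoint.runPendingCallbacks();
        System.out.println("established with " + endpoint.established);
    }
}
```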

ResourceManager.java

public CompletableFuture<Acknowledge> sendSlotReport(ResourceID taskManagerResourceId, InstanceID taskManagerRegistrationId, SlotReport slotReport, Time timeout) {
         final WorkerRegistration<WorkerType> workerTypeWorkerRegistration = taskExecutors.get(taskManagerResourceId);
 
         if (workerTypeWorkerRegistration.getInstanceID().equals(taskManagerRegistrationId)) {
                  if (slotManager.registerTaskManager(workerTypeWorkerRegistration, slotReport)) {
                          onWorkerRegistered(workerTypeWorkerRegistration.getWorker());
                  }
                  return CompletableFuture.completedFuture(Acknowledge.get());
         } else {
                  return FutureUtils.completedExceptionally(new ResourceManagerException(String.format("Unknown TaskManager registration id %s.", taskManagerRegistrationId)));
         }
}

SlotManagerImpl.java

public boolean registerTaskManager(final TaskExecutorConnection taskExecutorConnection, SlotReport initialSlotReport) {
         checkInit();
 
         LOG.debug("Registering TaskManager {} under {} at the SlotManager.", taskExecutorConnection.getResourceID().getStringWithMetadata(), taskExecutorConnection.getInstanceID());
 
         // we identify task managers by their instance id,
         // which tells us whether this TaskManager has already been registered
         if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
                  // Already registered: only update the slot status from the report
                  reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
                  return false;
         } else {
                  if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
                          LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);
                          resourceActions.releaseResource(taskExecutorConnection.getInstanceID(), new FlinkException("The total number of slots exceeds the max limitation."));
                          return false;
                  }
 
                  // first register the TaskManager
                  ArrayList<SlotID> reportedSlots = new ArrayList<>();
 
                  for (SlotStatus slotStatus : initialSlotReport) {
                          reportedSlots.add(slotStatus.getSlotID());
                  }
 
                  TaskManagerRegistration taskManagerRegistration = new TaskManagerRegistration(
                          taskExecutorConnection,
                          reportedSlots);
 
                  taskManagerRegistrations.put(taskExecutorConnection.getInstanceID(), taskManagerRegistration);
 
                  // next register the new slots
                  for (SlotStatus slotStatus : initialSlotReport) {
                          // Register the new slot; it is assigned according to the pending slot requests
                          registerSlot(
                                   slotStatus.getSlotID(),
                                   slotStatus.getAllocationID(),
                                   slotStatus.getJobID(),
                                   slotStatus.getResourceProfile(),
                                   taskExecutorConnection);
                  }
 
                  return true;
         }
 
}
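
The three outcomes of registerTaskManager above (already registered, slot limit exceeded, newly registered) can be modeled in a few lines. This is a minimal sketch, not Flink's implementation: the class SlotRegistrySketch, the string ids, and the plain "registered + reported > max" limit check are hypothetical simplifications introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of the registration flow; not Flink's real classes.
class SlotRegistrySketch {
    // instance id -> slot ids reported at registration time
    private final Map<String, List<String>> registrations = new HashMap<>();
    private final int maxSlotNum;
    private int registeredSlots = 0;

    SlotRegistrySketch(int maxSlotNum) {
        this.maxSlotNum = maxSlotNum;
    }

    /** Returns true only if the TaskManager was newly registered. */
    boolean registerTaskManager(String instanceId, List<String> reportedSlotIds) {
        if (registrations.containsKey(instanceId)) {
            // Known instance: the real code refreshes the slot status at this point.
            return false;
        }
        if (registeredSlots + reportedSlotIds.size() > maxSlotNum) {
            // Assumed limit check: reject when the total would exceed the maximum.
            return false;
        }
        registrations.put(instanceId, new ArrayList<>(reportedSlotIds));
        registeredSlots += reportedSlotIds.size();
        return true;
    }

    int getRegisteredSlots() {
        return registeredSlots;
    }

    public static void main(String[] args) {
        SlotRegistrySketch registry = new SlotRegistrySketch(3);
        System.out.println(registry.registerTaskManager("tm-1", Arrays.asList("s0", "s1")));
        System.out.println(registry.registerTaskManager("tm-1", Arrays.asList("s0", "s1")));
        System.out.println(registry.registerTaskManager("tm-2", Arrays.asList("s0", "s1")));
    }
}
```

Note that a false return value covers two different situations, which is why the original method logs and releases the resource in the limit case before returning.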

12. ResourceManager allocates the slot

SlotManagerImpl.java

private void registerSlot(
                  SlotID slotId,
                  AllocationID allocationId,
                  JobID jobId,
                  ResourceProfile resourceProfile,
                  TaskExecutorConnection taskManagerConnection) {

         if (slots.containsKey(slotId)) {
                  // remove the old slot first
                  removeSlot(
                          slotId,
                          new SlotManagerException(
                                   String.format(
                                            "Re-registration of slot %s. This indicates that the TaskExecutor has re-connected.",
                                            slotId)));
         }

         // create and register the TaskManager's slot
         final TaskManagerSlot slot = createAndRegisterTaskManagerSlot(slotId, resourceProfile, taskManagerConnection);

         final PendingTaskManagerSlot pendingTaskManagerSlot;

         if (allocationId == null) {
                  pendingTaskManagerSlot = findExactlyMatchingPendingTaskManagerSlot(resourceProfile);
         } else {
                  pendingTaskManagerSlot = null;
         }

         if (pendingTaskManagerSlot == null) {
                  updateSlot(slotId, allocationId, jobId);
         } else {
                  pendingSlots.remove(pendingTaskManagerSlot.getTaskManagerSlotId());
                  final PendingSlotRequest assignedPendingSlotRequest = pendingTaskManagerSlot.getAssignedPendingSlotRequest();

                  // assign the slot to the pending request, if there is one
                  if (assignedPendingSlotRequest == null) {
                          handleFreeSlot(slot);
                  } else {
                          assignedPendingSlotRequest.unassignPendingTaskManagerSlot();
                          allocateSlot(slot, assignedPendingSlotRequest);
                  }
         }
}

private void allocateSlot(TaskManagerSlot taskManagerSlot, PendingSlotRequest pendingSlotRequest) {
         ... ...
         taskManagerRegistration.markUsed();

         // RPC call to the task manager
         CompletableFuture<Acknowledge> requestFuture = gateway.requestSlot(
                  slotId,
                  pendingSlotRequest.getJobId(),
                  allocationId,
                  pendingSlotRequest.getResourceProfile(),
                  pendingSlotRequest.getTargetAddress(),
                  resourceManagerId,
                  taskManagerRequestTimeout);
 
         ... ...
}
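
The branching in registerSlot above is easier to follow in isolation: a slot that arrives without an allocation id is matched against the pending slots by resource profile, and the result is either a plain state update, a free slot, or an allocation to a waiting request. The sketch below is a hypothetical model under those assumptions. PendingSlotMatchSketch, PendingSlot, Outcome, and the use of strings for profiles and request ids are illustrative names, not Flink types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of the decision made when a slot is registered; not Flink code.
class PendingSlotMatchSketch {
    /** Stand-in for a pending TaskManager slot: a profile plus an optional waiting request. */
    static final class PendingSlot {
        final String profile;
        final String assignedRequest; // null when no request is waiting on this slot

        PendingSlot(String profile, String assignedRequest) {
            this.profile = profile;
            this.assignedRequest = assignedRequest;
        }
    }

    enum Outcome { STATE_UPDATED, FREE, ALLOCATED }

    private final List<PendingSlot> pendingSlots = new ArrayList<>();

    void addPending(String profile, String assignedRequest) {
        pendingSlots.add(new PendingSlot(profile, assignedRequest));
    }

    int pendingCount() {
        return pendingSlots.size();
    }

    Outcome registerSlot(String profile, String allocationId) {
        PendingSlot match = null;
        // Only slots that are not yet allocated are matched against pending slots.
        if (allocationId == null) {
            Iterator<PendingSlot> it = pendingSlots.iterator();
            while (it.hasNext()) {
                PendingSlot candidate = it.next();
                if (candidate.profile.equals(profile)) {
                    match = candidate;
                    it.remove(); // the pending slot is consumed by the real one
                    break;
                }
            }
        }
        if (match == null) {
            return Outcome.STATE_UPDATED;
        }
        return match.assignedRequest == null ? Outcome.FREE : Outcome.ALLOCATED;
    }

    public static void main(String[] args) {
        PendingSlotMatchSketch manager = new PendingSlotMatchSketch();
        manager.addPending("1cpu", "req-1");
        System.out.println(manager.registerSlot("1cpu", null));
        System.out.println(manager.registerSlot("1cpu", null));
    }
}
```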

13. TaskManager offers the slot

TaskExecutor.java

public CompletableFuture<Acknowledge> requestSlot(
         final SlotID slotId,
         final JobID jobId,
         final AllocationID allocationId,
         final ResourceProfile resourceProfile,
         final String targetAddress,
         final ResourceManagerId resourceManagerId,
         final Time timeout) {
         ... ...

         try {
                  // allocate the slot on the TaskManager
                  allocateSlot(
                          slotId,
                          jobId,
                          allocationId,
                          resourceProfile);
         } catch (SlotAllocationException sae) {
                  return FutureUtils.completedExceptionally(sae);
         }

         final JobTable.Job job;

         try {
                  job = jobTable.getOrCreateJob(jobId, () -> registerNewJobAndCreateServices(jobId, targetAddress));
         } catch (Exception e) {
                  // free the allocated slot
                  try {
                          taskSlotTable.freeSlot(allocationId);
                  } catch (SlotNotFoundException slotNotFoundException) {
                          // slot no longer existent, this should actually never happen, because we've
                          // just allocated the slot. So let's fail hard in this case!
                          onFatalError(slotNotFoundException);
                  }

                  // release local state under the allocation id.
                  localStateStoresManager.releaseLocalStateForAllocationId(allocationId);

                  // sanity check
                  if (!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
                          onFatalError(new Exception("Could not free slot " + slotId));
                  }

                  return FutureUtils.completedExceptionally(new SlotAllocationException("Could not create new job.", e));
         }

         if (job.isConnected()) {
                  // connected to the job: offer the slot to the JobManager
                  offerSlotsToJobManager(jobId);
         }
 
         return CompletableFuture.completedFuture(Acknowledge.get());
}

private void internalOfferSlotsToJobManager(JobTable.Connection jobManagerConnection) {
         final JobID jobId = jobManagerConnection.getJobId();

         if (taskSlotTable.hasAllocatedSlots(jobId)) {
                  log.info("Offer reserved slots to the leader of job {}.", jobId);

                  final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();

                  final Iterator<TaskSlot<Task>> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
                  final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId();

                  final Collection<SlotOffer> reservedSlots = new HashSet<>(2);

                  while (reservedSlotsIterator.hasNext()) {
                          SlotOffer offer = reservedSlotsIterator.next().generateSlotOffer();
                          reservedSlots.add(offer);
                  }

                  CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
                          getResourceID(),
                          reservedSlots,
                          taskManagerConfiguration.getTimeout());

                  acceptedSlotsFuture.whenCompleteAsync(
                          handleAcceptedSlotOffers(jobId, jobMasterGateway, jobMasterId, reservedSlots),
                          getMainThreadExecutor());
         } else {
                  log.debug("There are no unassigned slots for the job {}.", jobId);
         }
}
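
The requestSlot method above follows an allocate-then-roll-back pattern: the slot is reserved first, and if setting up the job fails, the reservation is released and the caller receives an exceptionally completed future. The following is a minimal sketch of that pattern only. RequestSlotSketch, the Runnable standing in for job setup, and the "ACK" string result are hypothetical; the real method also releases local state and runs a sanity check, which are omitted here.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the allocate-then-roll-back pattern; not Flink code.
class RequestSlotSketch {
    private final Set<String> allocatedSlots = new HashSet<>();

    CompletableFuture<String> requestSlot(String allocationId, Runnable jobSetup) {
        // Step 1: reserve the slot; fail fast if it is already taken.
        if (!allocatedSlots.add(allocationId)) {
            return failed(new IllegalStateException("Slot already allocated: " + allocationId));
        }
        // Step 2: set up the job; undo the reservation if that fails.
        try {
            jobSetup.run();
        } catch (RuntimeException e) {
            allocatedSlots.remove(allocationId);
            return failed(new IllegalStateException("Could not create new job.", e));
        }
        return CompletableFuture.completedFuture("ACK");
    }

    boolean isAllocated(String allocationId) {
        return allocatedSlots.contains(allocationId);
    }

    // Java 8 compatible helper for an already failed future.
    private static <T> CompletableFuture<T> failed(Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(cause);
        return future;
    }

    public static void main(String[] args) {
        RequestSlotSketch taskExecutor = new RequestSlotSketch();
        System.out.println(taskExecutor.requestSlot("a1", () -> { }).join());
        System.out.println(taskExecutor.isAllocated("a1"));
    }
}
```

Returning a failed future instead of throwing keeps the error on the same asynchronous path as the success case, so the caller handles both in one place.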

JobMaster.java

public CompletableFuture<Collection<SlotOffer>> offerSlots(
                  final ResourceID taskManagerId,
                  final Collection<SlotOffer> slots,
                  final Time timeout) {

         Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);

         if (taskManager == null) {
                  return FutureUtils.completedExceptionally(new Exception("Unknown TaskManager " + taskManagerId));
         }

         final TaskManagerLocation taskManagerLocation = taskManager.f0;
         final TaskExecutorGateway taskExecutorGateway = taskManager.f1;

         final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
 
         return CompletableFuture.completedFuture(
                  slotPool.offerSlots(
                          taskManagerLocation,
                          rpcTaskManagerGateway,
                          slots));
}

SlotPoolImpl.java

public Collection<SlotOffer> offerSlots(
                  TaskManagerLocation taskManagerLocation,
                  TaskManagerGateway taskManagerGateway,
                  Collection<SlotOffer> offers) {

         ArrayList<SlotOffer> result = new ArrayList<>(offers.size());
 
         for (SlotOffer offer : offers) {
                  if (offerSlot(
                          taskManagerLocation,
                          taskManagerGateway,
                          offer)) {
 
                          result.add(offer);
                  }
         }
 
         return result;
}

boolean offerSlot(
                  final TaskManagerLocation taskManagerLocation,
                  final TaskManagerGateway taskManagerGateway,
                  final SlotOffer slotOffer) {

         ... ...

         // use the slot to fulfill pending request, in requested order
         tryFulfillSlotRequestOrMakeAvailable(allocatedSlot);

         // we accepted the request in any case. slot will be released after it idled for
         // too long and timed out
         return true;
}
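
The offer path shown above ends in the slot pool: every offered slot is accepted and then either fulfills the oldest pending request or is kept as available. The sketch below models only that last step, under stated assumptions. SlotPoolSketch and its string ids are hypothetical, resource profiles are ignored, and the checks elided in offerSlot are not reproduced.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of accepting slot offers in request order; not Flink code.
class SlotPoolSketch {
    private final Deque<String> pendingRequests = new ArrayDeque<>();          // FIFO
    private final Map<String, String> fulfilledRequests = new LinkedHashMap<>(); // request -> slot
    private final List<String> availableSlots = new ArrayList<>();

    void requestSlot(String requestId) {
        pendingRequests.addLast(requestId);
    }

    /** Offers each slot individually and returns the accepted ones. */
    Collection<String> offerSlots(Collection<String> offers) {
        List<String> accepted = new ArrayList<>(offers.size());
        for (String slotId : offers) {
            if (offerSlot(slotId)) {
                accepted.add(slotId);
            }
        }
        return accepted;
    }

    boolean offerSlot(String slotId) {
        // Fulfill the oldest pending request, or keep the slot as available.
        String oldestRequest = pendingRequests.pollFirst();
        if (oldestRequest != null) {
            fulfilledRequests.put(oldestRequest, slotId);
        } else {
            availableSlots.add(slotId);
        }
        return true; // in this sketch every offer is accepted
    }

    String slotFor(String requestId) {
        return fulfilledRequests.get(requestId);
    }

    int availableCount() {
        return availableSlots.size();
    }

    public static void main(String[] args) {
        SlotPoolSketch pool = new SlotPoolSketch();
        pool.requestSlot("r1");
        pool.requestSlot("r2");
        System.out.println(pool.offerSlots(Arrays.asList("s1", "s2", "s3")));
        System.out.println(pool.slotFor("r1") + " " + pool.slotFor("r2") + " " + pool.availableCount());
    }
}
```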

