Loading...
墨滴

在IT中穿梭旅行

2021/12/13  阅读:71  主题:橙心

Flink on yarn per-job提交流程

大家好,我是土哥。

今天为大家带来 秒懂 Flink 系列的第 25 篇原创文章 Flink On Yarn per-job 模式的提交流程及源码分析。文章比较硬核,由于前期部署的 Flink 生态是 Flink 1.13.2 版本,所以下面就通过 Flink 1.13.2 版本进行教学,希望小伙伴们可以快速学会~ 具体内容如下:

  1. 提交流程
  2. 代码实现
  3. 提交命令
  4. 源码分析

获取此文 markdown 格式,请将本篇文章进行 点赞+在看,分享朋友圈后 添加博主微信,截图发给土哥,获取纯净版 markdown 格式

提交流程

1、客户端(入口类 CliFrontend)

⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;

⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;

⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;

⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码

⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;

⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;

1.7 创建并启动 yarn 客户端,获取集群配置参数

1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。

⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient

⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息

2、启动 ApplicationMaster

⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)

3、作业提交

⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;

⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。

JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。

4、作业调度执行

⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。

⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。

⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。

⭐7 TaskManager 在内部启动 TaskExecutor。

⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。

⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。

⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。

⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。

代码实现

通过一个简单案例进行远程 debug 调试

package com.threeknowbigdata.flink.datastream;


import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;

import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * 类描述:
 *
 * @ClassName WordCount_kafka
 * @Description:
 * @Author: 土哥
 * @Date: 2021/12/11 下午6:53
 */

public class WordCount_kafka {
    public static void main(String[] args) throws Exception {
        //Todo 1.准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(5000);
        env.setParallelism(4);

        //Todo 2.准备kafka连接参数
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers""192.168.244.129:9092");
        prop.setProperty("group.id""consumer-group");
        prop.setProperty("auto.offset.reset""earliest");
        //String topic = "adClickLog";
        String topic = "ack";


        //Todo 3.创建kafka数据源
        FlinkKafkaConsumer<String> flinkKafkaConsumer = (FlinkKafkaConsumer<String>)
                new FlinkKafkaConsumer<String>(topic,new SimpleStringSchema(),prop);
        flinkKafkaConsumer.setStartFromEarliest();
        //Todo 4.指定消费者参数
        flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);//默认为true
        //Todo 从最早的数据开始消费
        flinkKafkaConsumer.setStartFromEarliest();

        // 1. 从文件中读取数据
        DataStreamSource<String> stringDataStreamSource = env.addSource(flinkKafkaConsumer);
        //3、将接收到的数据转为单词元祖(执行转换操作)
        DataStream<Tuple2<String, Integer>> wordDatastream = stringDataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                    @Override
                    public void flatMap(
                            String value,
                            Collector<Tuple2<String, Integer>> out)
 throws Exception 
{
                        String[] line = value.split(",");
                        for (String word : line) {
                            out.collect(new Tuple2<>(word, 1));
                        }

                    }
                }).setParallelism(4);

        KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = wordDatastream
                .keyBy(0);
        DataStream<Tuple2<String, Integer>> sinkDtastream = keyedStream.sum(1).setParallelism(4);

        //4、sink操作
        sinkDtastream.print();


        env.execute("submit job");
    }
}

提交命令

设置远程 debug

在服务器 flink-conf.yaml 配置文件中添加 remote debug 调试命令,具体命令如下:

# jobmanager debug端口
env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"
#  taskmanager debug端口
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
# 设置cliFrontend 客户端的debug端口
env.java.opts.client: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5008"

# 设置超时 时间 . 单位 毫秒.
rest.connection-timeout: 360000000
rest.idleness-timeout: 360000000

在 IDEA 设置 remote debug 模式

参数如下:

-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008

提交命令

[lyz@hlinkui bin]$ ./flink run -t yarn-per-job -c \
com.threeknowbigdata.flink.datastream.WordCount_kafka \
/home/lyz/workspace/javaspace/FlinkStudy/target/FlinkStudy-1.0-SNAPSHOT.jar

源码分析

在服务器提交命令后,程序会卡住,然后进入远程调试阶段,远程调试总共分为 3 大部分,分别为:

  1. CliFrontend 客户端
  2. YarnJobClusterEntryPoint:AM 执行的入口类
  3. YarnTaskExecutorRunner:Yarn 模式下的 TaskManager 的入口类

1.1 提交命令

通过 flink on yarn per-job 模式提交,查看 flink 脚本可以看到,程序被提交后,会寻找 CliFrontend 类。

在 客户端 CliFrontend 类中打上断点,程序运行之后,会通过 Main 方法进入远程调试阶段。

1.2 Main 方法执行

在 CliFrontend.main 方法中,会执行如下操作:

1 获取 flink conf 目录配置的路径;

2 根据 conf 目录加载配置;

3 根据三种方式 GenericCLI、flinkYarnSessionCLI、DefaultCLI 按照顺序依次封装成命令行接口。

4 创建 CLiFrontend 客户端;

5 对 flink run 提交的命令行进行解析。(除过 flink 为脚本,run 及 后面的命令全部需要解析判断 )

具体如下源码:

1.2.1 获取 flink conf 目录配置的路径;

⭐ CliFrontend.java

通过 getConfigurationDirectoryFromEnv 方法可以看到,这一步主要是用来获取 flink conf 的目录配置文件路径

1.2.2 根据 conf 目录加载配置;

⭐ GlobalConfiguration.java

在 loadConfiguration 方法中,加载之前获取的 conf 路径配置

1.2.3 选择创建的客户端类型;

⭐ CliFrontend.java

进入 loadCustomCommandLines 方法中,可以看到 这里依次添加了 GenericCLi、FlinkYarnSessionCli 和 DefaultCLi 三种命令行客户端(后面根据 isActive()按顺序选择):

1.2.4 创建 CliFrontend 客户端

⭐ CliFrontend.java

**1.2.5 调用 parseAndRun 解析参数 **

第五步: cli.parseAndRun(args) 为主要执行方法,我们进入该方法中进行查看:

⭐ CliFrontend.java

parseAndRun 方法中,主要是对之前提交的命令进行解析分析,我们以提交的命令行进行分析

flink run -t yarn-per-job -c xxx xxx.jar

其中 flink 为脚本,不算命令,从 run 开始,所以下图源码中的 String action = args[0] 得到的第一个参数就是 run。说明提交方式为 run。

然后根据 run 动作解析输入参数

1.3 通过 run 方法解析输入参数

run 在 CliFrontend.java 类中

在 run 方法中主要执行以下五步操作:

  1. 使用 CliFrontendParser.getRunCommandOptions()获取默认的运行参数
  2. 使用 this.getCommandLine 根据用户指定的配置项进行解析,然后包装成commandLine
  3. 根据提交的命令选择对应的客户端
  4. 获取有效的配置信息
  5. 执行 executeProgram 进入用户自定义类中

我们主要分析一下 this.executeProgram

1.4 执行 this.executeProgram 进入到用户自定义类中

executeProgram() 在 CliFrontend.java 类中

通过调用 this.executeProgram 方法进入到用户定义的程序主类中,首先创建 StreamExecutionEnvironment 环境,然后接收自定义的 source,本文代码使用 kafka 作为 source 源,通过 transformation 对算子进行转换,最后执行 sink 操作,当提交集群时,需要执行 execute()

1.5 生成 StreamGraph

StreamGraph 在 StreamExecutionEnvironment.java 类生成

通过查看 execute() 方法,发现通过输入形参 jobMame 最终返回一个 JobExecutionResult,继续深入查看 getStreamGraph()方法

在 getStreamGraph 中,通过加载全局配置和转换操作来取得流图生成器,最后生成 StreamGraph

1.6.1 选择 pipelineExecutor

pipelineExecutor 在 StreamExecutionEnvironment.java 类生成

通过 getExecutorFactory(configuration) 选择 YarnJobClusterExecutorFactory 作为工厂类,选择 YarnJobClusterExecutor 作为 pipelineExecutor,

1.6.2 生成 jobGraph

jobGraph 在 AbstractJobClusterExecutor.java 类生成

当 pipelineExecutor 生选择之后,executorFactory 会调用 getExecutor(configuration) 获取之前的配置,然后执行 execute(streamGraph, configuration, userClassloader) 生成 jobClientFuture,在这其中会生成 JobGraph,如下图所示

进入 getJobGraph()方法

1.7 创建并启动 yarn 客户端

YarnClusterDescriptor 在 AbstractJobClusterExecutor.java 类生成

jobGraph 生成后,会进行如下操作:

  1. 会创建并启动 yarn 客户端;
  2. 获取集群配置参数;

1.8 部署集群,上传 jar 包和配置文件到 HDFS

集群部署在 YarnJobCluster.java类中

进入到 deployJobCluster()方法中,会获取 YarnJobClusterEntrypoint,启动 ApplicationMaster 的入口

ApplicationMaster 入口启动后,然后将 jar 包和配置文件上传到 HDFS 中。

1.9 封装 AM 参数和命令,生成 ClusterClientJobClient

⭐ YarnClusterDescriptor.java

在 YarnClusterDescriptor 类的 setupApplicationMasterContainer() 方法中会创建 AM 的容器启动上下文,然后封装 AM 参数和命令,生成 ClusterClientJobClient

1.10 提交任务信息

ClusterClientJobClient 生成后,会 进入到 YarnClientImpl 实现类中提交 jobGraph,提交方法为 submitApplication,具体执行流程图如下:

继续深入 执行 rmClient.submitApplication方法,在ApplicationClientProtocolPBClientImpl.java 类的 submitApplication 方法中会获取到http 报文,然后将获取到的报文发送到服务器,并将返回的结果构成 response

最后将应用请求提交到 Yarn 上的 RMAppManager 去提交任务。

2 在 1.8 部署集群的时候,已经启动了 ApplicationMaster

3.1 ApplicationMaster 创建 Dispatcher、ResourceManager

后面会保证大家更详细的看清楚 JobManager 和 TaskManager的执行过程,我将源码的代码复制出来,添加注释,帮助大家更好理解。

Per-job 模式的 AM container 加载运行入口是 YarnJobClusterEntryPoint 中的 main() 方法,具体如下:

我们进入 runCluster 方法中,

1 、创建 dispatcher、ResourceManager 对象的工厂类。 其中包含从本地重新创建 JobGraph 的过程。

2、通过工厂类创建 dispatcher、ResourceManager 对象,Entry 启动 RpcService、HAService、BlobServer、HeartbeatServices、MetricRegistry、ExecutionGraphStore 等

我们调用 create()方法,该方法会执行以下操作:

1、创建接收前端 Rest 请求的节点

2、创建 ResourceManager对象,返回是 new YarnResourceManager

3、创建 dispatcherRunner 对象并启动

4、启动 ResourceManager

3.3 Dispatcher 启动 JobManager

Dispatcher.java

3.4 ResourceManager 启动 SlotManager

ResourceManager.java

3.4.1 创建 Yarn 的 RM 和 NM 客户端

ActiveResourceManager.java

YarnResourceManagerDriver.java

在 YarnResourceManagerDriver 里面创建和启动 yarn 的 resourcemanager 客户端,创建和启动 yarn 的 nodemanager 客户端

3.4.2 启动 SlotManager

如下为类之间的方法调用,最后在 checkTaskManagerTimeoutsAndRedundancy 方法中 进行判断:

如果没有 job 在运行,释放 taskmanager,保证随时有足够的 taskmanager

5 ResourceManager 申请资源

ResourceManager.java

6 TaskManager 启动

入口类为 YarnTaskExecutorRunner

进入到 runTaskManagerSecurely()方法中

7 TaskManager 启动

TaskExecutor.java

8 向 ResourceManager 注册 slot

TaskExecutor.java

9 ResourceManager 分配 Slot

SlotManagerImpl.java

10 TaskManager 提供 Slot

TaskExecutor.java

11 提交 Task 任务

JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行

源码提交流程总结

1、客户端(入口类 CliFrontend)

⭐1.1 执行启动脚本,进入 CliFrontend 类的 main 方法中,获取 flink conf 目录配置的路径,然后对其进行加载,同时依次添加 3 种客户端类型,并创建 CliFrontend 对象;

⭐1.2 在 main 中执行 parseAndRun 对提交的命令行参数进行解析;

⭐1.3 在解析命令时,根据提交的 run 模式选择对于的run方法,在run方法中选择 FlinkYarnSessionCli 作为客户端;

⭐1.4 在 run 方法中调用 executeProgram 进入用户自定义代码

⭐1.5 在用户自定义代码中执行 execute(),通过 getStreamGraph() 方法生成 streamGraph;

⭐1.6 选择 YarnJobClusterExecutor 作为 pipelineExecutor,并生成 jobGraph;

1.7 创建并启动 yarn 客户端,获取集群配置参数

1.8 部署集群,将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。

⭐1.9 封装 ApplicationMaster 参数和命令,生出 ClusterClientJobClient

⭐1.10 ClusterClientJobClient 向 Yarn ResourceManager 提交任务信息

2、启动 ApplicationMaster

⭐2 Yarn ResourceManager 收到提交的任务信息后,将分配 Container 资源,并通知对应的 NodeManager 启动一个 ApplicationMaster (每提交一个 Flink job 就会启动一个 ApplicationMaster)

3、作业提交

⭐3.1 ApplicationMaster 启动 Dispatcher 和 ResourceManager ;

⭐3.2 Dispatcher 启动 JobMaster (该步和 Session 不同,Jabmaster 是由 Dispatcher 拉起,而不是 Client 传过来的)。

JobMaster 负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph( JobGraph 的并行化版本,调度层最核心的数据结构。

4、作业调度执行

⭐4 JobMaster 向 ResourceManager 申请 Slot 资源,开始调度 ExecutionGraph。

⭐5 ResourceManager 将资源请求加入等待队列,通过心跳向 YarnResourceManager 申请新的 Container 来启动 TaskManager 进程。

⭐6 YarnResourceManager 启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动 TaskManager。

⭐7 TaskManager 在内部启动 TaskExecutor。

⭐8 TaskManager 启动后,向 ResourceManager 注册,并把自己的 Slot 资源情况汇报给 ResourceManager。

⭐9 ResourceManager 从等待队列取出 Slot 请求,向 TaskManager 确认资源可用情况,并告知 TaskManager 将 Slot 分配给哪个 JobMaster。

⭐10 TaskManager 向 JobMaster 回复自己的一个 Slot 属于你这个任务,JobMaser 会将 Slot 缓存到 SlotPool。

⭐11 JobMaster 调度 Task 到 TaskMnager 的 Slot 上执行。

本文原创作者:土哥、一名大数据算法工程师。

文章首发平台:微信公众号【3分钟秒懂大数据】

原创不易,各位觉得文章不错的话,不妨点赞(在看)、留言、转发三连走起!谢谢大家!

此文已经制作成带目录的 PDF,获取本文 PDF 版本,请点击公众号回复:ElasticSearch,进群领取。

获取此文 markdown 格式,请将本篇文章进行 点赞+在看,分享朋友圈后 添加博主微信,截图发给土哥,获取纯净版 markdown 格式

在IT中穿梭旅行

2021/12/13  阅读:71  主题:橙心

作者介绍

在IT中穿梭旅行