Flink: 并行度理解

相关概念

在理解flink并行度概念之前先理解相关概念。

Flink Job 代表运行时的 Flink 程序。Flink Job 可以提交到长时间运行的 Flink Session Cluster,也可以作为独立的 Flink Application Cluster 启动。

JobManager 是在 Flink Master 运行中的组件之一。JobManager 负责监督单个作业 Task 的执行。以前,整个 Flink Master 都叫做 JobManager。

Sub-Task

Sub-Task 是负责处理数据流 Partition 的 Task。”Sub-Task”强调的是同一个 Operator 或者 Operator Chain 具有多个并行的 Task 。

Task

Task 是 Physical Graph 的节点。它是基本的工作单元,由 Flink 的 runtime 来执行。Task 正好封装了一个 Operator 或者 Operator Chain 的 parallel instance。

TaskManager 是 Flink Cluster 的工作进程。Task 被调度到 TaskManager 上执行。TaskManager 相互通信,只为在后续的 Task 之间交换数据

什么是并行度

在flink架构中包含了JobManager和TaskManager两个主要角色,TM负责从JM接收任务,并提供对应的资源。这个任务也就是JM分配slot,这个资源单位也就是Flink中的slot(TaskSlot)。

在Flink中TM为了对资源进行隔离和增加允许的task数,引入了slot的概念,这个slot对资源的隔离仅仅是对内存进行隔离,策略是均分,比如TM的管理内存是3GB,假如有两个个slot,那么每个slot就仅仅有1.5GB内存可用。

flink在启动TaskManager后,会自动管理当前TaskManager上所有能提供的slot,slot在flink中可以被叫做资源组,我觉得叫线程池更加好理解。

yFouAwZ2sTqC3pM

并行度在这里边也就是在一个task提交后,能够同时执行多少个sub task,在一些情况下TM适当的提高并行度能够提高整个任务的执行效率,比如在消费kafka数据时,通常情况下使用针对每一个parttion使用一个source子任务。

如果 Task Manager 有四个 slot,那么它将为每个 slot 分配 25% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。

一般情况下TM提供的slot数量与当前节点的可用CPU数量成正比,也就是说多少个cpu就有多少个slot,不过可以通过配置文件中TasManager.numberOfTaskSlots进行配置。

如何设置并行度

flink中设置并行度主要分为几个层次。

算子层次

算子层次包含了,Source、Transform、Sink等可以通过调用setParallelism()方法进行指定。

1
2
3
4
5
6
7
8
9
10
DataStream<WordCount> wordAndOneStream = dataStreamSource.flatMap(new FlatMapFunction<String, WordCount>() {
public void flatMap(String line, Collector<WordCount> out) {
String[] fields = line.split(",");
for (String word : fields) {
out.collect(new WordCount(word, 1L));
}
}
}).setParallelism(2);
.....
count.print().setParallelism(2);

执行环境层次

flink程序运行在执行环境的上下文中StreamExecutionEnvironment。在设定了执行环境的并行度后,那么所有Source、Transform、Sink都被定义了一个默认的并行度。不过这个并行度在每个算子层次再次定义时将会被覆盖掉。

定义方式如下:

1
2
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3)

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

1
./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

1
2
3
4
5
6
7
8
9
10
11
12
13
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();

Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

// set the parallelism to 10 here
client.run(program, 10, true);

} catch (ProgramInvocationException e) {
e.printStackTrace();
}

系统层次

可以通过设置 ./conf/flink-conf.yaml 文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

优先级

可以看到,几种方式是从不同的角度去设定的并行度,这样就会产生一个设定的优先级问题。在flink中并行度设定的优先级类似于就近原则。

优先级由高 -> 低

算子层次 -> 执行环境层次 -> 客户端层次 -> 系统层次

总结

本篇记录了flink在并行度方面的相关知识,通过对相关概念的分析,我们知道并行度其实就是在flink中一个task的并行实例数量就称为该task的并行度。
同样分析了flink中最小的资源单位slot,并解释了为何要引入slot,以及并行度与slot的相关关系。
最后,提供了四种flink的并行度如何设定,以及每种方式的优先级

相关引用

Apache Flink 文档

Flink 从 0 到 1 学习 —— Flink parallelism 和 Slot 介绍

flink solt和并行度