Flink入门系列1:架构与部署

Flink是一个通用的框架,支持多种不同的部署场景。本篇从Flink集群架构、提交模式、以及资源部署方案的角度进行介绍。

集群组件

Flink客户端

Flink客户端负责将FIink应用程序编译为数据流图,然后将其提交给JobManager。

JobManager

JobManager是Flink集群工作的中央协调组件。它针对不同的Resource Provider会有不同实现,这些实现在高可用、资源分配以及所支持的提交模式是不同的。

JobManager所提供的作业提交模式有:

  • Application模式
  • Per-Job模式
  • Session模式

TaskManager

TaskManager负责执行Flink作业。

外部组件

Resource Provider

Flink可以通过不同的资源提供框架进行部署,例如Kubernetes、YARN或Mesos。

High Availability Service Provider

File Storage and Persistency

Metrics Storage

应用层的数据源和sink

提交模式

Application模式

在其它的模式中,应用程序的main()方法都是在client中执行的。过程包括:将应用的依赖下载到本地、执行main()方法提取Flink运行时可以理解的应用程序的表示(例如,JobGraph)、将依赖和JobGraph分发到集群。这使客户端成了一个大量消耗资源的组件,因为它需要大量的网络带宽来下载依赖、然后向集群发送这些二进制数据,并需要大量的CPU时间来执行main()方法。如果该客户端节点有很多用户所共享,那么这个问题会变得更为显著。

Application模式下,每个应用程序单独创建一个集群,应用程序的main()方法在JobManager上被执行。不需要事先启动一个Flink集群,然后再向集群提交作业;而是,将应用程序的业务逻辑和其依赖打包到一个可执行的作业JAR,集群的入口(ApplicationClusterEntryPoint)负责调用main()方法来获取JobGraph。每个应用创建一个集群可以被看作是:创建一个session集群,但是这个session集群只能由该应用的作业所使用;应用完成,集群也相应终止。这种架构下,Application模式提供了与Per-Job模式一样的资源隔离和负载均衡的保障,但是是在整个应用粒度上的。在JobManager上执行main()方法节省了所需要的CPU资源,也节省了本地下载依赖所需要的带宽。并且,因为每个应用一个JobManager,它会使得下载应用程序的依赖所需要的网络负载更加均匀得分散到集群中(Per-Job模式和Session模式,Client端负载会过于严重)。

注意:在Application模式中,main()是在集群中执行了,而不是像其它模式一样在客户端上执行。这可能会对你的程序有些影响,例如,使用registerCachedFile()在环境中注册的任何路径必须对于该应用的JobManager是可访问的。

相比于Per-Job模型,Application模式允许提交包含多个作业的应用。作业执行的顺序不受部署模式的影响,但会受到启动作业所使用的调用影响。execute()方法(它是阻塞式的)可以保证是顺序得,它会将“下一个”作业的执行拖延到“当前”作业完成。executeAsync()方法(它是非阻塞式的)会导致“下一个”作业会在“这个”作业完成之前就被启动了。

Per-Job模式

旨在提供更好的资源隔离的保障,Per-Job模式利用可用的资源调度框架(例如,YARN、Kubernetes)为每个提交的作业启动一个集群。集群只对该作业可用。当作业结束时,集群也会结束、任何lingering的资源(例如,文件、等)也会被清理。

该模式下,作业的main()方法在创建集群之前在Client端执行。客户端首先从集群管理器请求资源启动 JobManager,然后将作业提交给在这个进程中运行的 Dispatcher。然后根据作业的资源请求惰性的分配 TaskManager。一旦作业完成,Flink Job 集群将被拆除。

它提供了更好的资源隔离,因为一个行为可疑的作业只会搞垮它自己TaskManager。另外,它将记账的负担分散到了多个JobManager,因为每个Job都有一个JobManager。基于这些原因,Per-Job资源分配模式是很多生产环境的首选模式。

Session模式

Session模式假设有一个已经在运行的Flink集群,使用该集群的资源来执行执行的应用。同一个集群中执行的应用使用的同样的资源,因此就会对资源进行竞争。但是,如果其中有一个作业胡作非为或者搞垮了TaskManager,那么运行在那个TaskManager上的所有作业都会受到影响。另外,使用一个集群运行多个作业这意味着JobManager更大的负载,它负责对集群中的所有作业进行记账。

部署示例

下表给出了各个资源调度平台对于Flink三种提交模式的支持情况:

Application模式 Per-Job模式 Session模式
Standalone
YARN
Mesos
Kubernetes

Standalone

本篇旨在本地环境快速搭建Flink Standalone集群,从而快速一探究竟;关于如何在多设备上搭建分布式Standalone集群、以及docker和kubernetes部署,可以参考官方相关文档。

Application模式

第1步:准备应用程序jar

1
$ cp ./examples/streaming/TopSpeedWindowing.jar lib/

第2步:提交作业启动JobManager

1
$ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.TopSpeedWindowing

第3步:启动TaskManager(执行两次以下命令,启动两个TaskManager进程)

1
$ ./bin/taskmanager.sh start

第4步:使用jps查看本地机器上的Flink进程,会有如下三个相关进程

StandaloneApplicationClusterEntryPoint

TaskManagerRunner

TaskManagerRunner

第5步:使用localhost:8081查看Flink Web UI

第6步:停止集群

1
2
$ ./bin/taskmanager.sh stop-all
$ ./bin/standalone-job.sh stop

Per-Job模式

Standalone集群不支持Per-Job

Session模式

第1步:进行如下配置,conf/workers中有两行,意味着我们要在本地启动两个TaskManager

conf/masters

1
localhost

conf/workers

1
2
localhost
localhost

第2步:启动集群

1
$ ./bin/start-cluster.sh

第3步:使用jps查看本地机器上的Flink进程,会有如下3个相关进程:

StandaloneSessionClusterEntrypoint(注意,这里与Application模式的StandaloneApplicationClusterEntryPoint不同)

TaskManagerRunner

TaskManagerRunner

第4步:使用localhost:8081查看Flink Web UI,通过UI我们可以看到,Jobs - Running Jobs页面没有在运行的作业,Task Managers页面中的每个TaskManager行中的All Slots和Free Slots值是相等的。

第5步:向Flink集群提交作业

1
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

这是使用jps再次查看本地机器上的Flink进程,会有如下4个相关进程:

CliFrontend

StandaloneSessionClusterEntrypoint

TaskManagerRunner

TaskManagerRunner

通过Web UI界面我们可以看到Jobs - Running Jobs页面有一个在运行的作业,Task Managers页面中的有一行的Free Slots值为0。

第6步:重复【第5步】,我们再提交一个作业,同样进行观察,这时Flink集群的Slots被全部占用了,并且多了一个CliFrontend进程。

CliFrontend

CliFrontend

StandaloneSessionClusterEntrypoint

TaskManagerRunner

TaskManagerRunner

第7步:停止集群

1
$ ./bin/stop-cluster.sh

YARN

该节的试验是在AWS EMR上进行的,集群设备描述如下:

节点类型和名称 私有IP地址 描述
MASTER
主实例组 - 1
172.31.13.9 主实例节点作为Flink提交节点
CORE
核心实例组 - 2
172.31.2.210
172.31.11.231
核心节点是YARN NodeManager运行节点

Application模式

第1步:相关配置

1
$ export HADOOP_CLASSPATH=`hadoop classpath`

第2步:提交作业启动集群

1
$ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar

第3步:查看Flink Web UI、YARN ResourceManager UI

第4步:使用jps查看YARN各个节点上的Flink进程

节点说明 进程说明
提交节点(172.31.13.9) 无Flink相关进程
集群某个节点A(172.31.11.231) YarnApplicationClusterEntryPoint
集群某个节点B(172.31.2.210) YarnTaskExecutorRunner

Per-Job模式

第1步:相关配置

1
$ export HADOOP_CLASSPATH=`hadoop classpath`

第2步:提交作业启动集群

1
$ ./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar

第3步:查看Flink Web UI、YARN ResourceManager UI

第4步:使用jps查看YARN节点上的Flink进程

节点说明 进程说明
提交节点(172.31.13.9) 无Flink相关进程
集群某个节点A(172.31.11.231) YarnJobClusterEntrypoint
集群某个节点B(172.31.2.210) YarnTaskExecutorRunner

Session模式

第1步:相关配置

1
$ export HADOOP_CLASSPATH=`hadoop classpath`

第2步:启动集群

1
$ ./bin/yarn-session.sh --detached

第3步:查看Flink Web UI、YARN ResourceManager UI

第4步:使用jps查看YARN各个节点上的Flink进程

节点说明 进程说明
提交节点(172.31.13.9) 无Flink相关进程
集群某个节点A(172.31.2.210) YarnSessionClusterEntrypoint

第5步:提交作业

1
2
3
$ ./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

$ ./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YY ./examples/streaming/TopSpeedWindowing.jar

第6步:查看Flink Web UI、YARN ResourceManager UI

第7步:使用jps查看YARN各个节点上的Flink进程

节点说明 进程说明
提交节点(172.31.13.9) CliFrontend
集群某个节点A(172.31.2.210) YarnSessionClusterEntrypoint
集群某个节点B(172.31.11.231) YarnTaskExecutorRunner

第8步:重复【第5步】,我们再提交一个作业,同样进行观察

节点说明 进程说明
提交节点(172.31.13.9) CliFrontend
CliFrontend
集群某个节点A(172.31.2.210) YarnSessionClusterEntrypoint
集群某个节点B(172.31.11.231) YarnTaskExecutorRunner
YarnTaskExecutorRunner

Mesos

TODO

Native Kubernetes

TODO

参考

注意,官方中文文档和英文文档内容并不是一致的,如resource-providers/standalone的中/英文文档介绍的内容是不大一致的。

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/
  2. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/concepts/flink-architecture.html
  3. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/resource-providers/standalone/
  4. https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone
  5. https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/docker.html
  6. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/resource-providers/standalone/docker.html
  7. https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
  8. https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/resource-providers/standalone/kubernetes.html