flink: 1.10.1
kubernetes: 1.18.8
1 前言
目前参与的项目基本开发完成,打算开发的微服务和Flink任务统一部署到k8s上。之前几个项目都是跑在YARN上,Flink on YARN的部署模式是在生产中比较常见,YARN的稳定性和资源调度能力也是有目共睹的,依托Hadoop生态Flink高可用也很容易实现。关于On YARN
还是On K8S之前文章都有介绍,《Flink Cluster On YARN部署》和《Flink Cluster On Kubernetes部署》两篇文章里分别介绍了Flink以session以及per-job两种模式的部署。这篇文章将重点介绍per-job模式下如何在K8S中开启高可用,关于K8s上的高可用部署官网并没有提供相关文档,这里参考了Yarn以及Standalone部署的HA方案。相关代码已经提交到Github:https://github.com/shirukai/flink-examples-k8s-cluster.git,可以先pull下代码,然后跟着文章一块阅读,项目中包括一个自定义的有状态的Flink任务,还有关于部署的一些资源定义。
2 准备镜像
Per-job模式这里采用将用户自定义的Jar打包进镜像的方式,在之前的文章里也有介绍过几种方式,这里采用基于官方镜像进行打包的方式。
首先准备好Dockerfile,文件在flink-example-k8s-cluster/kubernetes/Dockerfile
From flink:1.10.1
ADD *.jar /opt/flink/usrlib/
RUN chmod +x -R /opt/flink/usrlib
RUN chown flink:flink -R /opt/flink/usrlib
Dockerfile定义很简单,镜像基于flink:1.10.1,然后将我们开发的jar 复制到/opt/flink/usrlib/目录下,最后给目录授权。
构建镜像build.sh
cp ../target/flink-*.jar .
docker build -t flink-job:1.10.1-example .
3 NFS或其他网络存储
官网上在介绍Yarn或者Standalone高可用部署时都用到了HDFS,在k8s上我们就不考虑使用HDFS,我们使用k8s中的持久卷,将JobManager和TaskManager挂载相同的网络卷,以达到文件共享的目的。这里笔者为了测试方便,在k8s中使用的是mac操作系统中自带的NFS服务。关于mac上开启并在k8s中使用NFS服务可以参考我的另一篇文章《k8s使用mac上自带的NFS》。这里假设我们已有一个NFS服务,地址为20.5.1.21,共享目录为:Users/shirukai/opt/nfs_data。我们定义一个类型为PersistentVolume的k8s资源:
flink-nfs-pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
name: flinknfspv
spec:
mountOptions:
- nfsvers=3
- nolock
capacity:
storage: 10Gi
accessModes:
- ReadWriteOnce
persistentVolumeReclaimPolicy: Recycle
storageClassName: flink-nfs
nfs:
path: /Users/shirukai/opt/nfs_data
server: 20.5.1.21
定义一个类型为PersistentVolumeClaim的k8s资源:
flink-nfs-pvc.yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: flinknfspvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 6Gi
storageClassName: flink-nfs
创建PV:
kubectl apply -f flink-nfs-pv.yaml
创建PVC
kubectl apply -f flink-nfs-pvc.yaml
PVC创建好之后,我们就可以在后面部署Flink时,通过挂载卷的方式持久化数据路。一共有三个地方需要用到持久卷:
- zookeeper数据目录,将pvc挂载到zookeeper的数据目录,用以持久化zk的数据,防止数据丢失
- flink的状态后端采用filesystem的方式,指定的state.checkpoints.dir目录需要持久化
- flink开启高可用的元数据存储指定的high-availability.storageDir目录需要持久化
4 Flink资源定义
接下来就是定义一些列k8s资源用以部署Flink集群,其中包括ConfigMap,Zookeeper的Deployment、Service,JobManager的Deployment、Service,TaskManager的Deployment,下面将分别进行详细介绍。
4.1 ConfigMap
统一将所有将来需要修改的配置都放到configmap里,包括flink的配置flink-conf.yaml,日志配置log4j.properties和log4j-console.properties,以及zookeeper的配置:zoo.cfg。
定义类型为ConfigMap的k8s资源:
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: flink-config
labels:
app: flink
data:
flink-conf.yaml: |+
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
blob.server.port: 6124
jobmanager.rpc.port: 6123
taskmanager.rpc.port: 6122
jobmanager.heap.size: 1024m
taskmanager.memory.process.size: 1024m
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
# 通过zookeeper开启HA
high-availability: zookeeper
# zk quorum的服务列表
high-availability.zookeeper.quorum: flink-zookeeper:2181
# 设置作业元数据的存储位置
high-availability.storageDir: file:///data/flink/ha
high-availability.jobmanager.port: 34560
metrics.internal.query-service.port: 34561
log4j.properties: |+
log4j.rootLogger=INFO, file
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.file=${log.file}
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
log4j-console.properties: |+
# This affects logging for both user code and Flink
log4j.rootLogger=INFO, console
# Uncomment this if you want to _only_ change Flink's logging
#log4j.logger.org.apache.flink=INFO
# The following lines keep the log level of common libraries/connectors on
# log level INFO. The root logger does not override this. You have to manually
# change the log levels here.
log4j.logger.akka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.org.apache.hadoop=INFO
log4j.logger.org.apache.zookeeper=INFO
# Log all infos to the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
# Suppress the irrelevant (wrong) warnings from the Netty channel handler
log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
zoo.cfg: |+
clientPort=2181
dataDir=/data/zookeeper/
dataLogDir=/data/zookeeper/log
4lw.commands.whitelist=*
重点注意flink-conf.yaml的中几个配置
state.backend: filesystem
state.checkpoints.dir: file:///data/flink/checkpoints
# 通过zookeeper开启HA
high-availability: zookeeper
# zk quorum的服务列表
high-availability.zookeeper.quorum: flink-zookeeper:2181
# 设置作业元数据的存储位置
high-availability.storageDir: file:///data/flink/ha
# 固定JobManager的端口(不指定的话默认会随机生成,会导致Pod之间无法访问),并需要通过service暴露出去,否则在TaskManager中将访问不到改端口
high-availability.jobmanager.port: 34560
metrics.internal.query-service.port: 34561
创建configmap
kubectl apply -f flink-configuration-configmap.yaml
4.2 Zookeeper
我们只开启一个单节点的zookeeper服务,需要定义Deployment和Service
4.2.1 zookeeper-deployment.yaml
定义Deployment如下:
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-zookeeper
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: zookeeper
template:
metadata:
labels:
app: flink
component: zookeeper
# -----
spec:
# restartPolicy: OnFailure
containers:
- name: zookeeper
image: zookeeper:3.6
ports:
- containerPort: 2181
name: port
livenessProbe:
tcpSocket:
port: 2181
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: zk-config-volume
mountPath: /conf
- name: flink-data
mountPath: /data
securityContext:
runAsUser: 9999
volumes:
- name: zk-config-volume
configMap:
name: flink-config
items:
- key: zoo.cfg
path: zoo.cfg
- name: flink-data
persistentVolumeClaim:
claimName: flinknfspvc
注意两点:
- 从名称flink-config的configmap中取出zoo.cfg配置文件挂载到容器的/conf目录下
- 将claim名称为flinknfspvc的持久卷挂载到容器的/data目录下
资源定义好后,创建Deployment
kubectl create -f zookeeper-deployment.yaml
进入容器,查看zookeeper是否正常
kubectl exec -it flink-zookeeper-6c698d5cc5-8v8tv -- bin/zkCli.sh
4.2.2 zookeeper-service.yaml
为zookeeper定义Service,向外暴露2181端口,其它的pod可以通过flink-zookeeper:2181的地址进行访问。
apiVersion: v1
kind: Service
metadata:
name: flink-zookeeper
spec:
type: ClusterIP
ports:
- name: port
port: 2181
selector:
app: flink
component: zookeeper
创建Service
kubectl create -f zookeeper-service.yaml
4.3 JobManager
JobManager需要定义三个资源,一个是用以部署的Deployment,另外是Pod间能够访问的Service,最后是对外能够通过NodPort的方式暴露REST UI的Service。
4.3.1 jobmanager-job-deployment.yaml
JobManager的Deployment定义,需要指定我们自己构建的镜像,并且在重写启动命令,指定job-class以及相关参数。
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-jobmanager
spec:
replicas: 1
selector:
matchLabels:
app: flink
component: jobmanager
template:
metadata:
labels:
app: flink
component: jobmanager
# -----
spec:
# restartPolicy: OnFailure
containers:
- name: jobmanager
image: flink-job:1.10.1-example
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/standalone-job.sh start --job-classname flink.k8s.cluster.examples.EventCounterJob --socket-host 20.5.1.21;
sleep 3;
tail -f $FLINK_HOME/log/*.log"]
ports:
- containerPort: 6123
name: rpc
- containerPort: 6124
name: blob-server
- containerPort: 8081
name: webui
# 需要指定该端口,否则将随机生成,导致TaskManager与JobManager通信异常
- containerPort: 34560
name: ha
- containerPort: 34561
name: query
livenessProbe:
tcpSocket:
port: 6124
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf
- name: flink-data
mountPath: /data
# - name: job-artifacts-volume
# mountPath: /opt/flink/usrlib
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
- key: log4j-console.properties
path: log4j-console.properties
- name: flink-data
persistentVolumeClaim:
claimName: flinknfspvc
# - name: job-artifacts-volume
# hostPath:
# path: /host/path/to/job/artifacts
同样注意两点:
- 从名称flink-config的configmap中取出flink-conf.yaml、log4j.properties、log4j-console.properties配置文件挂载到容器的/opt/flink/conf目录下
- 将claim名称为flinknfspvc的持久卷挂载到容器的/data目录下
创建Deployment
kubectl create -f jobmanager-job-deployment.yaml
4.3.2 jobmanager-service.yaml
JobManager的Service定义也相对比较简单,需要暴露出指定的几个端口即可
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager
spec:
type: ClusterIP
ports:
- name: rpc
port: 6123
- name: blob-server
port: 6124
- name: webui
port: 8081
- name: ha
port: 34560
- name: query
port: 34561
selector:
app: flink
component: jobmanager
创建Service
kubectl create -f jobmanager-service.yaml
4.3.3 jobmanager-rest-service.yaml
JobManager REST的Service定义,主要是通过NodPort的方式,将UI端口暴露出来,方便我们通过节点IP+端口号的方式访问Flink 的UI。
apiVersion: v1
kind: Service
metadata:
name: flink-jobmanager-rest
spec:
type: NodePort
ports:
- name: rest
port: 8081
targetPort: 8081
nodePort: 30081
selector:
app: flink
component: jobmanager
创建Service
kubectl create -f jobmanager-rest-service.yaml
此时我们就可以通过节点IP:30081访问Flink Web UI
通过UI界面可以看到,我们已经有一个Flink Job在执行了,但是出于RESTARTING状态,这是由于我们的TaskManager还没有启动,Job没有足够的Slot的去执行每一个Task,接下来我们需要去定义TaskManager。
4.4 TaskManager
TaskManager只需要定义一个Deployment资源即可。
4.4.1 taskmanager-job-deployment.yaml
TaskManager的Deployment定义与JobManager几乎相同,只是启动命令有所不同。
apiVersion: apps/v1
kind: Deployment
metadata:
name: flink-taskmanager
spec:
replicas: 2
selector:
matchLabels:
app: flink
component: taskmanager
template:
metadata:
labels:
app: flink
component: taskmanager
spec:
containers:
- name: taskmanager
image: flink-job:1.10.1-example
command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;
sleep 3;
tail -f $FLINK_HOME/log/*.log"]
ports:
- containerPort: 6122
name: rpc
- containerPort: 6125
name: query-state
livenessProbe:
tcpSocket:
port: 6122
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
- name: flink-data
mountPath: /data
securityContext:
runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
volumes:
- name: flink-config-volume
configMap:
name: flink-config
items:
- key: flink-conf.yaml
path: flink-conf.yaml
- key: log4j.properties
path: log4j.properties
- key: log4j-console.properties
path: log4j-console.properties
- name: flink-data
persistentVolumeClaim:
claimName: flinknfspvc
注意:
这里在Deployment里指定了2个副本,意味着我们将启动两个Pod,因为我们再flink-conf.yaml中指定了默认的并行度为2,所以我们需要两个slort来执行我们的Flink task。
创建Deployment
kubectl create -f taskmanager-job-deployment.yaml
等Taskmanager两个容器启动之后,可以在WebUI上看到TaskManager数量已经变成了2,Job的状态也变为Running状态了
5 高可用验证
通过上面的步骤我们已经能够在k8s上运行一个per-job模式的flink集群,并进行了高可用的配置。这一章节将进行高可用的验证,主要包括TaskManager意外退出、JobManager意外退出、整个集群意外退出的验证。
5.1 开发一个带状态的Flink任务
为了进行更直观的验证,我们需要开发一个带状态的Flink任务,方便观察各个服务意外退出之后Flink能否从checkpoint中进行状态恢复。在项目scala代码中的flink.k8s.cluster.examples包下查看EventCounterJob类
package flink.k8s.cluster.examples
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._
/**
* 实时计算事件总个数,以及value总和
*
* @author shirukai
*/
object EventCounterJob {
def main(args: Array[String]): Unit = {
// 获取执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val params: ParameterTool = ParameterTool.fromArgs(args)
env.getConfig.setGlobalJobParameters(params)
// 配置checkpoint
// 做两个checkpoint的间隔为1秒
env.enableCheckpointing(10000)
// 表示下 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint 会在整个作业 Cancel 时被删除。Checkpoint 是作业级别的保存点。
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
// 1. 从socket中接收文本数据
val streamText: DataStream[String] = env.socketTextStream(params.get("socket-host","0.0.0.0"), 9000)
.uid("SocketSource")
// 2. 将文本内容按照空格分割转换为事件样例类
val events = streamText.map(s => {
val tokens = s.split(" ")
Event(tokens(0), tokens(1).toDouble, tokens(2).toLong)
}).uid("String2CaseClass")
// 3. 按照事件id分区,然后进行聚合统计
val counterResult = events.keyBy(_.id)
.process(new EventCounterProcessFunction)
.uid("EventCounter")
// 4. 结果输出到控制台
counterResult.print().uid("Printer")
env.execute("EventCounterJob")
}
}
/**
* 定义事件样例类
*
* @param id 事件类型id
* @param value 事件值
* @param time 事件时间
*/
case class Event(id: String, value: Double, time: Long)
/**
* 定义事件统计器样例类
*
* @param id 事件类型id
* @param sum 事件值总和
* @param count 事件个数
*/
case class EventCounter(id: String, var sum: Double, var count: Int)
/**
* 继承KeyedProcessFunction实现事件统计
*/
class EventCounterProcessFunction extends KeyedProcessFunction[String, Event, EventCounter] {
private var counterState: ValueState[EventCounter] = _
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 从flink上下文中获取状态
counterState = getRuntimeContext.getState(new ValueStateDescriptor[EventCounter]("event-counter", classOf[EventCounter]))
}
override def processElement(i: Event,
context: KeyedProcessFunction[String, Event, EventCounter]#Context,
collector: Collector[EventCounter]): Unit = {
// 从状态中获取统计器,如果统计器不存在给定一个初始值
val counter = Option(counterState.value()).getOrElse(EventCounter(i.id, 0.0, 0))
// 统计聚合
counter.count += 1
counter.sum += i.value
// 发送结果到下游
collector.collect(counter)
// 保存状态
counterState.update(counter)
}
}
代码逻辑相对比较简单,主要包括下面几个过程:
- 使用socketTextStream算子监听指定的Socket端口,用以接收文本数据
- 使用map算子将接收的文本数据转换为事件样例类
- 将事件按照id进行分区
- 分区后的事件使用自定义的KeyedProcess算子进行统计聚合操作
- 将结果输出控制台
注意:
在运行flink任务之前,需要使用nc -lk 9000
命令监听9000端口,模拟Socket服务端
5.2 高可用验证:TaskManager意外退出
在进行验证之前,确保flink任务已经在集群中正常运行。
1 在nc -lk 9000的命令行控制台输入一条数据:
event-1 1.0 1604280262000
2 查看TaskManager的输出,符合预期:
3 在nc -lk 9000的命令行控制台再输入一条数据:
event-1 2.0 1604280263000
4 刷新TaskManager的标准输出,符合预期:
5 现在将两个TaskManager的Pod删除
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-5cfcd4b77f-45pl2 1/1 Running 0 43m
flink-taskmanager-5db58d75f5-lvpd4 1/1 Running 0 43m
flink-taskmanager-5db58d75f5-rbtb8 1/1 Running 0 43m
flink-zookeeper-6c698d5cc5-mm7g9 1/1 Running 0 43m
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-taskmanager-5db58d75f5-lvpd4
pod "flink-taskmanager-5db58d75f5-lvpd4" deleted
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-taskmanager-5db58d75f5-rbtb8
pod "flink-taskmanager-5db58d75f5-rbtb8" deleted
删除Pod之后,可以看到当前的Flink任务处于一个RESTARING状态,稍等片刻后会重新变为RUNNING状态
6 在nc -lk 9000的命令行控制台重新输入一条数据:
event-1 3.0 1604280264000
7 刷新TaskManager的标准输出,发现当前的统计结果是在之前的结果基础上继续进行的:
5.3 高可用验证:JobManager意外退出
开始之前,需要停止之前集群,并清理掉NFS中的内容,然后启动集群。
1 在nc -lk 9000的命令行控制台输入一条数据:
event-1 1.0 1604280262000
2 查看TaskManager的输出,符合预期:
3 在nc -lk 9000的命令行控制台再输入一条数据:
event-1 2.0 1604280263000
4 刷新TaskManager的标准输出,符合预期:
5 现在将JobManager的Pod删除
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-5cfcd4b77f-jqjbr 1/1 Running 0 105s
flink-taskmanager-5db58d75f5-4sp9d 1/1 Running 0 103s
flink-taskmanager-5db58d75f5-dg6d6 1/1 Running 0 103s
flink-zookeeper-6c698d5cc5-qnhk4 1/1 Running 0 105s
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-jobmanager-5cfcd4b77f-jqjbr
pod "flink-jobmanager-5cfcd4b77f-jqjbr" deleted
删除Pod之后,等待JobManager启动,稍等片刻后会Flink任务会重新变为RUNNING状态
6 在nc -lk 9000的命令行控制台重新输入一条数据:
event-1 3.0 1604280264000
7 刷新TaskManager的标准输出,发现当前的统计结果是在之前的结果基础上继续进行的:
5.4 高可用验证:整个集群意外退出
这个比较简单,在上面的基础上进行就可以了,现在将集群停止,并重新启动。
1 在nc -lk 9000的命令行控制台输入一条数据:
event-1 4.0 1604280265000
2 刷新TaskManager的标准输出,符合预期: