FLINK November 02, 2020

Flink在k8s生产环境高可用部署

Words count 25k Reading time 23 mins. Read count 0

flink: 1.10.1

kubernetes: 1.18.8

1 前言

目前参与的项目基本开发完成,打算开发的微服务和Flink任务统一部署到k8s上。之前几个项目都是跑在YARN上,Flink on YARN的部署模式是在生产中比较常见,YARN的稳定性和资源调度能力也是有目共睹的,依托Hadoop生态Flink高可用也很容易实现。关于On YARN

还是On K8S之前文章都有介绍,《Flink Cluster On YARN部署》和《Flink Cluster On Kubernetes部署》两篇文章里分别介绍了Flink以session以及per-job两种模式的部署。这篇文章将重点介绍per-job模式下如何在K8S中开启高可用,关于K8s上的高可用部署官网并没有提供相关文档,这里参考了Yarn以及Standalone部署的HA方案。相关代码已经提交到Github:https://github.com/shirukai/flink-examples-k8s-cluster.git,可以先pull下代码,然后跟着文章一块阅读,项目中包括一个自定义的有状态的Flink任务,还有关于部署的一些资源定义。

2 准备镜像

Per-job模式这里采用将用户自定义的Jar打包进镜像的方式,在之前的文章里也有介绍过几种方式,这里采用基于官方镜像进行打包的方式。

首先准备好Dockerfile,文件在flink-example-k8s-cluster/kubernetes/Dockerfile

From flink:1.10.1

ADD *.jar /opt/flink/usrlib/
RUN chmod +x -R /opt/flink/usrlib
RUN chown flink:flink -R /opt/flink/usrlib

Dockerfile定义很简单,镜像基于flink:1.10.1,然后将我们开发的jar 复制到/opt/flink/usrlib/目录下,最后给目录授权。

构建镜像build.sh

cp ../target/flink-*.jar .
docker build -t flink-job:1.10.1-example .

image-20201031152707683

3 NFS或其他网络存储

官网上在介绍Yarn或者Standalone高可用部署时都用到了HDFS,在k8s上我们就不考虑使用HDFS,我们使用k8s中的持久卷,将JobManager和TaskManager挂载相同的网络卷,以达到文件共享的目的。这里笔者为了测试方便,在k8s中使用的是mac操作系统中自带的NFS服务。关于mac上开启并在k8s中使用NFS服务可以参考我的另一篇文章《k8s使用mac上自带的NFS》。这里假设我们已有一个NFS服务,地址为20.5.1.21,共享目录为:Users/shirukai/opt/nfs_data。我们定义一个类型为PersistentVolume的k8s资源:

flink-nfs-pv.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: flinknfspv
spec:
  mountOptions:
    - nfsvers=3
    - nolock
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Recycle
  storageClassName: flink-nfs
  nfs:
    path: /Users/shirukai/opt/nfs_data
    server: 20.5.1.21

定义一个类型为PersistentVolumeClaim的k8s资源:

flink-nfs-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: flinknfspvc
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 6Gi
  storageClassName: flink-nfs

创建PV:

kubectl apply -f flink-nfs-pv.yaml

image-20201031155539297

创建PVC

kubectl apply -f flink-nfs-pvc.yaml

image-20201031155711597

PVC创建好之后,我们就可以在后面部署Flink时,通过挂载卷的方式持久化数据路。一共有三个地方需要用到持久卷:

  1. zookeeper数据目录,将pvc挂载到zookeeper的数据目录,用以持久化zk的数据,防止数据丢失
  2. flink的状态后端采用filesystem的方式,指定的state.checkpoints.dir目录需要持久化
  3. flink开启高可用的元数据存储指定的high-availability.storageDir目录需要持久化

4 Flink资源定义

接下来就是定义一些列k8s资源用以部署Flink集群,其中包括ConfigMap,Zookeeper的Deployment、Service,JobManager的Deployment、Service,TaskManager的Deployment,下面将分别进行详细介绍。

4.1 ConfigMap

统一将所有将来需要修改的配置都放到configmap里,包括flink的配置flink-conf.yaml,日志配置log4j.properties和log4j-console.properties,以及zookeeper的配置:zoo.cfg。

定义类型为ConfigMap的k8s资源:

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    parallelism.default: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
    state.backend: filesystem
    state.checkpoints.dir: file:///data/flink/checkpoints
    # 通过zookeeper开启HA
    high-availability: zookeeper
    # zk quorum的服务列表
    high-availability.zookeeper.quorum: flink-zookeeper:2181
    # 设置作业元数据的存储位置
    high-availability.storageDir: file:///data/flink/ha
    high-availability.jobmanager.port: 34560
    metrics.internal.query-service.port: 34561

  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    log4j.rootLogger=INFO, console

    # Uncomment this if you want to _only_ change Flink's logging
    #log4j.logger.org.apache.flink=INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO

    # Log all infos to the console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
  zoo.cfg: |+
    clientPort=2181
    dataDir=/data/zookeeper/
    dataLogDir=/data/zookeeper/log
    4lw.commands.whitelist=*

重点注意flink-conf.yaml的中几个配置

    state.backend: filesystem
    state.checkpoints.dir: file:///data/flink/checkpoints
    # 通过zookeeper开启HA
    high-availability: zookeeper
    # zk quorum的服务列表
    high-availability.zookeeper.quorum: flink-zookeeper:2181
    # 设置作业元数据的存储位置
    high-availability.storageDir: file:///data/flink/ha
    # 固定JobManager的端口(不指定的话默认会随机生成,会导致Pod之间无法访问),并需要通过service暴露出去,否则在TaskManager中将访问不到改端口
    high-availability.jobmanager.port: 34560
    metrics.internal.query-service.port: 34561

创建configmap

kubectl apply -f flink-configuration-configmap.yaml

image-20201031161900149

4.2 Zookeeper

我们只开启一个单节点的zookeeper服务,需要定义Deployment和Service

4.2.1 zookeeper-deployment.yaml

定义Deployment如下:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-zookeeper
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: zookeeper
  template:
    metadata:
      labels:
        app: flink
        component: zookeeper

    # -----
    spec:
      # restartPolicy: OnFailure
      containers:
        - name: zookeeper
          image: zookeeper:3.6
          ports:
            - containerPort: 2181
              name: port
          livenessProbe:
            tcpSocket:
              port: 2181
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: zk-config-volume
              mountPath: /conf
            - name: flink-data
              mountPath: /data

          securityContext:
            runAsUser: 9999
      volumes:
        - name: zk-config-volume
          configMap:
            name: flink-config
            items:
              - key: zoo.cfg
                path: zoo.cfg
        - name: flink-data
          persistentVolumeClaim:
            claimName: flinknfspvc

注意两点:

  1. 从名称flink-config的configmap中取出zoo.cfg配置文件挂载到容器的/conf目录下
  2. 将claim名称为flinknfspvc的持久卷挂载到容器的/data目录下

资源定义好后,创建Deployment

kubectl create -f zookeeper-deployment.yaml

image-20201031162318722

进入容器,查看zookeeper是否正常

 kubectl exec -it flink-zookeeper-6c698d5cc5-8v8tv -- bin/zkCli.sh

4.2.2 zookeeper-service.yaml

为zookeeper定义Service,向外暴露2181端口,其它的pod可以通过flink-zookeeper:2181的地址进行访问。

apiVersion: v1
kind: Service
metadata:
  name: flink-zookeeper
spec:
  type: ClusterIP
  ports:
    - name: port
      port: 2181
  selector:
    app: flink
    component: zookeeper

创建Service

kubectl create -f zookeeper-service.yaml 

image-20201031163258250

4.3 JobManager

JobManager需要定义三个资源,一个是用以部署的Deployment,另外是Pod间能够访问的Service,最后是对外能够通过NodPort的方式暴露REST UI的Service。

4.3.1 jobmanager-job-deployment.yaml

JobManager的Deployment定义,需要指定我们自己构建的镜像,并且在重写启动命令,指定job-class以及相关参数。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager

    # -----
    spec:
      # restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: flink-job:1.10.1-example
          command: ["/bin/bash", "-c", "$FLINK_HOME/bin/standalone-job.sh start --job-classname flink.k8s.cluster.examples.EventCounterJob --socket-host 20.5.1.21;
          sleep 3;
          tail -f $FLINK_HOME/log/*.log"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            # 需要指定该端口,否则将随机生成,导致TaskManager与JobManager通信异常
            - containerPort: 34560
              name: ha
            - containerPort: 34561
              name: query
          livenessProbe:
            tcpSocket:
              port: 6124
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            - name: flink-data
              mountPath: /data
           # - name: job-artifacts-volume
           #   mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: flink-data
          persistentVolumeClaim:
            claimName: flinknfspvc
      #  - name: job-artifacts-volume
       #   hostPath:
        #    path: /host/path/to/job/artifacts

同样注意两点:

  1. 从名称flink-config的configmap中取出flink-conf.yaml、log4j.properties、log4j-console.properties配置文件挂载到容器的/opt/flink/conf目录下
  2. 将claim名称为flinknfspvc的持久卷挂载到容器的/data目录下

创建Deployment

kubectl create -f jobmanager-job-deployment.yaml

image-20201031163954003

4.3.2 jobmanager-service.yaml

JobManager的Service定义也相对比较简单,需要暴露出指定的几个端口即可

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  - name: ha
    port: 34560
  - name: query
    port: 34561
  selector:
    app: flink
    component: jobmanager

创建Service

kubectl create -f jobmanager-service.yaml

4.3.3 jobmanager-rest-service.yaml

JobManager REST的Service定义,主要是通过NodPort的方式,将UI端口暴露出来,方便我们通过节点IP+端口号的方式访问Flink 的UI。

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

创建Service

kubectl create -f jobmanager-rest-service.yaml

此时我们就可以通过节点IP:30081访问Flink Web UI

image-20201031164551041

通过UI界面可以看到,我们已经有一个Flink Job在执行了,但是出于RESTARTING状态,这是由于我们的TaskManager还没有启动,Job没有足够的Slot的去执行每一个Task,接下来我们需要去定义TaskManager。

4.4 TaskManager

TaskManager只需要定义一个Deployment资源即可。

4.4.1 taskmanager-job-deployment.yaml

TaskManager的Deployment定义与JobManager几乎相同,只是启动命令有所不同。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-job:1.10.1-example
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;
        sleep 3;
        tail -f $FLINK_HOME/log/*.log"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: flink-data
          mountPath: /data
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: flink-data
        persistentVolumeClaim:
          claimName: flinknfspvc

注意:

这里在Deployment里指定了2个副本,意味着我们将启动两个Pod,因为我们再flink-conf.yaml中指定了默认的并行度为2,所以我们需要两个slort来执行我们的Flink task。

创建Deployment

kubectl create -f taskmanager-job-deployment.yaml

image-20201031165307896

image-20201031170815866

等Taskmanager两个容器启动之后,可以在WebUI上看到TaskManager数量已经变成了2,Job的状态也变为Running状态了

5 高可用验证

通过上面的步骤我们已经能够在k8s上运行一个per-job模式的flink集群,并进行了高可用的配置。这一章节将进行高可用的验证,主要包括TaskManager意外退出、JobManager意外退出、整个集群意外退出的验证。

5.1 开发一个带状态的Flink任务

为了进行更直观的验证,我们需要开发一个带状态的Flink任务,方便观察各个服务意外退出之后Flink能否从checkpoint中进行状态恢复。在项目scala代码中的flink.k8s.cluster.examples包下查看EventCounterJob类

package flink.k8s.cluster.examples

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector
import org.apache.flink.api.scala._

/**
 * 实时计算事件总个数,以及value总和
 *
 * @author shirukai
 */

object EventCounterJob {

  def main(args: Array[String]): Unit = {

    // 获取执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val params: ParameterTool = ParameterTool.fromArgs(args)


    env.getConfig.setGlobalJobParameters(params)

    // 配置checkpoint
    // 做两个checkpoint的间隔为1秒
    env.enableCheckpointing(10000)
    // 表示下 Cancel 时是否需要保留当前的 Checkpoint,默认 Checkpoint 会在整个作业 Cancel 时被删除。Checkpoint 是作业级别的保存点。
    env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)


    // 1. 从socket中接收文本数据
    val streamText: DataStream[String] = env.socketTextStream(params.get("socket-host","0.0.0.0"), 9000)
      .uid("SocketSource")

    // 2. 将文本内容按照空格分割转换为事件样例类
    val events = streamText.map(s => {
      val tokens = s.split(" ")
      Event(tokens(0), tokens(1).toDouble, tokens(2).toLong)
    }).uid("String2CaseClass")
    // 3. 按照事件id分区,然后进行聚合统计
    val counterResult = events.keyBy(_.id)
      .process(new EventCounterProcessFunction)
      .uid("EventCounter")

    // 4. 结果输出到控制台
    counterResult.print().uid("Printer")

    env.execute("EventCounterJob")
  }
}

/**
 * 定义事件样例类
 *
 * @param id    事件类型id
 * @param value 事件值
 * @param time  事件时间
 */
case class Event(id: String, value: Double, time: Long)

/**
 * 定义事件统计器样例类
 *
 * @param id    事件类型id
 * @param sum   事件值总和
 * @param count 事件个数
 */
case class EventCounter(id: String, var sum: Double, var count: Int)

/**
 * 继承KeyedProcessFunction实现事件统计
 */
class EventCounterProcessFunction extends KeyedProcessFunction[String, Event, EventCounter] {
  private var counterState: ValueState[EventCounter] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    // 从flink上下文中获取状态
    counterState = getRuntimeContext.getState(new ValueStateDescriptor[EventCounter]("event-counter", classOf[EventCounter]))
  }

  override def processElement(i: Event,
                              context: KeyedProcessFunction[String, Event, EventCounter]#Context,
                              collector: Collector[EventCounter]): Unit = {

    // 从状态中获取统计器,如果统计器不存在给定一个初始值
    val counter = Option(counterState.value()).getOrElse(EventCounter(i.id, 0.0, 0))

    // 统计聚合
    counter.count += 1
    counter.sum += i.value

    // 发送结果到下游
    collector.collect(counter)

    // 保存状态
    counterState.update(counter)

  }
}

代码逻辑相对比较简单,主要包括下面几个过程:

  1. 使用socketTextStream算子监听指定的Socket端口,用以接收文本数据
  2. 使用map算子将接收的文本数据转换为事件样例类
  3. 将事件按照id进行分区
  4. 分区后的事件使用自定义的KeyedProcess算子进行统计聚合操作
  5. 将结果输出控制台

注意:

在运行flink任务之前,需要使用nc -lk 9000命令监听9000端口,模拟Socket服务端

5.2 高可用验证:TaskManager意外退出

在进行验证之前,确保flink任务已经在集群中正常运行。

1 在nc -lk 9000的命令行控制台输入一条数据:

event-1 1.0 1604280262000

2 查看TaskManager的输出,符合预期:

image-20201102093112281

3 在nc -lk 9000的命令行控制台再输入一条数据:

event-1 2.0 1604280263000

4 刷新TaskManager的标准输出,符合预期:

image-20201102093329211

5 现在将两个TaskManager的Pod删除

(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl get pods                                             
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-5cfcd4b77f-45pl2    1/1     Running   0          43m
flink-taskmanager-5db58d75f5-lvpd4   1/1     Running   0          43m
flink-taskmanager-5db58d75f5-rbtb8   1/1     Running   0          43m
flink-zookeeper-6c698d5cc5-mm7g9     1/1     Running   0          43m
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-taskmanager-5db58d75f5-lvpd4
pod "flink-taskmanager-5db58d75f5-lvpd4" deleted
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-taskmanager-5db58d75f5-rbtb8
pod "flink-taskmanager-5db58d75f5-rbtb8" deleted

删除Pod之后,可以看到当前的Flink任务处于一个RESTARING状态,稍等片刻后会重新变为RUNNING状态

image-20201102100937047

6 在nc -lk 9000的命令行控制台重新输入一条数据:

event-1 3.0 1604280264000

7 刷新TaskManager的标准输出,发现当前的统计结果是在之前的结果基础上继续进行的:

image-20201102101316152

5.3 高可用验证:JobManager意外退出

开始之前,需要停止之前集群,并清理掉NFS中的内容,然后启动集群。

1 在nc -lk 9000的命令行控制台输入一条数据:

event-1 1.0 1604280262000

2 查看TaskManager的输出,符合预期:

image-20201102093112281

3 在nc -lk 9000的命令行控制台再输入一条数据:

event-1 2.0 1604280263000

4 刷新TaskManager的标准输出,符合预期:

image-20201102093329211

5 现在将JobManager的Pod删除

(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl get pods                                     
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-5cfcd4b77f-jqjbr    1/1     Running   0          105s
flink-taskmanager-5db58d75f5-4sp9d   1/1     Running   0          103s
flink-taskmanager-5db58d75f5-dg6d6   1/1     Running   0          103s
flink-zookeeper-6c698d5cc5-qnhk4     1/1     Running   0          105s
(base) shirukai@shirukaideMacBook-Pro nfs_data % kubectl delete pod flink-jobmanager-5cfcd4b77f-jqjbr
pod "flink-jobmanager-5cfcd4b77f-jqjbr" deleted

删除Pod之后,等待JobManager启动,稍等片刻后会Flink任务会重新变为RUNNING状态

image-20201102105234827

6 在nc -lk 9000的命令行控制台重新输入一条数据:

event-1 3.0 1604280264000

7 刷新TaskManager的标准输出,发现当前的统计结果是在之前的结果基础上继续进行的:

image-20201102105146738

5.4 高可用验证:整个集群意外退出

这个比较简单,在上面的基础上进行就可以了,现在将集群停止,并重新启动。

1 在nc -lk 9000的命令行控制台输入一条数据:

event-1 4.0 1604280265000

2 刷新TaskManager的标准输出,符合预期:

image-20201102105543564

0%