FLINK October 31, 2020

Deploying a Flink Cluster on Kubernetes


Flink version: 1.10.1

Kubernetes version: 1.16.5

Flink on Kubernetes supports two deployment modes: Job cluster and Session cluster. In Job cluster mode we have to bake our own JAR into the Flink image before deploying; in Session mode we start the cluster first and then submit JARs to the running session cluster.

1 Environment Preparation

In the article on YARN-mode deployment we simply downloaded a pre-built package from the official site. Since Job Cluster mode requires us to rebuild the image, in this preparation section we instead pull the Flink source code from GitHub and compile it by hand.

1.1 Pulling the Flink Source Code from GitHub

  1. Open the Flink repository on GitHub: https://github.com/apache/flink

  2. Click [Releases] and locate the package for the version you need. For example, for the 1.10.1 source code, download the release-1.10.1 archive from the Releases page:

    wget https://github.com/apache/flink/archive/release-1.10.1.tar.gz
    

    Tip: the download may fail for network reasons. A workaround is to register a Gitee account at https://gitee.com/, create a new repository there, and use the 'import existing repository' option with Flink's GitHub address; downloading the code from Gitee is then much faster, for example as sketched below.
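
    A sketch of cloning from such a mirror (the Gitee account name your-account is hypothetical):

    git clone https://gitee.com/your-account/flink.git
    cd flink
    git checkout release-1.10.1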

  3. After downloading, extract the archive:

    tar -zxvf flink-release-1.10.1.tar.gz
    

1.2 Compiling

  1. Enter the extracted directory and run the following Maven command to compile:

    mvn clean package -DskipTests -Dfast
    

    The compilation may fail with the following error:

    Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1

    [ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.8.2: Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1 in https://maven.aliyun.com/repository/public was cached in the local repository, resolution will not be reattempted until the update interval of aliyunmaven has elapsed or updates are forced -> [Help 1]
    [ERROR] 
    [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
    [ERROR] Re-run Maven using the -X switch to enable full debug logging.
    [ERROR] 
    [ERROR] For more information about the errors and possible solutions, please read the following articles:
    [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
    [ERROR] 
    [ERROR] After correcting the problems, you can resume the build with the command
    [ERROR]   mvn <goals> -rf :flink-avro-confluent-registry
    

    This simply means kafka-schema-registry-client:jar:3.3.1 could not be found in the Maven repository. I tried both the official repository and the Aliyun mirror without finding the package. The fix, per various references, is to download the JAR manually and install it into the local Maven repository.

    Create a lib directory under the Flink source tree to hold the JAR, then download the matching version from the io.confluent site:

    mkdir lib
    cd lib
    wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar
    

    Then install the JAR into the local repository with Maven:

    mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=kafka-schema-registry-client-3.3.1.jar
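
    As a quick check, the installed artifact should now be present in the local Maven repository (assuming the default location ~/.m2/repository):

    ls ~/.m2/repository/io/confluent/kafka-schema-registry-client/3.3.1/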
    

    Retry the compilation.


  2. The compiled distribution is under the build-target directory.
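
    As a quick sanity check, it should look like a regular Flink distribution, with bin, conf, lib and examples subdirectories:

    ls build-target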

1.3 Kubernetes Environment

We also need a Kubernetes environment. One option is to install Kubernetes locally via Docker Desktop (https://www.docker.com/products/docker-desktop). The setup is not covered in detail here; see https://blog.csdn.net/shirukai/article/details/103512497 for building a local k8s environment.
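
Before continuing, it is worth confirming that kubectl can reach the cluster, for example:

kubectl cluster-info
kubectl get nodes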

2 Flink Session Cluster on Kubernetes

A Flink session cluster runs as a long-lived Kubernetes Deployment. A single session cluster can serve multiple Flink jobs: the cluster is deployed first, and jobs are then submitted to it.

A basic Flink session cluster consists of the following Kubernetes resources:

Flink Conf ConfigMap

  • Stores configuration such as flink-conf.yaml and log4j-console.properties
  • The Flink JM and TM Deployments pick up this configuration automatically at startup

JobManager Service

  • Exposes the JobManager through a Service name and port so that TaskManagers can connect to it

JobManager Deployment

  • Defines the JobManager Pod replica count, image version, etc., keeping at least one replica running

TaskManager Deployment

  • Defines the TaskManager Pod replica count, image version, etc., keeping at least one replica running.

2.1 Session Cluster Resource Definitions

The definitions below follow the official documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions

2.1.1 Flink Conf ConfigMap

Holds Flink's configuration files, such as flink-conf.yaml and log4j.properties.

  1. Create a file named flink-configuration-configmap.yaml with the following content:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 1
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        jobmanager.heap.size: 1024m
        taskmanager.memory.process.size: 1024m
      log4j.properties: |+
        log4j.rootLogger=INFO, file
        log4j.logger.akka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.org.apache.hadoop=INFO
        log4j.logger.org.apache.zookeeper=INFO
        log4j.appender.file=org.apache.log4j.FileAppender
        log4j.appender.file.file=${log.file}
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
    
  2. Create the Flink Conf ConfigMap:

    kubectl create -f flink-configuration-configmap.yaml 
    
  3. List the created ConfigMaps:

    kubectl get configmap
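
    To inspect the rendered configuration itself, you can also dump the ConfigMap contents:

    kubectl describe configmap flink-config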
    

2.1.2 JobManager Service

  1. Edit the jobmanager-service.yaml file:

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob
        port: 6124
      - name: ui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    
  2. Create the Flink jobmanager-service:

    kubectl create -f jobmanager-service.yaml
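
    To confirm the Service and its ports were created as expected:

    kubectl get svc flink-jobmanager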
    

2.1.3 JobManager Deployment

  1. Edit jobmanager-deployment.yaml:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.10.1
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
              while :;
              do
                if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*jobmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob
            - containerPort: 8081
              name: ui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
    
  2. Create the Flink jobmanager-deployment:

    kubectl create -f jobmanager-deployment.yaml
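
    The JobManager pod should reach the Running state shortly; a quick way to check the pod and its startup log, using the labels defined above:

    kubectl get pods -l app=flink,component=jobmanager
    kubectl logs -l app=flink,component=jobmanager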
    

2.1.4 TaskManager Deployment

  1. Edit taskmanager-deployment.yaml:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.10.1
            workingDir: /opt/flink
            command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; \
              while :;
              do
                if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
                  then tail -f -n +1 log/*taskmanager*.log;
                fi;
              done"]
            ports:
            - containerPort: 6122
              name: rpc
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
    
  2. Create the Flink taskmanager-deployment:

    kubectl create -f taskmanager-deployment.yaml
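
    Both TaskManager replicas should come up and register with the JobManager; with taskmanager.numberOfTaskSlots: 1 from the ConfigMap above, the web UI should then report two task slots in total:

    kubectl get pods -l app=flink,component=taskmanager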
    

2.1.5 Accessing the Web UI

There are several ways to reach the JobManager web UI.

2.1.5.1 kubectl proxy

  1. Run kubectl proxy on the command line.

  2. Then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in a browser.
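
The same proxy also works from the command line; for example, Flink's REST /overview endpoint should return a short JSON cluster summary:

curl http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy/overview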

2.1.5.2 kubectl port-forward

  1. Run kubectl port-forward ${flink-jobmanager-pod} 8081:8081 to forward the JobManager's web UI port to local 8081.
  2. Navigate to http://localhost:8081 in your browser.
  3. Moreover, you can use the following command to submit jobs to the cluster:
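
    For example, from the root of a local Flink distribution (the WordCount example ships with Flink):

    ./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar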

List all pods:

(base) shirukai@shirukaideMacBook-Pro session % kubectl get pods
NAME                                READY   STATUS    RESTARTS   AGE
flink-jobmanager-6b67975bc6-rshdt   1/1     Running   0          23m
flink-taskmanager-9dbb6dbc4-lhdhr   1/1     Running   0          4m58s
flink-taskmanager-9dbb6dbc4-mx7rn   1/1     Running   0          4m58s

Inspect the JobManager pod:

kubectl describe pod flink-jobmanager-6b67975bc6-rshdt

Forward the port:

kubectl port-forward flink-jobmanager-6b67975bc6-rshdt 8081:8081

2.1.5.3 NodePort

  1. Edit jobmanager-rest-service.yaml:

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-rest
    spec:
      type: NodePort
      ports:
      - name: rest
        port: 8081 # port the ClusterIP listens on
        targetPort: 8081 # port the Pod listens on
        nodePort: 30081 # port exposed on each k8s node
      selector:
        app: flink
        component: jobmanager
    
  2. Create the jobmanager-rest-service:

    kubectl create -f jobmanager-rest-service.yaml
    
  3. Check the exposed port with kubectl get svc:

    kubectl get svc
    


    Open localhost:30081 in a browser.
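
    The REST API answers on the same NodePort, which gives a scriptable check (on Docker Desktop the node resolves to localhost; on a real cluster use a node's IP):

    curl http://localhost:30081/overview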

2.1.5.4 Submitting a Job to the Session Cluster

If you have a Flink client locally, go to its bin directory and submit a job directly:

 ./flink run -m localhost:30081  ../examples/streaming/WordCount.jar 
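
After submission, the CLI can confirm the job against the same endpoint, for example:

 ./flink list -m localhost:30081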


2.1.5.5 Stopping the Cluster

kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f flink-configuration-configmap.yaml

2.2 Flink Job Cluster on Kubernetes

Per-Job mode requires the JAR to be inside the container in advance. There are two ways to arrange this: rebuild the image with the user JAR baked in, or mount the storage holding the user's JARs into the container at /opt/flink/usrlib.

2.2.1 Rebuilding the Image

There are likewise two ways to rebuild the image: build it from the locally compiled source, or base it on the official image and add our JAR.

2.2.1.1 Rebuilding from the Source Build

After the source compilation finishes, enter the flink/flink-container/docker directory and build the image with:

sh build.sh --from-local-dist --image-name flink-job:1.10.1-source --job-artifacts ../../build-target/examples/streaming/WordCount.jar
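
If the build succeeds, the new image should be visible locally:

docker images | grep flink-job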

2.2.1.2 Rebuilding from the Official Image

Write a Dockerfile:

FROM flink:1.10.1
ADD *.jar /opt/flink/usrlib/

Build the image:

docker build -t flink-job:1.10.1-image .
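
As a sanity check, you can list the JARs baked into the image (this assumes the image entrypoint passes unknown commands through to exec, as the official flink image does):

docker run --rm flink-job:1.10.1-image ls /opt/flink/usrlib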

2.2.1.3 Defining the k8s Resources

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    log4j.rootLogger=INFO, console

    # Uncomment this if you want to _only_ change Flink's logging
    #log4j.logger.org.apache.flink=INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO

    # Log all infos to the console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console

jobmanager-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

jobmanager-rest-service.yaml

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager

jobmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager

    spec:
      # restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: flink-job:1.10.1-image
          env:
          args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
           # - name: job-artifacts-volume
           #   mountPath: /opt/flink/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j.properties
                path: log4j.properties
              - key: log4j-console.properties
                path: log4j-console.properties
      #  - name: job-artifacts-volume
       #   hostPath:
        #    path: /host/path/to/job/artifacts

taskmanager-job-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-job:1.10.1-image
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        #- name: job-artifacts-volume
        #  mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
#      - name: job-artifacts-volume
#        hostPath:
#          path: /host/path/to/job/artifacts

2.2.1.4 Deploying the Per-Job Cluster

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-rest-service.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-job-deployment.yaml
kubectl create -f taskmanager-job-deployment.yaml
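
Once all pods are Running, the job starts automatically. One way to verify, assuming the NodePort service above (localhost works on Docker Desktop):

kubectl get pods
curl http://localhost:30081/jobs/overview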

2.2.1.5 Stopping the Cluster

kubectl delete -f flink-configuration-configmap.yaml
kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-job-deployment.yaml
kubectl delete -f taskmanager-job-deployment.yaml

Optionally, clean up exited Docker containers left over from experimenting:

docker rm $(docker ps -a | grep Exit | awk '{print $1}')
