Flink version: 1.10.1
Kubernetes: 1.16.5
Flink can be deployed on Kubernetes in two modes: Job cluster and Session cluster. In Job cluster mode our own JAR has to be built into the Flink image before deployment; in Session mode we start the cluster first and then submit JARs to the running session cluster.
1 Environment Preparation
In the article on deploying in YARN mode we simply downloaded the pre-built binaries from the official site. Since Job cluster mode requires rebuilding the image, in this preparation step we instead pull the Flink source from GitHub and compile it ourselves.
1.1 Pull the Flink Source from GitHub
Open the Flink repository on GitHub: https://github.com/apache/flink
Click [Releases] and locate the version you need; for example, for the 1.10.1 source, download the release-1.10.1 archive from the Releases page:
wget https://github.com/apache/flink/archive/release-1.10.1.tar.gz
Tip: the download from GitHub may fail or be very slow for network reasons. You can register an account on Gitee (https://gitee.com/), create a new repository there, choose the "import an existing repository" option and paste in Flink's GitHub address; downloading the code from Gitee is then much faster.
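As a small sketch of that workflow, if you prefer cloning with git instead of downloading the tarball (the repository path below is just a placeholder for whatever you named your Gitee mirror):
# clone the mirrored repository and check out the 1.10.1 release tag
git clone --branch release-1.10.1 https://gitee.com/<your-account>/flink.git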
If you went the tarball route, unpack the archive after downloading:
tar -zxvf flink-release-1.10.1.tar.gz
1.2 Build
Enter the unpacked directory and run the following Maven command:
mvn clean package -DskipTests -Dfast
You may run into the following error during the build:
Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1
[ERROR] Failed to execute goal on project flink-avro-confluent-registry: Could not resolve dependencies for project org.apache.flink:flink-avro-confluent-registry:jar:1.8.2: Failure to find io.confluent:kafka-schema-registry-client:jar:3.3.1 in https://maven.aliyun.com/repository/public was cached in the local repository, resolution will not be reattempted until the update interval of aliyunmaven has elapsed or updates are forced -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/DependencyResolutionException
[ERROR]
[ERROR] After correcting the problems, you can resume the build with the command
[ERROR]   mvn <goals> -rf :flink-avro-confluent-registry
This simply means that kafka-schema-registry-client:jar:3.3.1 cannot be found in the Maven repository; I tried both the central repository and the Aliyun mirror and neither had the artifact. The usual workaround is to download the JAR manually and install it into the local Maven repository.
Create a lib directory under the Flink source tree to hold the downloaded JAR, then fetch the matching version from io.confluent:
mkdir lib
cd lib
wget http://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/3.3.1/kafka-schema-registry-client-3.3.1.jar
Then install the JAR into the local repository with Maven:
mvn install:install-file -DgroupId=io.confluent -DartifactId=kafka-schema-registry-client -Dversion=3.3.1 -Dpackaging=jar -Dfile=kafka-schema-registry-client-3.3.1.jar
Retry the build.
After a successful build, the build-target directory contains the assembled Flink distribution.
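For orientation, listing that directory should show the familiar Flink distribution layout (roughly bin, conf, examples, lib, opt and plugins; the exact contents can vary with version and build options):
ls build-target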
1.3 Kubernetes Environment
We also need a Kubernetes cluster. The simplest option is to enable Kubernetes in Docker Desktop (https://www.docker.com/products/docker-desktop) on your own machine; this is not covered in detail here, see https://blog.csdn.net/shirukai/article/details/103512497 for how to set up a local k8s environment.
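Before going further it is worth confirming that kubectl can actually reach the cluster:
kubectl cluster-info
kubectl get nodes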
2 Flink session cluster on Kubernetes
A Flink session cluster runs as a long-lived Kubernetes Deployment. A single session cluster can run multiple Flink jobs; once the cluster is deployed, jobs are submitted to it.
A basic Flink session cluster consists of the following Kubernetes resources:
Flink Conf ConfigMap
- stores configuration such as flink-conf.yaml and log4j-console.properties
- the JobManager and TaskManager Deployments pick this configuration up automatically at startup
JobManager Service
- exposes the JobManager through a service name and port so that TaskManagers can connect to it
JobManager Deployment
- defines the replica count, image version, etc. of the JobManager Pod and ensures at least one replica is running
TaskManager Deployment
- defines the replica count, image version, etc. of the TaskManager Pods and ensures at least one replica is running
2.1 Deploying a Flink Session Cluster on K8S
2.1.1 Flink ConfigMap
The ConfigMap holds Flink's configuration files, such as flink-conf.yaml and log4j.properties.
Create a file named flink-configuration-configmap.yaml with the following content:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
Create the Flink Conf ConfigMap:
kubectl create -f flink-configuration-configmap.yaml
List the created ConfigMaps:
kubectl get configmap
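To check that the keys were stored as expected, the ConfigMap can also be inspected in detail:
kubectl describe configmap flink-config
# or dump the full object as YAML
kubectl get configmap flink-config -o yaml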
2.1.2 JobManager Service
Edit a file named jobmanager-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob
    port: 6124
  - name: ui
    port: 8081
  selector:
    app: flink
    component: jobmanager
Create the Flink jobmanager-service:
kubectl create -f jobmanager-service.yaml
2.1.3 JobManager Deployment
Edit jobmanager-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: flink:1.10.1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start; while :; do if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]]; then tail -f -n +1 log/*jobmanager*.log; fi; done"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob
        - containerPort: 8081
          name: ui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Create the Flink jobmanager-deployment:
kubectl create -f jobmanager-deployment.yaml
2.1.4 TaskManager Deployment
Edit taskmanager-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.1
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start; while :; do if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]]; then tail -f -n +1 log/*taskmanager*.log; fi; done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
Create the Flink taskmanager-deployment:
kubectl create -f taskmanager-deployment.yaml
2.1.5 Accessing the Flink UI in Three Ways
2.1.5.1 kubectl proxy
Run on the command line:
kubectl proxy
Then open http://localhost:8001/api/v1/namespaces/default/services/flink-jobmanager:ui/proxy in your browser.
2.1.5.2 kubectl port-forward
- Run
kubectl port-forward ${flink-jobmanager-pod} 8081:8081
to forward the JobManager's web UI port to local 8081.
- Navigate to http://localhost:8081 in your browser. Jobs can also be submitted to the cluster through this forwarded port (see 2.1.5.4).
List all pods to find the JobManager pod name:
(base) shirukai@shirukaideMacBook-Pro session % kubectl get pods
NAME READY STATUS RESTARTS AGE
flink-jobmanager-6b67975bc6-rshdt 1/1 Running 0 23m
flink-taskmanager-9dbb6dbc4-lhdhr 1/1 Running 0 4m58s
flink-taskmanager-9dbb6dbc4-mx7rn 1/1 Running 0 4m58s
Inspect the JobManager pod:
kubectl describe pod flink-jobmanager-6b67975bc6-rshdt
Forward the port:
kubectl port-forward flink-jobmanager-6b67975bc6-rshdt 8081:8081
2.1.5.3 NodePort
Edit jobmanager-rest-service.yaml:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081        # port the Service listens on (ClusterIP)
    targetPort: 8081  # port the Pod listens on
    nodePort: 30081   # port exposed on every k8s node
  selector:
    app: flink
    component: jobmanager
Create the jobmanager-rest-service:
kubectl create -f jobmanager-rest-service.yaml
Check the externally exposed port with kubectl get svc:
kubectl get svc
Then open localhost:30081 in your browser.
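If Kubernetes is not running on the local machine, replace localhost with the address of any cluster node, which can be looked up with:
kubectl get nodes -o wide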
2.1.5.4 Submitting a Job to the Session Cluster
If you have a Flink client locally, go to its bin directory and submit a job directly:
./flink run -m localhost:30081 ../examples/streaming/WordCount.jar
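If there is no Flink client on the local machine, one alternative sketch is to use the client that ships inside the JobManager container (the pod name below is the one from the earlier kubectl get pods output; substitute your own):
kubectl exec -it flink-jobmanager-6b67975bc6-rshdt -- \
  bin/flink run -m flink-jobmanager:8081 examples/streaming/WordCount.jar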
2.1.5.5 Stopping the Cluster
kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-deployment.yaml
kubectl delete -f taskmanager-deployment.yaml
kubectl delete -f flink-configuration-configmap.yaml
2.2 Flink Per-Job on Kubernetes
In Per-Job mode the user JAR has to be available inside the container in advance. There are two ways to achieve this: rebuild the image with the user JAR baked in, or mount the storage that holds the user JARs into the container at /opt/flink/usrlib via a volume (a sketch of the volume-mount variant follows below).
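A minimal sketch of that volume-mount variant, assuming purely for illustration that the JARs sit on the node under /data/flink/artifacts (the full manifests later in this section keep the corresponding lines commented out):
    spec:
      containers:
      - name: jobmanager
        volumeMounts:
        - name: job-artifacts-volume
          mountPath: /opt/flink/usrlib
      volumes:
      - name: job-artifacts-volume
        hostPath:
          path: /data/flink/artifacts  # hypothetical host path holding the user JARs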
2.2.1 Rebuilding the Image
There are also two ways to rebuild the image: build it from the locally compiled distribution, or take the official image as the base and add our JAR to it.
2.2.1.1 Rebuilding from the Source Build
After the source build has finished, go to flink/flink-container/docker and run the following command to build the image:
sh build.sh --from-local-dist --image-name flink-job:1.10.1-source --job-artifacts ../../build-target/examples/streaming/WordCount.jar
2.2.1.2 Rebuilding Based on the Official Image
Write a Dockerfile:
FROM flink:1.10.1
ADD *.jar /opt/flink/usrlib/
Build the image:
docker build -t flink-job:1.10.1-image .
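To double-check that the JAR actually ended up in the image, something along these lines can be run (a quick sketch; the listing simply shows whatever JARs were in the Docker build context):
docker run --rm --entrypoint ls flink-job:1.10.1-image -l /opt/flink/usrlib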
2.2.1.3 Defining the K8S Resources
flink-configuration-configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 1
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.heap.size: 1024m
    taskmanager.memory.process.size: 1024m
  log4j.properties: |+
    log4j.rootLogger=INFO, file
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    log4j.appender.file=org.apache.log4j.FileAppender
    log4j.appender.file.file=${log.file}
    log4j.appender.file.layout=org.apache.log4j.PatternLayout
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, file
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    log4j.rootLogger=INFO, console
    # Uncomment this if you want to _only_ change Flink's logging
    #log4j.logger.org.apache.flink=INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    log4j.logger.akka=INFO
    log4j.logger.org.apache.kafka=INFO
    log4j.logger.org.apache.hadoop=INFO
    log4j.logger.org.apache.zookeeper=INFO
    # Log all infos to the console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console
jobmanager-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
jobmanager-rest-service.yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
jobmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    # -----
    spec:
      # restartPolicy: OnFailure
      containers:
      - name: jobmanager
        image: flink-job:1.10.1-image
        env:
        args: ["standalone-job", "--job-classname", "org.apache.flink.streaming.examples.wordcount.WordCount"] # optional arguments: ["--job-id", "<job id>", "--fromSavepoint", "/path/to/savepoint", "--allowNonRestoredState"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        # - name: job-artifacts-volume
        #   mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties
          - key: log4j-console.properties
            path: log4j-console.properties
      # - name: job-artifacts-volume
      #   hostPath:
      #     path: /host/path/to/job/artifacts
taskmanager-job-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink-job:1.10.1-image
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        # - name: job-artifacts-volume
        #   mountPath: /opt/flink/usrlib
        securityContext:
          runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      # - name: job-artifacts-volume
      #   hostPath:
      #     path: /host/path/to/job/artifacts
2.2.1.4 Deploying the Per-Job Cluster
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-rest-service.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-job-deployment.yaml
kubectl create -f taskmanager-job-deployment.yaml
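Once everything is created, the WordCount job starts automatically inside the JobManager container. A quick way to check on it (assuming the default namespace) is to watch the Flink pods and tail the JobManager log:
kubectl get pods -l app=flink
kubectl logs -f deployment/flink-jobmanager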
2.2.1.5 Stopping the Cluster
kubectl delete -f flink-configuration-configmap.yaml
kubectl delete -f jobmanager-rest-service.yaml
kubectl delete -f jobmanager-service.yaml
kubectl delete -f jobmanager-job-deployment.yaml
kubectl delete -f taskmanager-job-deployment.yaml
To additionally clean up exited containers left behind on the node, you can run:
docker rm $(docker ps -a | grep Exit | awk '{print $1}')