
How to Make DolphinScheduler's Flink Tasks Support K8s Deployment

DolphinScheduler's Flink task supports YARN by default.
This article outlines an approach for making FlinkTask support deployment on K8s.

Flink tasks and Spark tasks are supported in much the same way.

Environment

  • DolphinScheduler version: 1.3.6
  • Flink version: 1.12.2


apiserver

To submit jobs to K8s, the following basic configuration items need to be added:

  • kubeconfig: contains the K8s token; required by the k8s-client for initialization
  • master: the address of the K8s apiserver
  • image: the Harbor address of the pre-built image
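
For reference, a minimal kubeconfig of the kind the k8s-client is initialized from might look like the sketch below. All names, addresses, and the token are placeholders, not values from this setup:

```yaml
apiVersion: v1
kind: Config
clusters:
- cluster:
    server: https://ip:port          # the "master" (K8s apiserver) address
    insecure-skip-tls-verify: true
  name: k8s
contexts:
- context:
    cluster: k8s
    user: flink-user
  name: flink-context
current-context: flink-context
users:
- name: flink-user
  user:
    token: <service-account-token>   # the K8s token the client authenticates with
```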

In addition, FlinkParameters needs two new fields:

  • deployMode: kubernetesApplication
  • flinkVersion: 1.12.2
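
With those fields added, a task definition's parameters might look roughly like this. Only deployMode and flinkVersion are the additions discussed here; the other field names follow DolphinScheduler 1.3.x conventions but are illustrative:

```json
{
  "mainClass": "org.apache.flink.examples.java.wordcount.WordCount",
  "mainJar": { "res": "WordCount-1.12.2.jar" },
  "deployMode": "kubernetesApplication",
  "flinkVersion": "1.12.2",
  "jobManagerMemory": "1G",
  "taskManagerMemory": "1G",
  "slot": 1,
  "parallelism": 1
}
```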

worker

Since I use Flink's native Kubernetes integration rather than an operator-based approach (the Flink counterpart of Google's spark-on-k8s-operator), the worker's main job is to assemble the `flink run-application` command for the FlinkTask.
See the XMind mind map for the detailed configuration items.
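
To illustrate what the worker does, here is a minimal sketch of the command assembly. The class and method names are my own, not DolphinScheduler's; the real FlinkTask builds the argument list from FlinkParameters in a similar spirit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: assemble a "flink run-application" command line
// the way a worker-side FlinkTask might for Kubernetes application mode.
public class FlinkK8sCommandSketch {

    public static List<String> buildCommand(String kubeConfigPath,
                                            String image,
                                            String namespace,
                                            String clusterId,
                                            String mainClass,
                                            String mainJar) {
        List<String> args = new ArrayList<>();
        args.add("flink");
        args.add("run-application");
        args.add("--target");
        args.add("kubernetes-application");
        // K8s-specific settings come from the new apiserver-level config
        args.add("-Dkubernetes.config.file=" + kubeConfigPath);
        args.add("-Dkubernetes.container.image=" + image);
        args.add("-Dkubernetes.namespace=" + namespace);
        args.add("-Dkubernetes.cluster-id=" + clusterId);
        // user program: main class plus the jar baked into the image
        args.add("-c");
        args.add(mainClass);
        args.add("local://" + mainJar);
        return args;
    }

    public static void main(String[] unused) {
        System.out.println(String.join(" \\\n", buildCommand(
                "/opt/flink/config/kube_config.yaml", "flink:latest",
                "flink", "flink-0-0-0-0-0",
                "org.apache.flink.examples.java.wordcount.WordCount",
                "/tmp/WordCount-1.12.2.jar")));
    }
}
```

In the real worker the remaining `-D` options (checkpointing, state backend, S3 credentials, and so on) would be appended from the loaded configuration in the same way.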

In addition, a spark_on_k8s.json configuration file can be added to define the environment parameters:

{
  "k8sIngressHttpUrl": "http://ip:port",
  "k8sNameSpace": "flink",
  "flinkContainerImage": "flink:latest",
  "flinkAppServiceAccountName": "flink",
  "flinkVersion": "1.12"
}

Basic Flink configuration, flink-conf.yaml:

# k8s
kubernetes.rest-service.exposed.type: ClusterIP
kubernetes.container.image.pull-policy: Always
kubernetes.jobmanager.cpu: 1.0
security.module.factory.classes: ;
security.context.factory.classes: ;

# rpc
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 1gb
taskmanager.memory.process.size: 1gb
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
jobmanager.execution.failover-strategy: region
akka.ask.timeout: 120 s
web.timeout: 120000

# history server
jobmanager.archive.fs.dir: s3a://flink/__FLINK__/historylog
historyserver.archive.fs.dir: s3a://flink/__FLINK__/historylog
historyserver.web.address: 10.199.150.161
historyserver.web.port: 30067
historyserver.archive.fs.refresh-interval: 1000

# checkpoint
execution.checkpointing.timeout: 10 min
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.unaligned: false
execution.checkpointing.externalized-checkpoint-retention: DELETE_ON_CANCELLATION

# state
state.backend: filesystem
state.backend.incremental: true
state.backend.fs.checkpointdir: s3a://flink/__FLINK__/checkpoints/backend
state.checkpoints.dir: s3a://flink/__FLINK__/checkpoints/metadata
state.savepoints.dir: s3a://flink/__FLINK__/savepoints
state.checkpoints.num-retained: 10

# filesystem-s3a
s3.access-key: ak
s3.secret-key: sk
s3.endpoint: http://ip:port
s3.path.style.access: true

master

When the worker that is executing a FlinkTask goes down, the master performs failover for that worker and kills all of its running tasks.
The master therefore needs to load the same configuration as the worker, so that it can clean up the Flink cluster on K8s during failover.
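
With Flink's native integration, tearing down the cluster amounts to deleting the JobManager Deployment named after kubernetes.cluster-id; its owner references take the TaskManager pods and ConfigMaps down with it. A hypothetical sketch of the cleanup command the master could issue (class and method names are mine, not DolphinScheduler's):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the cleanup a master-side failover handler could run
// for a Flink native-K8s cluster. Deleting the JobManager Deployment
// (named after kubernetes.cluster-id) tears down the whole cluster.
public class FlinkK8sCleanupSketch {

    public static List<String> buildCleanupCommand(String kubeConfigPath,
                                                   String namespace,
                                                   String clusterId) {
        return Arrays.asList(
                "kubectl", "--kubeconfig", kubeConfigPath,
                "-n", namespace,
                "delete", "deployment", clusterId);
    }

    public static void main(String[] unused) {
        List<String> cmd = buildCleanupCommand(
                "/opt/flink/config/kube_config.yaml", "flink", "flink-0-0-0-0-0");
        System.out.println(String.join(" ", cmd));
        // In real code this would be handed to a ProcessBuilder, e.g.:
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```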

Example

flink run-application \
--target kubernetes-application \
-Dkubernetes.config.file="/opt/flink/config/kube_config.yaml" \
-Dkubernetes.container.image="flink:latest" \
-Dkubernetes.namespace="flink" \
-Dkubernetes.service-account="flink" \
-Dkubernetes.rest-service.exposed.type="ClusterIP" \
-Dkubernetes.container.image.pull-policy="Always" \
-Dkubernetes.jobmanager.cpu="1.0" \
-Dkubernetes.cluster-id="flink-0-0-0-0-0" \
-Dkubernetes.flink.conf.dir="${FLINK_HOME}/conf" \
-Dkubernetes.flink.log.dir="${FLINK_HOME}/log" \
-Dsecurity.module.factory.classes=";" \
-Dsecurity.context.factory.classes=";" \
-Djobmanager.rpc.address="localhost" \
-Djobmanager.rpc.port="6123" \
-Djobmanager.memory.process.size="1gb" \
-Dtaskmanager.memory.process.size="1gb" \
-Dtaskmanager.numberOfTaskSlots="1" \
-Dparallelism.default="1" \
-Djobmanager.execution.failover-strategy="region" \
-Dakka.ask.timeout="120 s" \
-Dweb.timeout="120000" \
-Djobmanager.archive.fs.dir="s3a://flink/__FLINK__/historylog" \
-Dhistoryserver.archive.fs.dir="s3a://flink/__FLINK__/historylog" \
-Dhistoryserver.web.address="10.199.150.161" \
-Dhistoryserver.web.port="30067" \
-Dhistoryserver.archive.fs.refresh-interval="1000" \
-Dexecution.checkpointing.timeout="10 min" \
-Dexecution.checkpointing.max-concurrent-checkpoints="1" \
-Dexecution.checkpointing.mode="EXACTLY_ONCE" \
-Dexecution.checkpointing.unaligned="false" \
-Dexecution.checkpointing.externalized-checkpoint-retention="DELETE_ON_CANCELLATION" \
-Dstate.backend="filesystem" \
-Dstate.backend.incremental="true" \
-Dstate.backend.fs.checkpointdir="s3a://flink/__FLINK__/checkpoints/backend" \
-Dstate.checkpoints.dir="s3a://flink/__FLINK__/checkpoints/metadata" \
-Dstate.savepoints.dir="s3a://flink/__FLINK__/savepoints" \
-Dstate.checkpoints.num-retained="10" \
-Ds3.access-key="ak" \
-Ds3.secret-key="sk" \
-Ds3.endpoint="http://ip:port" \
-Ds3.path.style.access="true" \
-Djobmanager.memory.process.size="4G" \
-Dtaskmanager.memory.process.size="2G" \
-Dtaskmanager.numberOfTaskSlots="2" \
-Dcontainerized.master.env.KUBERNETES_HOST_ALIASES="127.0.0.1 localhost" \
-Dcontainerized.master.env.KUBERNETES_S3_ACCESS_KEY="ak" \
-Dcontainerized.master.env.KUBERNETES_S3_SECRET_KEY="sk" \
-Dcontainerized.master.env.KUBERNETES_S3_ENDPOINT="xxx:xxx" \
-Dcontainerized.master.env.KUBERNETES_S3_BUCKET="xxx" \
-Dcontainerized.master.env.KUBERNETES_REMOTE_MAIN_JAR="/flink/WordCount-1.12.2.jar" \
-Dcontainerized.master.env.KUBERNETES_LOCAL_MAIN_JAR="/tmp/WordCount-1.12.2.jar" \
-Dexecution.checkpointing.interval="1 min" \
-d \
-Dkubernetes.container.image.pull-policy=Always \
-c \
org.apache.flink.examples.java.wordcount.WordCount \
local:///tmp/WordCount-1.12.2.jar \
--input /opt/cdh/lib/flink-1.12.2/conf/flink-conf.yaml --output /tmp/flink-conf.yaml
