
Flink on K8s: Pitfall Notes

DolphinScheduler's scheduled tasks include Flink streaming jobs and flinkx data-sync jobs, but by default only flink on yarn is supported.
This post records the pitfalls of getting flink on k8s to work. The main difficulties are the various compromises forced by flink on k8s not supporting PVCs,
building the image, and Hive compatibility.

Deployment environment

We had been on flink-1.10, but its K8s support was still beta (much like Spark 2.4, which only became truly usable on K8s with 3.0). Flink 1.12 brings Kerberos support, node selectors, application mode, and secrets-based handling of sensitive data.
In addition, the flinkx plugin only supports K8s on flink-1.12, so after some fiddling we upgraded to 1.12.

  • Flink version: flink-1.12-scala-2.12
  • K8s version: 1.15.3

Workflow

Architecture

[Figure: Flink on K8s workflow]
Both application mode and session mode are supported; I use application mode.

Node types

  • jobmanager

    • service: the jobmanager's RPC service
    • deployment: controls the pod replica count
    • pod: actually runs the jobmanager logic
    • ui-ingress: created by the user, routes to the service
  • taskmanager

    • pod: multiple pods started dynamically according to slots and parallelism
  • historyserver: deployed separately
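
Once a job is submitted, these resources are easy to list with label selectors. A minimal sketch, where the namespace and cluster id are placeholders (everything Flink creates is labeled app=<kubernetes.cluster-id>):

# jobmanager deployment, rest/rpc services and all pods of one cluster
kubectl -n FLINK_DEMO get deploy,svc,pod -l app=flink-demo1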

Implementation details

Building the docker image

Because flink native k8s does not support PVCs and can only read local resource files, everything that needs external, dynamic resources is driven by environment variables and done before the entrypoint starts (a sketch follows the attachment list):

  1. set up hosts
  2. download the Flink job jar with the minio client
  3. set the flinkx classpath
  4. set the hadoop classpath

Attachments:
flink-dockerfile
flink-entrypoint.sh
minio_client.py
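
A minimal sketch of such a wrapper entrypoint. The real logic lives in the attached flink-entrypoint.sh and minio_client.py; here the mc CLI stands in for the Python client, and MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, JOB_JAR_PATH and FLINKX_HOME are placeholder names:

#!/bin/sh
set -e
# 1. hosts: append the entries passed in via HOSTS_FILE (see the hosts section below)
[ -n "${HOSTS_FILE}" ] && echo "${HOSTS_FILE}" >> /etc/hosts
# 2. download the job jar from minio before the jobmanager starts
if [ -n "${JOB_JAR_PATH}" ]; then
    mc alias set minio "${MINIO_ENDPOINT}" "${MINIO_ACCESS_KEY}" "${MINIO_SECRET_KEY}"
    mc cp "minio/${JOB_JAR_PATH}" /opt/flink/usrlib/userCode.jar
fi
# 3. make the flinkx plugins visible (path is an assumption)
export FLINKX_PLUGIN_DIR="${FLINKX_HOME:-/opt/flinkx}/syncplugins"
# 4. hadoop classpath for the hdfs/hive connectors
export HADOOP_CLASSPATH=$(hadoop classpath)
# hand over to the stock entrypoint of the official flink image
exec /docker-entrypoint.sh "$@"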

Flink versions after 1.12 can also do the above with a sidecar defined in the podTemplate.
Spark does much better here; it is a real pain that Flink does not even support basic volume mounts.

Configuring the K8s environment

Ask ops to create a namespace and a service account.
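
A sketch of what that roughly amounts to (names are placeholders; binding the edit clusterrole is what the Flink native K8s docs suggest so the jobmanager can create taskmanager pods):

kubectl create namespace FLINK_DEMO
kubectl -n FLINK_DEMO create serviceaccount FLINK_DEMO
kubectl create clusterrolebinding flink-role-binding-FLINK_DEMO \
  --clusterrole=edit --serviceaccount=FLINK_DEMO:FLINK_DEMO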

JobManager HA

How HA works on K8s: leader election is implemented indirectly through etcd, and on recovery the state data is restored from savepoints.

Note that deleting the deployment does not delete the data under high-availability.storageDir; you have to clean it up yourself (see the sketch after the config below).

Add the following to flink-conf.yaml:

high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
#JobManager metadata is persisted in the file system high-availability.storageDir and only a pointer to this state is stored in Kubernetes.
# The storageDir stores all metadata needed to recover a JobManager failure.
high-availability.storageDir: s3:///flink/recovery
# In order to identify the Flink cluster, you have to specify a kubernetes.cluster-id.
kubernetes.cluster-id: cluster1337
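
A hedged cleanup sketch, assuming an mc alias named minio that points at the bucket behind high-availability.storageDir (ConfigMap labels may differ between Flink versions; check with kubectl get configmap --show-labels):

kubectl -n FLINK_DEMO delete deployment/cluster1337
# the HA ConfigMaps carry the cluster id as a label
kubectl -n FLINK_DEMO delete configmap -l app=cluster1337
# leftover metadata under high-availability.storageDir has to go too
mc rm --recursive --force minio/flink/recovery/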

Viewing Flink logs

There are currently three ways to view them:

  1. kubectl logs $clusterId
  2. flink web ui
  3. kibana (fluent-bit ships all pod logs to ES)

Options 1 and 2 only work while the job is running; once it finishes, the logs are gone.
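
For option 1, a concrete form (namespace and cluster id are placeholders) could look like this:

# the jobmanager runs in a deployment named after kubernetes.cluster-id
kubectl -n FLINK_DEMO logs -f deployment/flink-demo1
# taskmanager pods carry the cluster id and component labels
kubectl -n FLINK_DEMO logs -f -l app=flink-demo1,component=taskmanager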

Deploying the history server

Deploy a single flink-historyserver as a deployment, route to it with a service, and add a NodePort for external access.

flink-history-server.yaml
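
Roughly, assuming the attached manifest and a service named flink-historyserver:

kubectl -n FLINK_DEMO apply -f flink-history-server.yaml
# note the assigned NodePort, then open http://<any-node-ip>:<node-port>
kubectl -n FLINK_DEMO get svc flink-historyserver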

Accessing the web UI

  • The official approach is port-forward: kubectl -n spark port-forward podName 4000:4040. This is fairly primitive; every flink run needs its own proxy process, which is fine for testing but not for production.
  • In production, deploy a dedicated ingress for external access instead. Each Flink application automatically creates a service such as FLINK_DEMOx-kafka-hive-parquet-rest; manually create one ingress per Flink application,
    then access it through the ingress controller at http://ip:port/apps/FLINK_DEMOx-kafka-hive-parquet/#/overview
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: FLINK_DEMOx-kafka-hive-parquet
  namespace: FLINK_DEMO
  annotations:
    nginx.ingress.kubernetes.io/ssl-redirect: "false"
    nginx.ingress.kubernetes.io/rewrite-target: /$2
spec:
  rules:
  - http:
      paths:
      - path: /apps/FLINK_DEMOx-kafka-hive-parquet(/|$)(.*)
        backend:
          serviceName: FLINK_DEMOx-kafka-hive-parquet-rest
          servicePort: 8081

This part is nicer than Spark, where you have to create the service yourself.
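
A quick check that the rewrite rule actually reaches the jobmanager REST API (host and port are whatever your ingress controller exposes):

curl http://ip:port/apps/FLINK_DEMOx-kafka-hive-parquet/jobs/overview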

Filesystems

Goal: read both HDFS and minio at the same time.
Conclusion: Flink is not tightly coupled to a specific Hadoop version, and in theory just putting your Hadoop on HADOOP_CLASSPATH should work, but verify with your own test program first and only fall back to compiling from source as a last resort.

The flink-s3-fs-hadoop plugin in flink-1.12.2 depends on hadoop-3.1.0.
Swapping in hadoop-2.6.0 directly breaks the build of HadoopS3AccessHelper;
the following three imports are only satisfied by the hadoop-aws 3.1.0 dependency set.

import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import com.amazonaws.SdkBaseException;

Do we have to compile flink-s3-fs-hadoop ourselves,
or is there some other compatibility route that lets Flink work with hadoop-2.6.0?

Switching to 2.7.5 for testing:
java.lang.NoSuchMethodError: org.apache.hadoop.tracing.TraceUtils.wrapHadoopConf(Ljava/lang/String;Lorg/apache/hadoop/conf/Configuration;)Lorg/apache/htrace/HTraceConfiguration;

Switching to 3.1.0 for testing: the s3.endpoint setting is not recognized. It turns out this thing does not read flink-conf.yaml at all; it picks up the settings from core-site.xml.
HADOOP_CONF_DIR and FLINK_CONF_DIR are useless because they are not on the classpath; only a core-site.xml at the root of test/resources is found.
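
For reference, a sketch of the core-site.xml that does get picked up; fs.s3a.* are the standard hadoop-aws keys, the endpoint and credentials are placeholders:

cat > src/test/resources/core-site.xml <<'EOF'
<configuration>
  <property><name>fs.s3a.endpoint</name><value>http://minio:9000</value></property>
  <property><name>fs.s3a.access.key</name><value>xxx</value></property>
  <property><name>fs.s3a.secret.key</name><value>xxx</value></property>
  <property><name>fs.s3a.path.style.access</name><value>true</value></property>
</configuration>
EOF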

Official s3a WordCount example

./bin/start-cluster.sh
./bin/flink run examples/batch/WordCount.jar -input s3://FLINKDEMO/FLINK/core-site.xml -output s3://FLINKDEMO/__FLINK/wordcount.txt
Everything works fine…
which means the flink-s3-fs-hadoop plugin itself is fine when loaded the intended way, and the problem lies elsewhere.

Add a repository to the flink-filesystems module's pom:

<repositories>
  <repository>
    <id>cloudera</id>
    <name>cloudera repo</name>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
</repositories>

mvn clean install \
-Dfast \
-Dskip.npm \
-DskipTests \
-Drat.skip=true \
-Dscala-2.12 \
-Pinclude-hadoop \
-Dhadoop.version=2.6.5 \
-Phive-1.1.0

mvn clean package \
-DskipTests \
-Pvendor-repos \
-Drat.skip=true \
-Pinclude-hadoop \
-Dhadoop.version=2.6.5

The hadoop we packaged ourselves can read and write HDFS, but the s3a filesystem insists on hadoop-3.1.0; fix one end and the other breaks (--).

Hive and HDFS were sorted out, and then s3 fell over again… hadoop-aws is extremely sensitive to the hadoop version; even a single version bump can stop it from coming up.

According to the official docs, Flink uses SPI plus a custom inverted (child-first) classloader to load filesystems dynamically (org.apache.flink.core.plugin.PluginLoader), so in theory plugin classes should not clash with Flink's own classloader.

My program reads an s3a:// file. When debugging locally there was no plugin manager, so FileSystem went through initializeWithoutPlugins and reported that the s3a filesystem could not be found. I then "cleverly" added flink-s3-fs-hadoop to the pom, which promptly clashed with classes in the user code because the hadoop versions did not match.

The root cause is that the plugin's inverted classloading never kicked in, so initialize the filesystem plugin mechanism manually:

FileSystem.initialize(flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));

The core of PluginLoader:


/**
 * Loads all classes from the plugin jar except for explicitly white-listed packages
 * (org.apache.flink, logging).
 *
 * <p>No class/resource in the system class loader (everything in lib/) can be seen in the
 * plugin except those starting with a whitelist prefix.
 */
private static final class PluginClassLoader extends URLClassLoader {
    private static final ClassLoader PLATFORM_OR_BOOTSTRAP_LOADER;
    private final ClassLoader flinkClassLoader;
    private final String[] allowedFlinkPackages;
    private final String[] allowedResourcePrefixes;

    PluginClassLoader(
            URL[] pluginResourceURLs,
            ClassLoader flinkClassLoader,
            String[] allowedFlinkPackages) {
        super(pluginResourceURLs, PLATFORM_OR_BOOTSTRAP_LOADER);
        this.flinkClassLoader = flinkClassLoader;
        this.allowedFlinkPackages = allowedFlinkPackages;
        allowedResourcePrefixes =
                Arrays.stream(allowedFlinkPackages)
                        .map(packageName -> packageName.replace('.', '/'))
                        .toArray(String[]::new);
    }

    @Override
    protected Class<?> loadClass(final String name, final boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            final Class<?> loadedClass = findLoadedClass(name);
            if (loadedClass != null) {
                return resolveIfNeeded(resolve, loadedClass);
            }

            if (isAllowedFlinkClass(name)) {
                try {
                    return resolveIfNeeded(resolve, flinkClassLoader.loadClass(name));
                } catch (ClassNotFoundException e) {
                    // fallback to resolving it in this classloader
                    // for cases where the plugin uses org.apache.flink namespace
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    private Class<?> resolveIfNeeded(final boolean resolve, final Class<?> loadedClass) {
        if (resolve) {
            resolveClass(loadedClass);
        }
        return loadedClass;
    }

    @Override
    public URL getResource(final String name) {
        if (isAllowedFlinkResource(name)) {
            return flinkClassLoader.getResource(name);
        }
        return super.getResource(name);
    }

    @Override
    public Enumeration<URL> getResources(final String name) throws IOException {
        // ChildFirstClassLoader merges child and parent resources
        if (isAllowedFlinkResource(name)) {
            return flinkClassLoader.getResources(name);
        }

        return super.getResources(name);
    }

    private boolean isAllowedFlinkClass(final String name) {
        return Arrays.stream(allowedFlinkPackages).anyMatch(name::startsWith);
    }

    private boolean isAllowedFlinkResource(final String name) {
        return Arrays.stream(allowedResourcePrefixes).anyMatch(name::startsWith);
    }

    static {
        ClassLoader platformLoader = null;
        try {
            platformLoader =
                    (ClassLoader) ClassLoader.class.getMethod("getPlatformClassLoader").invoke(null);
        } catch (NoSuchMethodException e) {
            // on Java 8 this method does not exist, but using null indicates the bootstrap
            // loader that we want to have
        } catch (Exception e) {
            throw new IllegalStateException("Cannot retrieve platform classloader on Java 9+", e);
        }
        PLATFORM_OR_BOOTSTRAP_LOADER = platformLoader;
        ClassLoader.registerAsParallelCapable();
    }
}

Loading user code

Flink 1.12 cannot mount a directory into the container via PVC, so user code can only be baked into the image ahead of time, which is basically unusable in production: in data processing, user code always arrives as custom jars.

As for packaging, use the shade plugin to build one fat jar with all dependencies, mark the Flink dependencies as provided, and load with classloader.resolve-order: child-first to avoid clashes between user code and Flink's own classes.

With flink-1.12, the entrypoint can use an s3a/minio Python SDK to download the user-specified userCode.jar from minio; the launch script then refers to it as local://userCode.jar.

With flink-1.13 you can define a podTemplate and have an initContainer download the remote userCode to a local path.

Related config options: pipeline.jars, pipeline.classpaths.
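
A sketch of the resulting submit command: the entrypoint has already placed the downloaded jar inside the image, so the client only ever sees a local:// path (the env var name and paths reuse the placeholders from the entrypoint sketch above):

/opt/flink/bin/flink run-application \
  --target kubernetes-application \
  -Dkubernetes.cluster-id=flink-demo1 \
  -Dkubernetes.container.image=bigdata/flink:latest \
  -Dcontainerized.master.env.JOB_JAR_PATH=jars/userCode.jar \
  -Dclassloader.resolve-order=child-first \
  local:///opt/flink/usrlib/userCode.jar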

Kerberos authentication

Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors
Flink's Kerberos settings:

security.kerberos.login.contexts
security.kerberos.login.keytab
security.kerberos.login.principal
security.kerberos.login.use-ticket-cache

I download the current user's keytab from a remote store and authenticate inside the program, rather than going through kinit.

  1. By default /etc/krb5.conf is read instead of a custom krb5.conf, so reload it explicitly:

    public static synchronized void reloadKrb5conf(String krb5confPath) {
        System.setProperty("java.security.krb5.conf", krb5confPath);
        Config.refresh();
        KerberosName.resetDefaultRealm();
    }
  2. After authenticating with a keytab, the returned user was the current Linux system user (i.e. simple auth); force Kerberos in the Hadoop Configuration before logging in:

    Configuration securityConf = new Configuration();
    securityConf.set(FileSystemUtil.KEY_HADOOP_SECURITY_AUTHORIZATION, "true");
    securityConf.set(FileSystemUtil.KEY_HADOOP_SECURITY_AUTHENTICATION, KRB_STR);
    //Force-refreshing the config reinitializes UserGroupInformation's conf
    UserGroupInformation.setConfiguration(securityConf);
    LOG.trace("login user:{} with keytab:{}", principal, keytab);
    return UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);

Hive version compatibility

Since most of the community uses Hive as the warehouse, Flink, like Spark, ships built-in compatibility with the various Hive versions.
Note, however, that the Hive version configured in Flink must match your HiveServer version exactly; connecting to an older hiveserver with a newer JDBC driver will fail.

Pulling in the flink-sql-connector-hive package is enough, but if the server side is CDH the vanilla package may fail; in that case bring in the CDH-specific hive jars yourself.
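
For example, in the Dockerfile or entrypoint (the versions below are only an illustration, Hive 2.3.6 with Flink 1.12.2; for CDH swap in the cdh hive artifacts instead):

wget -P /opt/flink/lib/ \
  https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.12/1.12.2/flink-sql-connector-hive-2.3.6_2.12-1.12.2.jar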

hosts configuration

Add an environment variable HOSTS_FILE when calling flink run-application;
the docker image's entrypoint reads the variable and appends its contents to /etc/hosts (see the sketch below).
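
One way to pass the entries through, assuming HOSTS_FILE carries the literal lines rather than a path (containerized.master.env.* / containerized.taskmanager.env.* are standard Flink options):

flink run-application --target kubernetes-application \
  -Dcontainerized.master.env.HOSTS_FILE="10.0.0.1 namenode1" \
  -Dcontainerized.taskmanager.env.HOSTS_FILE="10.0.0.1 namenode1" \
  ... \
  local:///opt/flink/usrlib/userCode.jar
# and in the entrypoint:
[ -n "${HOSTS_FILE}" ] && echo "${HOSTS_FILE}" >> /etc/hosts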

Examples

Test scripts for Flink in different cluster environments.

Testing WordCount

/opt/flink/bin/flink run-application \
--target kubernetes-application \
-Dkubernetes.cluster-id="flink-demo1" \
-Dkubernetes.container.image="harbor.dc.xxx-it.com/x-bigdata/flink:0.1" \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=dboard \
-Dkubernetes.service-account=dboard \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=1 \
-Djobmanager.memory.flink.size=1gb \
-Dtaskmanager.memory.process.size=1gb \
-Dtaskmanager.numberOfTaskSlots=1 \
local:///opt/flink/examples/batch/WordCount.jar

#!/bin/sh
set -ex
BASEDIR=$(cd `dirname $0`; pwd)
cd $BASEDIR
source /opt/dolphinscheduler/conf/env/dolphinscheduler_env.sh
export KUBECONFIG=/opt/dolphinscheduler/conf/config/kube_config.yaml
export FLINK_HOME=$FLINK112_HOME
export FLINK_CONF_DIR="$FLINK_HOME/conf"
export FLINK_CLASS_PATH="$FLINK_HOME/lib/*"

/usr/local/openjdk-8/bin/java \
-Dlog.file=/opt/cdh/lib/flink-1.12.2/log/flink--client-10.199.150.66.log \
-Dlog4j.configuration=file:/opt/cdh/lib/flink-1.12.2/conf/log4j-cli.properties \
-Dlog4j.configurationFile=file:/opt/cdh/lib/flink-1.12.2/conf/log4j-cli.properties \
-Dkubernetes.config.file=/opt/dolphinscheduler/conf/config/kube_config.yaml \
-Dkubernetes.namespace=FLINK_DEMO \
-Dkubernetes.service-account=FLINK_DEMO \
-Dkubernetes.container.image=bigdata/flink:latest \
-Dkubernetes.cluster-id=flink-5-19-2870-28819-48517 \
-Dkubernetes.flink.conf.dir=/opt/cdh/lib/flink-1.12.2/conf \
-Dkubernetes.flink.log.dir=/opt/cdh/lib/flink-1.12.2/log \
-Djobmanager.memory.process.size=1gb \
-Dtaskmanager.memory.process.size=1gb \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dkubernetes.jobmanager.cpu=1.0 \
-Dparallelism.default=1 \
-Dclassloader.resolve-order=parent-first \
-classpath $FLINK_CLASS_PATH: \
org.apache.flink.client.cli.CliFrontend run-application \
--target kubernetes-application \
-c org.apache.flink.examples.java.wordcount.WordCount \
local:///opt/cdh/lib/flink-1.12.2/examples/batch/WordCount.jar \
--input /opt/cdh/lib/flink-1.12.2/conf/flink-conf.yaml \
--output /tmp/flink-conf.yaml

Embarrassing: I did not distinguish between flink run and flink run-application, and the resulting Caused by: java.lang.IllegalStateException: No ExecutorFactory found to execute the application. cost me a whole afternoon.
run uses DefaultExecutorServiceLoader, run-application uses DefaultClusterClientServiceLoader, and --target must match the command.

./bin/flink list --target kubernetes-application -Dkubernetes.cluster-id=flink-demo1
./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=flink-demo1 $jobId

References