dolphinscheduler的调度任务有flink流任务,flinkx数据同步,这两类任务目前是批任务的思路,阻塞等待任务结束,缺少流任务的异步监控,需要做一些非阻塞的改造。
存在问题
flink流任务在ds中调度存在2个问题
- flink task提交后,worker会一直在轮询yarn的api查状态等待结束,但是flink流任务是没有终点的,所以这儿很浪费资源。
- 如果出现worker容错,会kill掉flink任务,调度问题导致任务被中止,这个是不合理的。
修改思路
- 流任务提交后,将流程/任务状态设置为运行中,同时新增一张流任务监控表stream_task,
- 单独一个进程或者定时任务来轮询stream_task,查询yarn/k8s的任务状态,更新流程/任务状态,发现异常时触发告警,人工干预处理;
- 任务kill接口调用时,判断是不是流任务,如果是流任务,修改状态为ready_stop,由监控线程异步执行yarn/k8s的kill逻辑
flink调度任务新增类型参数:离线/实时
- 如果是离线任务:必须查询yarn/k8s任务状态轮询阻塞,dag调度时才能触发后续任务。
- 如果是实时任务:
- 任务提交成功后(ret_code=0),任务状态为success,同时新增一张流任务监控表stream_task(状态=待查询)记录,状态为SUBMITTED_SUCCESS
- scheduler-alert启动单独启动一个线程,轮询stream_task,用于处理任务异常或手动停止;
流任务=1个流程套1个流task
任务停止
- flink on k8s任务:执行
flink deploy delete $appId
和flink-ui ingress delete $appId
- flink on yarn:启动shell进程执行
yarn kill -application $appId
,这儿得根据流程实例获取当前租户信息,然后获取kerberos认证文件.
任务状态监控
查询任务在yarn/k8s的任务状态,并更新数据库的流程实例表和任务实例表的状态(运行中/结束),结束时触发sentry告警(tag:stream);
- 待查询:60s轮询一次,查询stream_task表的SUBMITTED_SUCCESS,修改为RUNNING,为了界面显示状态一致,相关的process_instance和task_instance表的状态也改为RUNNING,
- 运行中:5s轮询一次;查询stream_task表的READY_STOP, RUNNING_EXECUTION
- READY_STOP:执行kill操作,如果是yarn新建shell进程kill,如果是k8s,调用api执行delete deploy,最后触发告警
- RUNNING_EXECUTION:查询yarn或者k8s的集群状态,如果异常,触发告警
master或worker容错时会查询[RUNNING_EXECUTION,READY_PAUSE,READY_STOP]
状态的processInstance/taskInstance进行容错
- 当master容错时,需要检查当前流程是否为stream_task相关的流程,如果是的话,不做容错;
- 当worker容错时,因为此时task_instance虽然显示的是RUNNING_EXECUTION,但是host不为空,所以不会触发kill并创建taskInstance;
流任务监控功能
列表查询:项目,流程名称,执行用户,起止时间
- 日志查看:yarn日志,k8s日志
- WebUI跳转:url由clusterId和任务类型获取
- kill操作:中止流任务
- restart:重启流任务