0%

dolphinscheduler流任务监控

dolphinscheduler的调度任务有flink流任务,flinkx数据同步,这两类任务目前是批任务的思路,阻塞等待任务结束,缺少流任务的异步监控,需要做一些非阻塞的改造。

存在问题

flink流任务在ds中调度存在2个问题

  1. flink task提交后,worker会一直在轮询yarn的api查状态等待结束,但是flink流任务是没有终点的,所以这儿很浪费资源。
  2. 如果出现worker容错,会kill掉flink任务,调度问题导致任务被中止,这个是不合理的。

修改思路

  1. 流任务提交后,将流程/任务状态设置为运行中,同时新增一张流任务监控表stream_task,
  2. 单独一个进程或者定时任务来轮询stream_task,查询yarn/k8s的任务状态,更新流程/任务状态,发现异常时触发告警,人工干预处理;
  3. 任务kill接口调用时,判断是不是流任务,如果是流任务,修改状态为ready_stop,由监控线程异步执行yarn/k8s的kill逻辑

flink调度任务新增类型参数:离线/实时

  • 如果是离线任务:必须查询yarn/k8s任务状态轮询阻塞,dag调度时才能触发后续任务。
  • 如果是实时任务:
  1. 任务提交成功后(ret_code=0),任务状态为success,同时新增一张流任务监控表stream_task(状态=待查询)记录,状态为SUBMITTED_SUCCESS
  2. scheduler-alert启动单独启动一个线程,轮询stream_task,用于处理任务异常或手动停止;

流任务=1个流程套1个流task

任务停止

  • flink on k8s任务:执行flink deploy delete $appIdflink-ui ingress delete $appId
  • flink on yarn:启动shell进程执行yarn kill -application $appId,这儿得根据流程实例获取当前租户信息,然后获取kerberos认证文件.

任务状态监控

查询任务在yarn/k8s的任务状态,并更新数据库的流程实例表和任务实例表的状态(运行中/结束),结束时触发sentry告警(tag:stream);

  • 待查询:60s轮询一次,查询stream_task表的SUBMITTED_SUCCESS,修改为RUNNING,为了界面显示状态一致,相关的process_instance和task_instance表的状态也改为RUNNING,
  • 运行中:5s轮询一次;查询stream_task表的READY_STOP, RUNNING_EXECUTION
    • READY_STOP:执行kill操作,如果是yarn新建shell进程kill,如果是k8s,调用api执行delete deploy,最后触发告警
    • RUNNING_EXECUTION:查询yarn或者k8s的集群状态,如果异常,触发告警

master或worker容错时会查询[RUNNING_EXECUTION,READY_PAUSE,READY_STOP]状态的processInstance/taskInstance进行容错

  • 当master容错时,需要检查当前流程是否为stream_task相关的流程,如果是的话,不做容错;
  • 当worker容错时,因为此时task_instance虽然显示的是RUNNING_EXECUTION,但是host不为空,所以不会触发kill并创建taskInstance;

流任务监控功能

列表查询:项目,流程名称,执行用户,起止时间

  1. 日志查看:yarn日志,k8s日志
  2. WebUI跳转:url由clusterId和任务类型获取
  3. kill操作:中止流任务
  4. restart:重启流任务