0%

Flinkx同步BinLog数据

binlog日志实时采集,支持按事物聚合输入到flink udf中进行处理

binlog数据源的实现

  • 通过canel获取mysql的binlog事件(CanalEntry.RowChange)
  • flinkx通过BinlogRowConverter实现日志数据的压平(新增before/after字段),添加insert/update/delete事件类型,put到内部的linkedBlockingQueue,然后BinlogInputFormat(Flink的SourceFunction)提供nextRecordInternal方法poll队列数据
  • flink通过checkpoint将canal的同步状态持久化,state对象为EntryPosition,flinkx的metric和state数据都存储在FormatState对象中

checkpoint如何做到不丢

canal的EventParser在每个transaction的sink后会获取当前事物的position,然后persistLogPosition持久化点位信息。
flinkx实现了canal的logPositionManager,persistLogPosition会做2个动作:

  1. flink状态更新:format.setEntryPosition(logPosition.getPostion())
  2. 本地缓存更新:logPositionCache.put(destination, logPosition);

flinkx的binlogReader的实现中,canal的sink是通过queue异步处理的,由flink的DtInputFormatSourceFunction在执行nextRecord时从queue中poll出来处理row。
那么问题来了,
如果此时server断电flink程序异常中断,format的state已经往前走,但是异步处理比较慢,还没处理完被异常中断了,
重启时读取的checkpoint的点位是后面的position,会导致有些日志数据未被处理。

canal会在buffer中缓存binlog事件,然后一个transaction输出一次

  1. MysqlMultiStageCoprocessor$SinkStoreStage.onEvent()接受事件.
  2. 获得事件(MessageEvent)里的内容(CanalEntry.Entry).
  3. 调用:EventTransactionBuffer.add(CanalEntry.Entry).
  4. EventTransactionBuffer.add方法的会判断:CanalEntry.Entry是什么类型.
    • 如果:CanalEntry.Entry.getEntryType为:TRANSACTIONBEGIN/TRANSACTIONEND/HEARTBEAT就刷新缓存,并把CanalEntry.Entry添加到缓存中.这样意味着:每一次事务,刷新一次.
    • 如果:CanalEntry.Entry.getEntryType为:ROWDATA,则CanalEntry.Entry添加到缓存中.
  5. 最终会:调用flush时,集合一批数据,并回调:TransactionFlushCallback.flush(List).
  6. TransactionFlushCallback.flush方法,会调用:CanalEventSink.sink方法进行数据的发送.

pavingData

gtid模式

show global variables like 'gtid%'

1
2
3
4
5
6
7
8
9
gtid_executed,"3528d5b7-e86d-11ea-8e5b-0242ac110004:9-7635928:7635930-7648258:7648260-7657211:7657213-7751431:7751433-7887164:7887166-7888815:7888817-7891772:7891774-7946407:7946409-8012629:8012631-8029084:8029086-8030188:8030190-14238956:14238958-24160895,
5d0b498d-e388-11ea-a9ff-0242ac110004:2853-3315:3317-3468:170523-170632:170634-177867,
66587608-45f0-11ea-85fe-001a4a1601db:1-13827942"
gtid_executed_compression_period,1000
gtid_mode,ON
gtid_owned,""
gtid_purged,"3528d5b7-e86d-11ea-8e5b-0242ac110004:9-7635928:7635930-7648258:7648260-7657211:7657213-7751431:7751433-7887164:7887166-7888815:7888817-7891772:7891774-7946407:7946409-8012629:8012631-8029084:8029086-8030188:8030190-14238956:14238958-22753778,
5d0b498d-e388-11ea-a9ff-0242ac110004:2853-3315:3317-3468:170523-170632:170634-177867,
66587608-45f0-11ea-85fe-001a4a1601db:1-12593227"

一个GTID由两部分组成:server id uuid 与递增序号,两者之间用英文冒号隔开,例如:1f0eee4c-a66e-11ea-8999-00dbdfe417b8:1。

  • gtid_executed
    当前MySQL实现已经执行过的事务。在开启GTID模块时每执行一个事务会产生一个全局唯一的事务ID。在每一台MySQL实例上执行的事务何止上亿,这个字段要存储所有已执行的的事务ID,怎么存储能节省空间就是一个需要解决的问题,稍后再进行展开说明。
  • gtid_executed_compression_period
    在MySQL5.7版本专门引入了一个系统表:mysql.gtid_executed,gtid_executed_compression_period 参数就是设置每执行多个事务,对这个表进行压缩,默认值为1000。
  • gtid_mode
    是否开启gtid模式。
  • gtid_purged
    已不在 binlog 日志中的事务ID,Mysql 并不会永久存储 binlog 日志,而是通过 expire_logs_days 设置过期时间,单位为天,默认为10天。
  • gtid_next
    session级别的变量,下一个gtid
  • gtid_owned
    正在运行的gtid
  • enforce_gtid_consistency
    保证GTID安全的参数

参考