binlog日志实时采集,支持按事物聚合输入到flink udf中进行处理
binlog数据源的实现
- 通过canel获取mysql的binlog事件(CanalEntry.RowChange)
- flinkx通过BinlogRowConverter实现日志数据的压平(新增before/after字段),添加insert/update/delete事件类型,put到内部的linkedBlockingQueue,然后BinlogInputFormat(Flink的SourceFunction)提供nextRecordInternal方法poll队列数据
- flink通过checkpoint将canal的同步状态持久化,state对象为EntryPosition,flinkx的metric和state数据都存储在FormatState对象中
checkpoint如何做到不丢
canal的EventParser在每个transaction的sink
后会获取当前事物的position,然后persistLogPosition持久化点位信息。
flinkx实现了canal的logPositionManager,persistLogPosition会做2个动作:
- flink状态更新:format.setEntryPosition(logPosition.getPostion())
- 本地缓存更新:logPositionCache.put(destination, logPosition);
flinkx的binlogReader的实现中,canal的sink是通过queue异步处理的,由flink的DtInputFormatSourceFunction在执行nextRecord时从queue中poll出来处理row。
那么问题来了,
如果此时server断电flink程序异常中断,format的state已经往前走,但是异步处理比较慢,还没处理完被异常中断了,
重启时读取的checkpoint的点位是后面的position,会导致有些日志数据未被处理。
canal会在buffer中缓存binlog事件,然后一个transaction输出一次
- MysqlMultiStageCoprocessor$SinkStoreStage.onEvent()接受事件.
- 获得事件(MessageEvent)里的内容(CanalEntry.Entry).
- 调用:EventTransactionBuffer.add(CanalEntry.Entry).
- EventTransactionBuffer.add方法的会判断:CanalEntry.Entry是什么类型.
- 如果:CanalEntry.Entry.getEntryType为:TRANSACTIONBEGIN/TRANSACTIONEND/HEARTBEAT就刷新缓存,并把CanalEntry.Entry添加到缓存中.这样意味着:每一次事务,刷新一次.
- 如果:CanalEntry.Entry.getEntryType为:ROWDATA,则CanalEntry.Entry添加到缓存中.
- 最终会:调用flush时,集合一批数据,并回调:TransactionFlushCallback.flush(List).
- TransactionFlushCallback.flush方法,会调用:CanalEventSink.sink方法进行数据的发送.
pavingData
gtid模式
show global variables like 'gtid%'
1
2
3
4
5
6
7
8
9 gtid_executed,"3528d5b7-e86d-11ea-8e5b-0242ac110004:9-7635928:7635930-7648258:7648260-7657211:7657213-7751431:7751433-7887164:7887166-7888815:7888817-7891772:7891774-7946407:7946409-8012629:8012631-8029084:8029086-8030188:8030190-14238956:14238958-24160895,
5d0b498d-e388-11ea-a9ff-0242ac110004:2853-3315:3317-3468:170523-170632:170634-177867,
66587608-45f0-11ea-85fe-001a4a1601db:1-13827942"
gtid_executed_compression_period,1000
gtid_mode,ON
gtid_owned,""
gtid_purged,"3528d5b7-e86d-11ea-8e5b-0242ac110004:9-7635928:7635930-7648258:7648260-7657211:7657213-7751431:7751433-7887164:7887166-7888815:7888817-7891772:7891774-7946407:7946409-8012629:8012631-8029084:8029086-8030188:8030190-14238956:14238958-22753778,
5d0b498d-e388-11ea-a9ff-0242ac110004:2853-3315:3317-3468:170523-170632:170634-177867,
66587608-45f0-11ea-85fe-001a4a1601db:1-12593227"
一个GTID由两部分组成:server id uuid 与递增序号,两者之间用英文冒号隔开,例如:1f0eee4c-a66e-11ea-8999-00dbdfe417b8:1。
- gtid_executed
当前MySQL实现已经执行过的事务。在开启GTID模块时每执行一个事务会产生一个全局唯一的事务ID。在每一台MySQL实例上执行的事务何止上亿,这个字段要存储所有已执行的的事务ID,怎么存储能节省空间就是一个需要解决的问题,稍后再进行展开说明。 - gtid_executed_compression_period
在MySQL5.7版本专门引入了一个系统表:mysql.gtid_executed,gtid_executed_compression_period 参数就是设置每执行多个事务,对这个表进行压缩,默认值为1000。 - gtid_mode
是否开启gtid模式。 - gtid_purged
已不在 binlog 日志中的事务ID,Mysql 并不会永久存储 binlog 日志,而是通过 expire_logs_days 设置过期时间,单位为天,默认为10天。 - gtid_next
session级别的变量,下一个gtid - gtid_owned
正在运行的gtid - enforce_gtid_consistency
保证GTID安全的参数