SpringBoot整合Canal:MySQL同步

系统目前能把MySQL的变更实时推送到Redis、Elasticsearch和各个微服务,线上已经稳定跑了好几个月,关键场景没出问题,缓存命中和搜索结果都跟数据库保持同步。

回溯一下过程:部署完SpringBoot客户端后,变更从Canal走到业务系统的整个链路被接通。用户表变更一旦被Canal捕获,马上会进到我们写的处理器里——针对bu_user_info的逻辑是第一时间清Redis相关key,必要时触发ES文档更新。订单状态变更也会被捕获,进而推送到业务流程里去发送通知或做统计更新。为了稳住高峰,我们在中间加了消息队列作缓冲,Canal把解析好的数据发到MQ,后端从MQ异步消费,削峰填谷的效果明显。

再往前看一下客户端的实现细节。项目里用了top.javatool的canal客户端包,但这个包里并没有直接把rabbitMQ或rocketMQ封好的实现,所以需要自己实现一个消息驱动。具体做法是实现
top.javatool.canal.client.handler.MessageHandler,把从MQ里拿到的消息转换并交给top.javatool的EntryHandler处理。EntryHandler的接口相对干净,按类型去写对应的处理方法就行。配置上,如果走RabbitMQ,改conf/canal.properties里相关的mq配置就可以;如果走RocketMQ,既要改conf/canal.properties,也要调整
conf/example/instance.properties里的实例设置。把这些改好后,Canal Server端会把变更发到MQ,客户端只要稳妥消费并把数据交给EntryHandler,业务逻辑就能按预期执行。

处理器的写法有讲究。我们把处理器做得尽量轻量,核心职责就是把变更事件解析成业务能用的结构,然后把具体的耗时操作(列如重建索引、远程调用)交给异步线程或MQ去处理。实际代码里会把一批变更聚合处理,特别是高并发场景,合并成批操作能省许多db和缓存的压力。异常方面,任何个别记录处理失败都不能把整个同步链卡住,所以捕获异常后要记录详细日志并把失败数据打到死信队列或重试队列里,保证不丢数据、也不堵流。

往前是Canal Server的部署步骤。下载官方或社区构建的Canal包,解压后需要调整两个地方:conf/canal.properties和
conf/example/instance.properties。instance配置里要填数据库连接信息、订阅的库表、以及消费位点等;canal.properties里有rpc、mq、日志等全局设置。改完配置后启动server,注意查看启动日志确认实例注册正常以及能连上MySQL。这里常见的问题是权限不够或binlog位置错误。

最早的准备工作是数据库端的配置。MySQL必须打开binlog,而且binlog_format要设置为ROW模式,这样Canal才能拿到行级别的变更数据。再创建一个专用账号给Canal用,权限至少包括REPLICATION SLAVE和REPLICATION CLIENT,必要时还要给SELECT权限以便读取表结构。细节上要确保binlog的文件保留时间够长,防止Canal掉线期间binlog被清掉导致数据丢失。

说点实现上的原理:Canal通过模拟MySQL的从节点,向主库发送dump协议,拿到二进制日志(binlog),然后把这些日志解析成事件格式,供客户端消费。这个过程里,解析出的每一个insert/update/delete,都会带上变更前后的数据,EntryHandler根据这些信息判断应该做哪些业务动作。理解这条链路有助于定位问题:是Canal没拿到binlog、还是解析出错,还是客户端消费失败。

在具体场景应用上,Canal在我们项目里主要承担三类任务。第一类是缓存层的同步:用户信息、商品详情变更后需立即清理或更新Redis,避免脏读。第二类是搜索同步:把商品或内容变动实时推到Elasticsearch,保证搜索结果不滞后。第三类是触发型业务:订单流转、状态变更会触发一系列后续动作,列如发短信、统计数据更新等。这三类场景对响应时间和可靠性要求不同,按需设计处理器和队列策略就行。

配置和集成时碰到的坑也不少。top.javatool的自动装配包里对EntryHandler的设计比较友善,但从MQ到EntryHandler的桥接得自己写;MQ选型上,RabbitMQ适合轻量消息和复杂路由,RocketMQ在吞吐量上更有优势,配置点稍有差别。还有个细节:当把数据通过MQ转发时,记得把canal解析出的位点信息一并传递或记录,否则消费中断后难以定位消费进度。

从工程实践角度讲,有三点比较重大:把处理器做小、不要把长耗时逻辑放同步路径里;对高并发做批量合并;异常和重试机制要健全。按这套套路,生产环境运行平稳,团队也能比较容易维护。

代码和配置示例放在仓库里,按需查看实现细节:gitee:
https://gitee.com/jq_di/springcloud-template

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容