Canal的使用方法

Canal的使用方法

在分布式系统中,数据同步是绕不开的话题 —— 列如用户下单后,订单数据需要实时同步到数据分析平台做库存计算,或者同步到搜索服务构建索引。而提到 MySQL 数据同步,阿里巴巴开源的Canal绝对是绕不开的利器。它轻量、高性能、实时性强,早已成为业内增量同步 MySQL 数据的首选方案之一。

一、为什么需要 Canal?先搞懂它的核心原理

在聊怎么用之前,我们得先清楚:Canal 到底是怎么实现 MySQL 数据同步的?

1.1 本质:模拟 MySQL 从库的 “复制逻辑”

MySQL 主从复制的原理你肯定不陌生:

1. 主库(Master)将数据变更记录到binlog(二进制日志);

2. 从库(Slave)连接主库,发送dump命令,请求获取 binlog;

3. 主库的 IO 线程将 binlog 日志传给从库;

4. 从库的 IO 线程将接收到的 binlog 写入本地relay log(中继日志),SQL 线程再解析中继日志,执行 SQL 重现数据变更。

而 Canal 的核心思路,就是模拟一个 MySQL 从库

1. Canal 伪装成从库,向 MySQL 主库发送dump请求;

2. 主库将 binlog 推送给 Canal;

3. Canal 解析 binlog 日志(将二进制格式转成可读的结构化数据);

4. 开发者通过 Canal Client 消费这些解析后的数据,同步到其他系统(如 ES、Redis、Kafka 等)。

这种方式的优势很明显:

5. 无侵入:不需要在 MySQL 主库写任何业务代码,仅依赖 binlog;

6. 实时性强:binlog 实时生成,Canal 实时解析,延迟一般在毫秒级;

7. 低性能损耗:主库推送 binlog 的开销极小,不会影响业务运行。

1.2 Canal 的核心组件

Canal 的架构主要分为两部分,部署和使用时需要重点关注:

1、 Canal Server(服务端)

1. 负责连接 MySQL 主库,订阅 binlog;

2. 解析 binlog 日志,将二进制数据转成结构化的Event(如插入、更新、删除事件);

3. 提供数据接口,供 Client 端获取同步数据。

2、 Canal Client(客户端)

1. 连接 Canal Server,拉取解析后的Event数据;

2. 自定义业务逻辑,将数据同步到目标存储(如写入 ES、发送到 Kafka、更新 Redis 缓存等);

3. 支持断点续传(记录消费位置),避免数据丢失。

二、实战第一步:Canal Server 部署(以 Linux 为例)

接下来我们从 0 开始,搭建 Canal Server 环境。在此之前,需要先满足两个前提条件:

2.1 前提:配置 MySQL 主库

Canal 依赖 MySQL 的 binlog,所以必须先确保主库开启 binlog,且配置正确:

1. 编辑 MySQL 配置文件(如/etc/my.cnf),添加以下配置:

# 开启binlog,日志格式必须为ROW(Canal仅支持ROW格式)
log-bin=mysql-bin
binlog-format=ROW # 主库唯一ID(从库也需要,Canal伪装从库时会用)
server-id=1
# 可选:指定需要同步的数据库(不指定则同步所有库)
binlog-do-db=test_db # 替换为你的业务数据库名

2. 重启 MySQL,验证配置是否生效:

— 执行后若显示ON,说明binlog已开启
show variables like 'log_bin';
— 查看binlog格式(需为ROW)
show variables like 'binlog_format';

3. 创建 Canal 专属账号,并授予权限:

— 创建账号(用户名:canal,密码:Canal@123)
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123';
— 授予复制权限(Canal需要模拟从库,必须有此权限)
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
— 刷新权限
FLUSH PRIVILEGES;

2.2 下载并部署 Canal Server

Canal 的官方下载地址:https://github.com/alibaba/canal/releases,提议选择稳定版(如 1.1.7)。

1. 下载并解压安装包:

# 下载(替换为最新稳定版链接)
wget
https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

# 创建安装目录
mkdir -p /opt/canal
# 解压到安装目录
tar -zxvf
canal.deployer-1.1.7.tar.gz -C /opt/canal

2. 配置 Canal Server:

进入配置目录,修改instance.properties(每个 “实例” 对应一个 MySQL 主库的同步配置):

cd /opt/canal/conf/example # example是默认实例名,可自定义
vim instance.properties

关键配置项修改如下(其他保持默认):

# 1. MySQL主库地址(IP:端口)

canal.instance.master.address=192.168.1.100:3306

# 2. MySQL账号密码(刚才创建的canal账号)
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123
# 3. 同步的数据库(与MySQL配置的binlog-do-db一致)

canal.instance.defaultDatabaseName=test_db

# 4. 从哪个binlog文件开始同步(首次同步可留空,Canal会自动找最新的)

canal.instance.master.journal.name=

# 5. 从binlog的哪个位置开始同步(首次同步可留空)

canal.instance.master.position=

3. 启动 Canal Server:

# 进入bin目录
cd /opt/canal/bin
# 启动
sh startup.sh
# 查看日志,验证是否启动成功(无ERROR则正常)
tail -f
/opt/canal/logs/canal/canal.log

tail -f
/opt/canal/logs/example/example.log

三、编写 Canal Client,实现数据同步

Canal Server 启动后,就需要编写 Client 端代码,消费同步过来的数据。这里以 Java 为例,演示如何将 MySQL 的user表数据同步到本地日志(实际业务中可替换为同步到 ES、Redis 等)。

3.1 引入依赖(Maven)

在pom.xml中添加 Canal Client 的依赖:

<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.7</version>
<!– 与Server版本保持一致 –>
</dependency>

3.2 编写 Client 核心代码

核心逻辑:连接 Canal Server,循环拉取数据,解析数据并处理业务逻辑。

import
com.alibaba.otter.canal.client.CanalConnector;

import
com.alibaba.otter.canal.client.CanalConnectors;

import
com.alibaba.otter.canal.protocol.CanalEntry;

import
com.alibaba.otter.canal.protocol.Message;

import
java.net.InetSocketAddress;

import java.util.List;
public class CanalClientDemo {
public static void main(String[] args) {
// 1. 配置Canal Server地址、实例名、账号密码(默认账号密码都是canal)
String host = “192.168.1.101”; // Canal Server所在服务器IP
int port = 11111; // Canal默认端口
String destination = “example”; // 实例名(与Server配置的一致)
String username = “canal”;
String password = “canal”;
// 2. 创建Canal连接器
CanalConnector connector =
CanalConnectors.newSingleConnector(

new InetSocketAddress(host, port),
destination,
username,
password
);
try {
// 3. 连接Canal Server
connector.connect();
// 4. 订阅数据(*.*表明订阅所有库所有表,也可指定如test_db.user)
connector.subscribe(“test_db.user”);
// 5. 回滚到上次消费的位置(避免重复消费)
connector.rollback();
// 6. 循环拉取数据(实际业务中可放在线程池里)
while (true) {
// 拉取数据(1024表明一次最多拉取1024条记录)
Message message = connector.getWithoutAck(1024);
// 获取批次ID
long batchId = message.getId();
// 获取数据条数
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// 没有数据,休眠1秒再拉取(避免空轮询消耗CPU)
Thread.sleep(1000);
} else {
// 7. 解析并处理数据
handleMessage(message.getEntries());
// 8. 确认消费成功(Canal会记录消费位置,下次从这里开始)
connector.ack(batchId);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 9. 关闭连接器
connector.disconnect();
}
}
/**
* 解析Canal消息,处理数据变更
*/
private static void handleMessage(List<CanalEntry.Entry> entries) throws Exception {
for (CanalEntry.Entry entry : entries) {
// 过滤非ROWData类型的日志(如事务开始、结束的日志)
if (entry.getEntryType() !=
CanalEntry.EntryType.ROWDATA) {

continue;
}
// 解析binlog日志的内容(转成JSON格式,方便查看)
CanalEntry.RowChange rowChange =
CanalEntry.RowChange.parseFrom(entry.getStoreValue());

// 获取数据变更类型(INSERT/UPDATE/DELETE)
CanalEntry.EventType eventType = rowChange.getEventType();
// 获取表名
String tableName = entry.getHeader().getTableName();
System.out.printf(“=== 表名:%s,变更类型:%s ===
“, tableName, eventType);
// 遍历每一行数据
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType ==
CanalEntry.EventType.DELETE) {

// 处理删除操作(旧数据)
printColumn(
rowData.getBeforeColumnsList());

} else if (eventType ==
CanalEntry.EventType.INSERT) {

// 处理插入操作(新数据)
printColumn(
rowData.getAfterColumnsList());

} else {
// 处理更新操作(旧数据和新数据)
System.out.println(“旧数据:”);
printColumn(
rowData.getBeforeColumnsList());

System.out.println(“新数据:”);
printColumn(
rowData.getAfterColumnsList());

}
}
}
}
/**
* 打印列数据(字段名+字段值)
*/
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.printf(“字段名:%s,字段值:%s,是否为主键:%s
“,
column.getName(),
column.getValue(),
column.getIsKey() ? “是” : “否”);
}
}
}

Canal 作为阿里开源的 MySQL 同步工具,凭借其 “无侵入、高性能、实时性强” 的特点,成为了分布式系统中数据同步的首选方案。本文从原理到实战,介绍了 Canal 的部署、Client 编写和进阶用法,希望能协助你快速上手。

实际业务中,Canal 的应用场景远不止于此 —— 列如同步数据到 ES 构建搜索索引、同步到 Redis 做缓存预热、同步到数据仓库做离线。

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容