阿里巴巴中间件-Canal

简介

img

**canal [kə’næl]**,译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。

基于日志增量订阅和消费的业务包括

  • 数据库镜像
  • 数据库实时备份
  • 索引构建和实时维护(拆分异构索引、倒排索引等)
  • 业务 cache 刷新
  • 带业务逻辑的增量数据处理

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

工作原理

MySQL主备复制原理

img

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

重要版本更新说明

  1. canal 1.1.x 版本(release_note),性能与功能层面有较大的突破,重要提升包括:
  1. canal 1.1.4版本,迎来最重要的WebUI能力,引入canal-admin工程,支持面向WebUI的canal动态管理能力,支持配置、任务、日志等在线白屏运维能力,具体文档:Canal Admin Guide

环境准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

    1
    2
    3
    4
    [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    1
    2
    3
    4
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

启动

  • 下载 canal, 访问 release 页面 , 选择需要的包下载, 如以 1.0.17 版本为例

    1
    wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz
  • 解压缩

    1
    2
    mkdir /tmp/canal
    tar zxvf canal.deployer-$version.tar.gz -C /tmp/canal
    • 解压完成后,进入 /tmp/canal 目录,可以看到如下结构

      1
      2
      3
      4
      drwxr-xr-x 2 jianghang jianghang  136 2013-02-05 21:51 bin
      drwxr-xr-x 4 jianghang jianghang 160 2013-02-05 21:51 conf
      drwxr-xr-x 2 jianghang jianghang 1.3K 2013-02-05 21:51 lib
      drwxr-xr-x 2 jianghang jianghang 48 2013-02-05 21:29 logs
  • 配置修改

    1
    vi conf/example/instance.properties
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    ## mysql serverId
    canal.instance.mysql.slaveId = 1234
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name =
    canal.instance.master.position =
    canal.instance.master.timestamp =
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*
    • canal.instance.connectionCharset 代表数据库的编码方式对应到 java 中的编码类型,比如 UTF-8,GBK , ISO-8859-1
    • 如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
  • 启动

    1
    sh bin/startup.sh
  • 查看 server 日志

    1
    vi logs/canal/canal.log</pre>
    1
    2
    3
    2013-02-05 22:45:27.967 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
    2013-02-05 22:45:28.113 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[10.1.29.120:11111]
    2013-02-05 22:45:28.210 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
  • 查看 instance 的日志

    1
    vi logs/example/example.log
    1
    2
    3
    4
    2013-02-05 22:50:45.636 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2013-02-05 22:50:45.641 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2013-02-05 22:50:45.803 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
    2013-02-05 22:50:45.810 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
  • 关闭

    1
    sh bin/stop.sh

Java 客户端

直接使用canal.example工程

默认 canal 使用 tcp 模式。

a. 首先启动Canal Server
b.

  1. 可以在eclipse里,直接打开com.alibaba.otter.canal.example.SimpleCanalClientTest,直接运行

  2. 在工程的example目录下运行命令行:

    1
    mvn exec:java -Dexec.mainClass="com.alibaba.otter.canal.example.SimpleCanalClientTest"
  3. 下载example包: https://github.com/alibaba/canal/releases,解压缩后,直接运行sh startup.sh脚本

c. 触发数据变更 d. 在控制台或者logs中查看,可以看到如下信息 :

1
2
3
4
5
6
7
================> binlog[mysql-bin.002579:508882822] , name[retl,xdual] , eventType : UPDATE , executeTime : 1368607728000 , delay : 4270ms
-------> before
ID : 1 update=false
X : 2013-05-15 11:43:42 update=false
-------> after
ID : 1 update=false
X : 2013-05-15 16:48:48 update=true

从头创建工程

依赖配置:

1
2
3
4
5
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
  1. 创建mvn标准工程:
1
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample

maven3.0.5以上版本舍弃了create,使用generate生成项目

1
mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
  1. 修改pom.xml,添加依赖

  2. ClientSample代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package com.alibaba.otter.canal.sample;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

public class SimpleCanalClientExample {

public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}

connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}

System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}

private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}

RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}

EventType eventType = rowChage.getEventType();
System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));

for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------&gt; before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------&gt; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}

private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
  1. 运行Client

首先启动Canal Server

启动Canal Client后,可以从控制台从看到类似消息:

1
2
3
4
empty count : 1
empty count : 2
empty count : 3
empty count : 4

此时代表当前数据库无变更数据

  1. 触发数据库变更
1
2
3
4
5
6
7
8
9
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
-> `ID` int(11) NOT NULL AUTO_INCREMENT,
-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

可以从控制台中看到:

1
2
3
4
5
6
7
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true

MQ 模式

基本说明

canal 1.1.1版本之后, 默认支持将canal server接收到的binlog数据直接投递到MQ, 目前默认支持的MQ系统有:

环境版本

  • 操作系统:CentOS release 6.6 (Final)
  • java版本: jdk1.8
  • canal 版本: 请下载最新的安装包,本文以当前v1.1.1 的canal.deployer-1.1.1.tar.gz为例
  • MySQL版本 :5.7.18
  • 注意 : 关闭所有机器的防火墙,同时注意启动可以相互telnet ip 端口

一、 安装zookeeper

参考:Zookeeper QuickStart

二、安装MQ

三、 安装canal.server

3.1 下载压缩包

到官网地址(release)下载最新压缩包,请下载 canal.deployer-latest.tar.gz

3.2 将canal.deployer 复制到固定目录并解压

1
2
3
mkdir -p /usr/local/canal
cp canal.deployer-1.1.6.tar.gz /usr/local/canal
tar -zxvf canal.deployer-1.1.6.tar.gz

3.3 配置修改参数

a. 修改instance 配置文件 vi conf/example/instance.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
##  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
## username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
## mq config
canal.mq.topic=example
## 针对库名或者表名发送动态topic
##canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..*
canal.mq.partition=0
## hash partition config
##canal.mq.partitionsNum=3
##库名.表名: 唯一主键,多个表之间用逗号分隔
##canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

对应ip 地址的MySQL 数据库需进行相关初始化与设置, 可参考 Canal QuickStart

b. 修改canal 配置文件vi /usr/local/canal/conf/canal.properties
1
2
3
4
5
6
7
8
9
10
11
## ...
## 可选项: tcp(默认), kafka,RocketMQ,rabbitmq,pulsarmq
canal.serverMode = kafka
## ...

## Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
## Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
## 是否为flat json格式对象
canal.mq.flatMessage = false

mq相关参数说明 (<=1.1.4版本)

参数名 参数说明 默认值
canal.mq.servers kafka为bootstrap.servers
rocketMQ中为nameserver列表
127.0.0.1:6667
canal.mq.retries 发送失败重试次数 0
canal.mq.batchSize kafka为ProducerConfig.BATCH_SIZE_CONFIG
rocketMQ无意义
16384
canal.mq.maxRequestSize kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG
rocketMQ无意义
1048576
canal.mq.lingerMs kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200
rocketMQ无意义
1
canal.mq.bufferMemory kafka为ProducerConfig.BUFFER_MEMORY_CONFIG
rocketMQ无意义
33554432
canal.mq.acks kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义
all
canal.mq.kafka.kerberos.enable kafka为ProducerConfig.ACKS_CONFIG
rocketMQ无意义
false
canal.mq.kafka.kerberos.krb5FilePath kafka kerberos认证
rocketMQ无意义
../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath kafka kerberos认证
rocketMQ无意义
../conf/kerberos/jaas.conf
canal.mq.producerGroup kafka无意义
rocketMQ为ProducerGroup名
Canal-Producer
canal.mq.accessChannel kafka无意义
rocketMQ为channel模式,如果为aliyun则配置为cloud
local
canal.mq.vhost= rabbitMQ配置
canal.mq.exchange= rabbitMQ配置
canal.mq.username= rabbitMQ配置
canal.mq.password= rabbitMQ配置
canal.mq.aliyunuid= rabbitMQ配置
canal.mq.canalBatchSize 获取canal数据的批次大小 50
canal.mq.canalGetTimeout 获取canal数据的超时时间 100
canal.mq.parallelThreadSize mq数据转换并行处理的并发度 8
canal.mq.flatMessage 是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码
false
canal.mq.topic mq里的topic名
canal.mq.dynamicTopic mq里的动态topic规则, 1.1.3版本支持
canal.mq.partition 单队列模式的分区下标, 1
canal.mq.partitionsNum 散列模式的分区数
canal.mq.partitionHash 散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

mq相关参数说明 (>=1.1.5版本)

在1.1.5版本开始,引入了MQ Connector设计,因此参数配置做了部分调整

参数名 参数说明 默认值
canal.aliyun.accessKey 阿里云ak
canal.aliyun.secretKey 阿里云sk
canal.aliyun.uid 阿里云uid
canal.mq.flatMessage 是否为json格式
如果设置为false,对应MQ收到的消息为protobuf格式
需要通过CanalMessageDeserializer进行解码
false
canal.mq.canalBatchSize 获取canal数据的批次大小 50
canal.mq.canalGetTimeout 获取canal数据的超时时间 100
canal.mq.accessChannel = local 是否为阿里云模式,可选值local/cloud local
canal.mq.database.hash 是否开启database混淆hash,确保不同库的数据可以均匀分散,如果关闭可以确保只按照业务字段做MQ分区计算 true
canal.mq.send.thread.size MQ消息发送并行度 30
canal.mq.build.thread.size MQ消息构建并行度 8
—— ———– ——-
kafka.bootstrap.servers kafka服务端地址 127.0.0.1:9092
kafka.acks kafka为ProducerConfig.ACKS_CONFIG all
kafka.compression.type 压缩类型 none
kafka.batch.size kafka为ProducerConfig.BATCH_SIZE_CONFIG 16384
kafka.linger.ms kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200 1
kafka.max.request.size kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG 1048576
kafka.buffer.memory kafka为ProducerConfig.BUFFER_MEMORY_CONFIG 33554432
kafka.max.in.flight.requests.per.connection kafka为ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 1
kafka.retries 发送失败重试次数 0
kafka.kerberos.enable kerberos认证 false
kafka.kerberos.krb5.file kerberos认证 ../conf/kerberos/krb5.conf
kafka.kerberos.jaas.file kerberos认证 ../conf/kerberos/jaas.conf
—— ———– ——-
rocketmq.producer.group rocketMQ为ProducerGroup名 test
rocketmq.enable.message.trace 是否开启message trace false
rocketmq.customized.trace.topic message trace的topic
rocketmq.namespace rocketmq的namespace
rocketmq.namesrv.addr rocketmq的namesrv地址 127.0.0.1:9876
rocketmq.retry.times.when.send.failed 重试次数 0
rocketmq.vip.channel.enabled rocketmq是否开启vip channel false
rocketmq.tag rocketmq的tag配置 空值
rabbitmq.host rabbitMQ配置
rabbitmq.virtual.host rabbitMQ配置
rabbitmq.exchange rabbitMQ配置
rabbitmq.username rabbitMQ配置
rabbitmq.password rabbitMQ配置
rabbitmq.deliveryMode rabbitMQ配置
pulsarmq.serverUrl pulsarmq配置
pulsarmq.roleToken pulsarmq配置
pulsarmq.topicTenantPrefix pulsarmq配置
canal.mq.topic mq里的topic名
canal.mq.dynamicTopic mq里的动态topic规则, 1.1.3版本支持
canal.mq.partition 单队列模式的分区下标, 1
canal.mq.enableDynamicQueuePartition 动态获取MQ服务端的分区数,如果设置为true之后会自动根据topic获取分区数替换canal.mq.partitionsNum的定义,目前主要适用于RocketMQ false
canal.mq.partitionsNum 散列模式的分区数
canal.mq.dynamicTopicPartitionNum mq里的动态队列分区数,比如针对不同topic配置不同partitionsNum
canal.mq.partitionHash 散列规则定义
库名.表名 : 唯一主键,比如mytest.person: id
1.1.3版本支持新语法,见下文

canal.mq.dynamicTopic 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多个配置之间使用逗号或分号分隔

  • 例子1:test\\.test 指定匹配的单表,发送到以test_test为名字的topic上
  • 例子2:.*\\..* 匹配所有表,则每个表都会发送到各自表名的topic上
  • 例子3:test 指定匹配对应的库,一个库的所有表都会发送到库名的topic上
  • 例子4:test\\..* 指定匹配的表达式,针对匹配的表会发送到各自表名的topic上
  • 例子5:test,test1\\.test1,指定多个表达式,会将test库的表都发送到test的topic上,test1\\.test1的表发送到对应的test1_test1 topic上,其余的表发送到默认的canal.mq.topic值

为满足更大的灵活性,允许对匹配条件的规则指定发送的topic名字,配置格式:topicName:schema 或 topicName:schema.table

  • 例子1: test:test\\.test 指定匹配的单表,发送到以test为名字的topic上
  • 例子2: test:.*\\..* 匹配所有表,因为有指定topic,则每个表都会发送到test的topic下
  • 例子3: test:test 指定匹配对应的库,一个库的所有表都会发送到test的topic下
  • 例子4:testA:test\\..* 指定匹配的表达式,针对匹配的表会发送到testA的topic下
  • 例子5:test0:test,test1:test1\\.test1,指定多个表达式,会将test库的表都发送到test0的topic下,test1\\.test1的表发送到对应的test1的topic下,其余的表发送到默认的canal.mq.topic值

大家可以结合自己的业务需求,设置匹配规则,建议MQ开启自动创建topic的能力

canal.mq.partitionHash 表达式说明

canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多个配置之间使用逗号分隔

  • 例子1:test\\.test:pk1^pk2 指定匹配的单表,对应的hash字段为pk1 + pk2
  • 例子2:.*\\..*:id 正则匹配,指定所有正则匹配的表对应的hash字段为id
  • 例子3:.*\\..*:$pk$ 正则匹配,指定所有正则匹配的表对应的hash字段为表主键(自动查找)
  • 例子4: 匹配规则啥都不写,则默认发到0这个partition上
  • 例子5:.*\\..* ,不指定pk信息的正则匹配,将所有正则匹配的表,对应的hash字段为表名
    • 按表hash: 一张表的所有数据可以发到同一个分区,不同表之间会做散列 (会有热点表分区过大问题)
  • 例子6: test\\.test:id,.\\..* , 针对test的表按照id散列,其余的表按照table散列

注意:大家可以结合自己的业务需求,设置匹配规则,多条匹配规则之间是按照顺序进行匹配(命中一条规则就返回)

其他详细参数可参考Canal AdminGuide

mq顺序性问题

binlog本身是有序的,写入到mq之后如何保障顺序是很多人会比较关注,在issue里也有非常多人咨询了类似的问题,这里做一个统一的解答

  1. canal目前选择支持的kafka/rocketmq,本质上都是基于本地文件的方式来支持了分区级的顺序消息的能力,也就是binlog写入mq是可以有一些顺序性保障,这个取决于用户的一些参数选择
  2. canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区
  • canal.mq.dynamicTopic,主要控制是否是单topic还是多topic,针对命中条件的表可以发到表名对应的topic、库名对应的topic、默认topic name
  • canal.mq.partitionsNum、canal.mq.partitionHash,主要控制是否多分区以及分区的partition的路由计算,针对命中条件的可以做到按表级做分区、pk级做分区等
  1. canal的消费顺序性,主要取决于描述2中的路由选择,举例说明:
  • 单topic单分区,可以严格保证和binlog一样的顺序性,缺点就是性能比较慢,单分区的性能写入大概在2~3k的TPS
  • 多topic单分区,可以保证表级别的顺序性,一张表或者一个库的所有数据都写入到一个topic的单分区中,可以保证有序性,针对热点表也存在写入分区的性能问题
  • 单topic、多topic的多分区,如果用户选择的是指定table的方式,那和第二部分一样,保障的是表级别的顺序性(存在热点表写入分区的性能问题),如果用户选择的是指定pk hash的方式,那只能保障的是一个pk的多次binlog顺序性
    ** pk hash的方式需要业务权衡,这里性能会最好,但如果业务上有pk变更或者对多pk数据有顺序性依赖,就会产生业务处理错乱的情况. 如果有pk变更,pk变更前和变更后的值会落在不同的分区里,业务消费就会有先后顺序的问题,需要注意

MQ发送性能数据

1.1.5版本可以在5k~50k左右,具体可参考:[[Canal-MQ-Performance]]

阿里云RocketMQ对接参数

1
2
3
4
5
6
7
8
9
## 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
## 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 内网接入点
canal.mq.producerGroup = GID_**group(在后台创建)
canal.mq.namespace = rocketmq实例id
canal.mq.topic=(在后台创建)

kafka ssl配置参数

1
2
3
4
5
## canal.properties配置文件

kafka.kerberos.enable = true
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

3.4 启动

1
2
cd /usr/local/canal/
sh bin/startup.sh

3.5 查看日志

a.查看 logs/canal/canal.log

1
vi logs/canal/canal.log

b. 查看instance的日志:

1
vi logs/example/example.log

3.6 关闭

1
2
cd /usr/local/canal/
sh bin/stop.sh

3.7 MQ数据消费

canal.client下有对应的MQ数据消费的样例工程,包含数据编解码的功能