From 3e57abe9ca254531fc09da9f37d125a2cb238e34 Mon Sep 17 00:00:00 2001 From: story Date: Sat, 23 Dec 2023 00:47:55 +0800 Subject: [PATCH] canal --- .../Canal\351\203\250\347\275\262.md" | 261 ++++++++++++++++-- 1 file changed, 245 insertions(+), 16 deletions(-) diff --git "a/docs/linux/applications/Canal\351\203\250\347\275\262.md" "b/docs/linux/applications/Canal\351\203\250\347\275\262.md" index 77191fb16..b6615e48d 100644 --- "a/docs/linux/applications/Canal\351\203\250\347\275\262.md" +++ "b/docs/linux/applications/Canal\351\203\250\347\275\262.md" @@ -4,35 +4,57 @@ canal官方仓库:`https://github.com/alibaba/canal/` wiki:`https://github.com/alibaba/canal/wiki` -canal的用途是基于mysql的binlog日志解析,提供增量数据的订阅和消费。简单的场景:通过canal监控mysql数据变更从而及时更新redis中对应的缓存。 +canal的用途是基于mysql的binlog日志解析,提供增量数据的订阅和消费。简单的场景:通过canal监控mysql数据变更从而及时更新redis中对应的缓存或更新es中的文档内容 -本文主要介绍canal在服务器端的部署,包括`canal-admin`,`canal-tsdb配置`以及`instance`配置。 +本文主要介绍canal在服务器端的部署,包括`canal-admin`,`canal-tsdb配置`以及`instance`配置。mysql版本为5.7,系统为macOS14.2.1。 -首先需要下载canal的发行版,下载地址:https://github.com/alibaba/canal/releases,可自行选择版本,这里直接选择最新的v1.1.5。系统版本为centos7。 +首先需要下载canal的发行版,下载地址:`https://github.com/alibaba/canal/releases`,可自行选择版本,这里选择1.1.4。 + +## mysql + +自建MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下 + +```ini +[mysqld] +log-bin=mysql-bin # 开启 binlog +binlog-format=ROW # 选择 ROW 模式 +server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 +``` ## canal admin - 解压 ```bash -tar zxvf canal.admin-1.1.5.tar.gz -C /home/install/canal/admin +mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Downloads/canal/admin ``` - 执行conf文件夹中的canal_manager.sql建表 + ```shell + mysql -uroot -p < ~/Downloads/canal/admin/conf/canal_manager.sql + ``` + + + ![image-20210617000438523](https://storyxc.com/images/blog//image-20210617000438523.png) - 修改conf文件夹中的application.yml +- 创建`canal`用户并授权`canal`链接 MySQL 账号具有作为 MySQL slave 的权限 - ![image-20210617001123953](https://storyxc.com/images/blog//image-20210617001123953.png) + ```sql + CREATE USER canal IDENTIFIED BY 'canal'; + GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; + -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; + FLUSH PRIVILEGES; + ``` - - 开放admin控制台的端口,以默认的8089为例 + - 开放端口:`firewall-cmd --zone=public --add-port=8089/tcp --permanent` + 修改conf文件夹中的application.yml - 配置立即生效:`firewall-cmd --reload` + ![image-20231223000510198](https://storyxc.com/images/blog/b0c9b225-f1ee-4bec-b2b2-fe32122ff354.png) -- 执行bin目录的startup.sh +- 执行admin/bin目录的startup.sh - 访问8089端口 @@ -42,7 +64,13 @@ tar zxvf canal.admin-1.1.5.tar.gz -C /home/install/canal/admin - 使用默认账号密码 admin/123456即可登录 -- ![image-20210617001457272](https://storyxc.com/images/blog//image-20210617001457272.png) + ::: tip + + `application.yml`中的canal.adminUser和canal.adminPasswd并非WebUI的登录用户名和密码,而是和canal-server交互用的 + + ::: + + ![image-20210617001457272](https://storyxc.com/images/blog//image-20210617001457272.png) @@ -54,14 +82,14 @@ tar zxvf canal.admin-1.1.5.tar.gz -C /home/install/canal/admin > > 简单解释: > -> 1. instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上, +> 1. instance是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上, > 2. 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务 > - 动态加载的过程,对应配置文件中的autoScan配置,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件 > 3. 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可) - 新建server,按照图中配置即可 - ![image-20210617001928077](https://storyxc.com/images/blog//image-20210617001928077.png) + ![image-20231223001200351](https://storyxc.com/images/blog/04729a10-70fe-440a-ba4f-f28238ca3295.png) > 配置项: > @@ -72,17 +100,218 @@ tar zxvf canal.admin-1.1.5.tar.gz -C /home/install/canal/admin > - tcp端口,canal提供netty数据订阅服务的端口 > - metric端口, promethues的exporter监控数据端口 (未来会对接监控) +### server端配置 + +- 修改为kafka模式并配置kafka相关参数 +- tsdb改为mysql + +```properties +################################################# +######### common argument ############# +################################################# +# tcp bind ip +canal.ip = +# register ip to zookeeper +canal.register.ip = +canal.port = 11111 +canal.metrics.pull.port = 11112 +# canal instance user/passwd +canal.user = canal +canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 + +# canal admin config +canal.admin.manager = 127.0.0.1:8089 +canal.admin.port = 11110 +canal.admin.user = admin +canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 + +canal.zkServers = +# flush data to zk +canal.zookeeper.flush.period = 1000 +canal.withoutNetty = false +# tcp, kafka, RocketMQ +canal.serverMode = kafka +# flush meta cursor/parse position to file +canal.file.data.dir = ${canal.conf.dir} +canal.file.flush.period = 1000 +## memory store RingBuffer size, should be Math.pow(2,n) +canal.instance.memory.buffer.size = 16384 +## memory store RingBuffer used memory unit size , default 1kb +canal.instance.memory.buffer.memunit = 1024 +## meory store gets mode used MEMSIZE or ITEMSIZE +canal.instance.memory.batch.mode = MEMSIZE +canal.instance.memory.rawEntry = true + +## detecing config +canal.instance.detecting.enable = false +#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() +canal.instance.detecting.sql = select 1 +canal.instance.detecting.interval.time = 3 +canal.instance.detecting.retry.threshold = 3 +canal.instance.detecting.heartbeatHaEnable = false + +# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery +canal.instance.transaction.size = 1024 +# mysql fallback connected to new master should fallback times +canal.instance.fallbackIntervalInSeconds = 60 + +# network config +canal.instance.network.receiveBufferSize = 16384 +canal.instance.network.sendBufferSize = 16384 +canal.instance.network.soTimeout = 30 + +# binlog filter config +canal.instance.filter.druid.ddl = true +canal.instance.filter.query.dcl = false +canal.instance.filter.query.dml = false +canal.instance.filter.query.ddl = false +canal.instance.filter.table.error = false +canal.instance.filter.rows = false +canal.instance.filter.transaction.entry = false + +# binlog format/image check +canal.instance.binlog.format = ROW,STATEMENT,MIXED +canal.instance.binlog.image = FULL,MINIMAL,NOBLOB + +# binlog ddl isolation +canal.instance.get.ddl.isolation = false + +# parallel parser config +canal.instance.parser.parallel = true +## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() +#canal.instance.parser.parallelThreadSize = 16 +## disruptor ringbuffer size, must be power of 2 +canal.instance.parser.parallelBufferSize = 256 + +# table meta tsdb info +canal.instance.tsdb.enable = true +#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} +#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; +canal.instance.tsdb.dbUsername = canal +canal.instance.tsdb.dbPassword = canal +# dump snapshot interval, default 24 hour +canal.instance.tsdb.snapshot.interval = 24 +# purge snapshot expire , default 360 hour(15 days) +canal.instance.tsdb.snapshot.expire = 360 + +# aliyun ak/sk , support rds/mq +canal.aliyun.accessKey = +canal.aliyun.secretKey = + +################################################# +######### destinations ############# +################################################# +canal.destinations = +# conf root dir +canal.conf.dir = ../conf +# auto scan instance dir add/remove and start/stop instance +canal.auto.scan = true +canal.auto.scan.interval = 5 + +#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml +canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml + +canal.instance.global.mode = manager +canal.instance.global.lazy = false +canal.instance.global.manager.address = ${canal.admin.manager} +#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml +canal.instance.global.spring.xml = classpath:spring/file-instance.xml +#canal.instance.global.spring.xml = classpath:spring/default-instance.xml + +################################################## +######### MQ ############# +################################################## +canal.mq.servers = 127.0.0.1:9092 +canal.mq.retries = 0 +canal.mq.batchSize = 16384 +canal.mq.maxRequestSize = 1048576 +canal.mq.lingerMs = 100 +canal.mq.bufferMemory = 33554432 +canal.mq.canalBatchSize = 50 +canal.mq.canalGetTimeout = 100 +canal.mq.flatMessage = true +canal.mq.compressionType = none +canal.mq.acks = all +#canal.mq.properties. = +canal.mq.producerGroup = canal-test +# Set this value to "cloud", if you want open message trace feature in aliyun. +canal.mq.accessChannel = local +# aliyun mq namespace +#canal.mq.namespace = + +################################################## +######### Kafka Kerberos Info ############# +################################################## +canal.mq.kafka.kerberos.enable = false +canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" +canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf" +``` + ## canal deployer - 解压 ```bash -tar xzvf canal.deployer-1.1.5.tar.gz -C /home/install/canal/deployer +mkdir -p ~/Downloads/canal/deployer && tar -zxvf canal.deployer-1.1.4.tar.gz -C ~/Downloads/canal/deployer ``` - 用conf目录下的canal_local.properties替换canal.properties +- 修改配置 + + ```properties + # canal.ip + canal.ip = 127.0.0.1 + # register ip + canal.register.ip = 127.0.0.1 + ``` + + :::tip + + 1. 注意ip前后不能有空格,不然会无法启动netty server从而无法启动canal server,应该是后台没做trim + + 2. 如果不填写`canal.ip`和`canal.register.ip`两个配置项,代码中将通过`AddressUtils.getHostIp()`获取本机的ip地址,如果本地有docker/orbstack等创建的虚拟网络设备会导致启动canal-server后识别到多个server且是不同的ip(docker0网桥或orbstack容器等的ip),比较膈应人。([#issue47](https://github.com/alibaba/canal/issues/47)) + + 源码: + + ```java + // CanalController.java + + if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) { + ip = registerIp = AddressUtils.getHostIp(); + } + + if (StringUtils.isEmpty(ip)) { + ip = AddressUtils.getHostIp(); + } + + if (StringUtils.isEmpty(registerIp)) { + registerIp = ip; // 兼容以前配置 + } + ``` + + ::: + + + - 执行bin目录下的startup.sh -- 直接在canal admin的webUI界面中配置instance即可 - - ![image-20210617002844737](https://storyxc.com/images/blog//image-20210617002844737.png) +- 直接在canal admin的webUI界面中配置instance,等待启动即可,如有问题查看`deploy/logs/story/story.log` + + > 具体配置项参见[wiki](https://github.com/alibaba/canal/wiki/AdminGuide#properties%E9%85%8D%E7%BD%AE%E6%96%87%E4%BB%B6) + + ![image-20231223002954201](https://storyxc.com/images/blog/9e206011-7434-4313-a178-598aed41cc97.png) + +:::tip + +如果在server的配置文件中填了相同的配置项,那么instance中的配置会被server中的覆盖,例如`canal.instance.tsdb.url`配置([#issue4669](https://github.com/alibaba/canal/issues/4669)) + +::: + +## 验证 + +此时我们的实例`story`已经开始监听`story`数据库下面的操作,如果有变更,就会推送至kafka的`canal-story`这个topic中 + +![image-20231223004300829](https://storyxc.com/images/blog/3085e72e-0399-4e84-b019-bc14cbe0f4ae.png) + +之后就需要通过业务端根据情况监听队列中的数据变化,做相应的操作。