Skip to content

Commit

Permalink
canal auto scan
Browse files Browse the repository at this point in the history
  • Loading branch information
storyxc committed Dec 23, 2023
1 parent 3e57abe commit fad1b5c
Showing 1 changed file with 178 additions and 14 deletions.
192 changes: 178 additions & 14 deletions docs/linux/applications/Canal部署.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down
mysql -uroot -p < ~/Downloads/canal/admin/conf/canal_manager.sql
```



![image-20210617000438523](https://storyxc.com/images/blog//image-20210617000438523.png)
![image-20210617000438523](https://storyxc.com/images/blog//image-20210617000438523.png)

- 创建`canal`用户并授权`canal`链接 MySQL 账号具有作为 MySQL slave 的权限

Expand All @@ -48,19 +46,16 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down
FLUSH PRIVILEGES;
```



修改conf文件夹中的application.yml
修改conf文件夹中的application.yml

![image-20231223000510198](https://storyxc.com/images/blog/b0c9b225-f1ee-4bec-b2b2-fe32122ff354.png)
![image-20231223000510198](https://storyxc.com/images/blog/b0c9b225-f1ee-4bec-b2b2-fe32122ff354.png)

- 执行admin/bin目录的startup.sh

- 访问8089端口

![image-20210617001200010](https://storyxc.com/images/blog//image-20210617001200010.png)



- 使用默认账号密码 admin/123456即可登录

Expand All @@ -72,8 +67,6 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down

![image-20210617001457272](https://storyxc.com/images/blog//image-20210617001457272.png)



> > canal-admin的核心模型主要有:
>
> 1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列
Expand All @@ -84,7 +77,10 @@ mkdir -p ~/Downloads/canal/admin && tar -zxvf canal.admin-1.1.4.tar.gz -C ~/Down
>
> 1. instance是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,
> 2. 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务
> - 动态加载的过程,对应配置文件中的autoScan配置,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件
>
- 动态加载的过程,对应配置文件中的autoScan配置,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件

> 3. 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)
- 新建server,按照图中配置即可
Expand Down Expand Up @@ -247,6 +243,175 @@ canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
```

:::tip

`canal.auto.scan`如果设置为true,`canal.destinations`可以不填写,server会自动扫描instance然后启动

```java
// CanalController
// 初始化monitor机制
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {
public void start(String destination) {
InstanceConfig config = instanceConfigs.get(destination);
if (config == null) {
// 重新读取一下instance config
config = parseInstanceConfig(properties, destination);
instanceConfigs.put(destination, config);
}
if (!embededCanalServer.isStart(destination)) {
// HA机制启动
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
logger.info("auto notify start {} successful.", destination);
}
//...
}
}

instanceConfigMonitors = MigrateMap.makeComputingMap(new Function<InstanceMode, InstanceConfigMonitor>() {

public InstanceConfigMonitor apply(InstanceMode mode) {
int scanInterval = Integer.valueOf(getProperty(properties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));

if (mode.isSpring()) {
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默认是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEmpty(rootDir)) {
rootDir = "../conf";
}

if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
monitor.setRootConf(rootDir);
} else {
// eclipse debug模式
monitor.setRootConf("src/main/resources/");
}
return monitor;
} else if (mode.isManager()) {
ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
monitor.setConfigClient(getManagerClient(managerAddress));
return monitor;
} else {
throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
}
}
});


// CanalController.start()
public void start() throws Throwable {
// ...
// 尝试启动一下非lazy状态的通道
for (Map.Entry<String, InstanceConfig> entry : instanceConfigs.entrySet()) {
final String destination = entry.getKey();
InstanceConfig config = entry.getValue();
// 创建destination的工作节点
if (!embededCanalServer.isStart(destination)) {
// HA机制启动
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}

if (autoScan) {
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
}
}

if (autoScan) {
instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
if (!monitor.isStart()) {
monitor.start();
}
}
}
// ...
}

// 然后会调用ManagerInstanceConfigMonitor的start方法,start方法会启动一个定时任务,每隔scanInterval秒调用scan方法
public void start() {
super.start();
executor.scheduleWithFixedDelay(new Runnable() {

public void run() {
try {
scan();
if (isFirst) {
isFirst = false;
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}

}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
}

// scan方法中会通过configClient调用canal-admin的接口获取instance的配置信息,
// 最后对instance进行stop/reload/start操作

private void scan() {
String instances = configClient.findInstances(null);
final List<String> is = Lists.newArrayList(StringUtils.split(instances, ','));
List<String> start = Lists.newArrayList();
List<String> stop = Lists.newArrayList();
List<String> restart = Lists.newArrayList();
for (String instance : is) {
if (!configs.containsKey(instance)) {
PlainCanal newPlainCanal = configClient.findInstance(instance, null);
if (newPlainCanal != null) {
configs.put(instance, newPlainCanal);
start.add(instance);
}
} else {
PlainCanal plainCanal = configs.get(instance);
PlainCanal newPlainCanal = configClient.findInstance(instance, plainCanal.getMd5());
if (newPlainCanal != null) {
// 配置有变化
restart.add(instance);
configs.put(instance, newPlainCanal);
}
}
}

configs.forEach((instance, plainCanal) -> {
if (!is.contains(instance)) {
stop.add(instance);
}
});

stop.forEach(instance -> {
notifyStop(instance);
});

restart.forEach(instance -> {
notifyReload(instance);
});

start.forEach(instance -> {
notifyStart(instance);
});

}
```

:::

## canal deployer

- 解压
Expand All @@ -268,9 +433,9 @@ mkdir -p ~/Downloads/canal/deployer && tar -zxvf canal.deployer-1.1.4.tar.gz -C

:::tip

1. 注意ip前后不能有空格,不然会无法启动netty server从而无法启动canal server,应该是后台没做trim
1. 注意ip前后不能有空格,不然会无法启动netty server从而无法启动canal server,应该是后台没做trim

2. 如果不填写`canal.ip``canal.register.ip`两个配置项,代码中将通过`AddressUtils.getHostIp()`获取本机的ip地址,如果本地有docker/orbstack等创建的虚拟网络设备会导致启动canal-server后识别到多个server且是不同的ip(docker0网桥或orbstack容器等的ip),比较膈应人。([#issue47](https://github.com/alibaba/canal/issues/47))
2. 如果不填写`canal.ip`和`canal.register.ip`两个配置项,代码中将通过`AddressUtils.getHostIp()`获取本机的ip地址,如果本地有docker/orbstack等创建的虚拟网络设备会导致启动canal-server后识别到多个server且是不同的ip(docker0网桥或orbstack容器等的ip),比较膈应人。([#issue47](https://github.com/alibaba/canal/issues/47))

源码:

Expand All @@ -292,7 +457,6 @@ mkdir -p ~/Downloads/canal/deployer && tar -zxvf canal.deployer-1.1.4.tar.gz -C

:::



- 执行bin目录下的startup.sh

Expand Down

0 comments on commit fad1b5c

Please sign in to comment.