-
Notifications
You must be signed in to change notification settings - Fork 446
v2.4版本对接change stream架构设计
Vinllen Chen edited this page Mar 31, 2020
·
7 revisions
v2.4支持change stream对接,但只针对源端版本是大于4.0.1的版本,对于早于4.0.1的版本,不支持。
- change stream对接。可以解决sharding move chunk的问题,不关闭balancer就可以迁移sharding。
- 对配置文件和checkpoint添加版本号机制,如果是从2.2及以下版本升级上来的,需要重新配置,不能兼容之前版本。
- 优化孤儿文档的处理。
- 在配置文件和restful接口中屏蔽密码信息。
- 全量增加
full_sync.executor.insert_on_dup_update
参数,当目的端已经存在数据,可以把insert改为update。 - 增加
full_sync.create_index
选项,目前支持none(同步结束不创建索引)和foreground(同步结束创建前台索引),后续版本可能会考虑支持background。 - 临时关闭2.2.1支持的增量持久化功能,将会在后续小版本进行开放,敬请关注。
change stream存在的不足:
- 性能来说,change stream的性能整体弱于oplog直接拉取的方式,压测结果显示平均性能差异在2-3倍。另外,MongoShake内部也在change stream和oplog中间进行转换,带来了cpu的开销。
- change stream目前仅支持drop database的DDL,对于create index/drop index等都不支持,后续MongoDB官方还会持续优化change stream。
在v2.2及之前版本,MongoShake的数据流依次如下:
- MongoDB oplog。
- oplog_reader。负责从源端采用tail的方式拉取oplog,如果失败会释放迭代器并重新建立。在v2.2版本,这里加了全量期间增量持久化的功能,也就是说,全量期间拉取到的oplog将不会发送到下游组件,而是采用disqueue组件写到本地持久化文件。关于v2.2增量持久化的设计参考v2.2版本全量期间增量持久化。
- pending queue。队列里面存的是未解析的bson raw格式的数据。
- logs queue。存储已经解析成oplog的数据。
- batcher。负责数据聚合和分发,以及处理checkpoint。
- worker。发送到不同tunnel的线程。
数据流图(红色是队列,黑色是工作线程,只画出与本文有关的内容):
v2.4版本增加change stream对接的功能,如下图所示(虚线框是本次架构调整的组件)。
- change_stream_reader负责采用从源端采用change stream的方式接受数据
- 增量持久化组件后移到persist_handler,架构上更加清晰一点。为了与全量同步解耦合,顺便也为以后增量同步期间由于流量大,或者外部开关等因素开启增量备份做准备。另外也考虑到了如果还是存储在oplog_reader里面,需要在change_stream_reader也实现同样的功能,代码上存在冗余。假如没有开启增量持久化,那么将会直接把数据推到下游的pending queue。
- 下游的pending queue里面原来存储的是bons.Raw格式的未解析的数据,现在改为[]byte数组,这是为了增量持久化组件后移处理对齐(存储到磁盘上的都是裸的[]byte)。对上游来说,oplog_reader和change_stream_reader采用的是不同的driver,解析后的bson.Raw无法直接对齐,改为[]byte通用格式更加合适,且不存在解析开销。
- 数据从pending queue流出后,如果不是change stream方式是直接解析成oplog写入到logs queue。
- Translate queue是用于change stream event翻译成Oplog用的。如果源端不是通过change stream拉取会直接写入到Logs queue。
- pending_queue:translate_queue:logs_queue = 1:1:1,继续沿用之前并行解析保证同步速率。
架构带来的风险评估:
- 内存使用上涨。由于多引入了一个translate queue,内存会增大,但是这个queue size不会很大,所以带来的内存可控。
- persist_handler对增量持久化,涉及到磁盘的读/写,性能上会降低。考虑到这个启用的时候要么是在全量阶段,要么是在增量的流量高峰期(目前不会有这种情况),带来的性能影响较低。
- persist_handler持久化后,重启导致的数据丢失。persist_handler用的是disk queue的开源组件,该组件在数据读取后将会删除,可能会导致重启数据的丢失。v2.2版本的处理是每读一批数据,就更新一下checkpoint,这种方式会降低数据丢失的概率,但是问题依然存在(在读取完,数据没有发送到目的端的过程中发生中断),而且这种每次都强刷checkpoint并等待的策略,性能比较低下。所以,后面还会对这个开源组件代码进行调整,读取完不删除,外部触发删除的逻辑。
由于change stream是采用resumeToken进行断点续传的,而原来v2.2以前的版本是采用ts进行断点续传的,那么对于change stream对接后,到底checkpoint是采用resumeToken还是ts方式呢?以下是2种方式的优缺点。
- 不需要对事务进行合并成一个oplog。因为resumeToken自带applyOpsIndex标记当时是applyOps数组里面的第几个元素,那么就可以做断点续传。
- checkpoint的CRUD逻辑需要修改
- 不支持rpc/tcp通道,但支持direct/kafka通道。这是因为rpc和tcp是采用异步确认的方式,worker发送后,对端异步写入,然后回复一批oplog的最新的ts表示该ts之前的数据都已经成功写入了,而这个ts也就是checkpoint,如果采用resumeToken,那么这个信息并没有携带在发送的oplog里面,需要在头部字段修改,此处有修改的成本。
- 需要对收到的一批事务合并成一个applyOps的oplog。因为这一批事务的ts都是一样的,比如1,2,3这三条都是t1时刻,如果1写成功了,checkpoint可能更新到t1,然后这时候MongoShake挂了,那么启动后下次将从t1之后开始拉取,2,3就被跳过了从而造成数据丢失。
- checkpoint的CRUD逻辑不需要修改。
- 支持tcp/rpc/kafka/direct。
- 断点续传的时候,需要根据ts构建resumeToken,同理也需要根据resumeToken解析出ts进行checkpoint持久化。这里需要看下MongoDB本身对于resumeToken的封装/解封装逻辑,依照进行重新实现。这里即使checkpoint是resumeToken,“根据ts构建resumeToken”理论上也是要做的,支持用户指定ts进行同步,也支持全量结束后,增量根据给定的位点开始拉取;“根据resumeToken解析出ts”理论上也是要做的,监控打印同步的位点信息。“根据ts构建resumeToken”在4.0里面提供了startAtOperationTime参数,可以直接作为起始的位点。“根据resumeToken解析出ts”这部分也不需要做,因为拉取的event里面有clusterTime字段,可以直接解析出timestamp字段。
综合考虑,选型采用第二种ts的方式继续作为checkpoint。
ResumeToken结构:
struct ResumeTokenData {
Timestamp clusterTime;
int version = 0;
size_t applyOpsIndex = 0;
Value documentKey;
boost::optional<UUID> uuid;
};