From fc7462c363be20fe0bbc12c42a82bf912e2300a1 Mon Sep 17 00:00:00 2001
From: vinllen
Date: Wed, 14 Apr 2021 17:17:18 +0800
Subject: [PATCH] use splitVector command to split tables and support parallel
 fetching in the full sync stage; remove 'full_sync.reader.read_document_count'
 from the configuration file.

---
 ChangeLog                         |   4 +
 cmd/collector/sanitize.go         |  11 ++-
 collector/configure/configure.go  |   7 +-
 collector/docsyncer/doc_reader.go | 149 ++++++++++++++++++++----------
 collector/docsyncer/doc_syncer.go |  15 +--
 common/fcv.go                     |  25 ++---
 conf/collector.conf               |  17 ++--
 7 files changed, 143 insertions(+), 85 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index 15614c0a..371962fc 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -2,6 +2,10 @@
  * version: 2.6.4
  * IMPROVE: parse configuration failed because of column length is bigger than 4096. #582
+ * IMPROVE: use splitVector command to split tables and support
+   parallel fetching in the full sync stage; remove
+   'full_sync.reader.read_document_count' from the configuration file.
+
 2021-04-12 Alibaba Cloud.
  * version: 2.6.3
  * BUGFIX: ignore duplicate key error when in full sync stage. #579
diff --git a/cmd/collector/sanitize.go b/cmd/collector/sanitize.go
index 1abacfe7..2a29162d 100644
--- a/cmd/collector/sanitize.go
+++ b/cmd/collector/sanitize.go
@@ -141,10 +141,13 @@ func checkDefaultValue() error {
 	if conf.Options.FullSyncReaderWriteDocumentParallel <= 0 {
 		conf.Options.FullSyncReaderWriteDocumentParallel = 8
 	}
-	if conf.Options.FullSyncReaderReadDocumentCount < 0 {
-		conf.Options.FullSyncReaderReadDocumentCount = 0
-	} else if conf.Options.FullSyncReaderReadDocumentCount > 0 && conf.Options.FullSyncReaderReadDocumentCount < 10000 {
-		return fmt.Errorf("full_sync.reader.read_document_count should == 0 or >= 10000")
+	if conf.Options.FullSyncReaderParallelThread <= 0 {
+		conf.Options.FullSyncReaderParallelThread = 1
+	} else if conf.Options.FullSyncReaderParallelThread > 128 {
+		return fmt.Errorf("full_sync.reader.parallel_thread should be <= 128")
+	}
+	if conf.Options.FullSyncReaderParallelIndex == "" {
+		conf.Options.FullSyncReaderParallelIndex = "_id"
 	}
 	if conf.Options.FullSyncReaderDocumentBatchSize <= 0 {
 		conf.Options.FullSyncReaderDocumentBatchSize = 128
diff --git a/collector/configure/configure.go b/collector/configure/configure.go
index a1d46154..34c86de9 100644
--- a/collector/configure/configure.go
+++ b/collector/configure/configure.go
@@ -49,8 +49,9 @@ type Configuration struct {
 	// 2. full sync
 	FullSyncReaderCollectionParallel    int    `config:"full_sync.reader.collection_parallel"`
 	FullSyncReaderWriteDocumentParallel int    `config:"full_sync.reader.write_document_parallel"`
-	FullSyncReaderReadDocumentCount     uint64 `config:"full_sync.reader.read_document_count"`
 	FullSyncReaderDocumentBatchSize     int    `config:"full_sync.reader.document_batch_size"`
+	FullSyncReaderParallelThread        int    `config:"full_sync.reader.parallel_thread"` // add v2.6.4
+	FullSyncReaderParallelIndex         string `config:"full_sync.reader.parallel_index"`  // add v2.6.4
 	FullSyncCollectionDrop              bool   `config:"full_sync.collection_exist_drop"`
 	FullSyncCreateIndex                 string `config:"full_sync.create_index"`
 	FullSyncReaderOplogStoreDisk        bool   `config:"full_sync.reader.oplog_store_disk"`
@@ -103,6 +104,10 @@ type Configuration struct {
 	IncrSyncTunnelMessage string `config:"incr_sync.tunnel.message"` // deprecate since v2.4.1
 	HTTPListenPort        int    `config:"http_profile"`             // deprecate since v2.4.1
 	SystemProfile         int    `config:"system_profile"`           // deprecate since v2.4.1
+
+	/*---------------------------------------------------------*/
+	// removed variables
+	// FullSyncReaderReadDocumentCount uint64 `config:"full_sync.reader.read_document_count"` // remove since v2.6.4
 }
 
 func (configuration *Configuration) IsShardCluster() bool {
diff --git a/collector/docsyncer/doc_reader.go b/collector/docsyncer/doc_reader.go
index 6e6b296c..50db3a45 100644
--- a/collector/docsyncer/doc_reader.go
+++ b/collector/docsyncer/doc_reader.go
@@ -1,12 +1,10 @@
 package docsyncer
 
 import (
+	"context"
 	"fmt"
 	conf "github.com/alibaba/MongoShake/v2/collector/configure"
 	utils "github.com/alibaba/MongoShake/v2/common"
-	"math"
-
-	"context"
 	"sync/atomic"
 
 	LOG "github.com/vinllen/log4go"
@@ -24,7 +22,7 @@ type DocumentSplitter struct {
 	ns            utils.NS             // namespace
 	conn          *utils.MongoConn     // connection
 	readerChan    chan *DocumentReader // reader chan
-	pieceSize     uint64               // each piece max size
+	pieceByteSize uint64               // each piece max byte size
 	count         uint64               // total document number
 	pieceNumber   int                  // how many pieces
 }
@@ -34,7 +32,6 @@ func NewDocumentSplitter(src, sslRootCaFile string, ns utils.NS) *DocumentSplitt
 		src:           src,
 		sslRootCaFile: sslRootCaFile,
 		ns:            ns,
-		pieceSize:     conf.Options.FullSyncReaderReadDocumentCount,
 	}
 
 	// create connection
@@ -48,20 +45,27 @@ func NewDocumentSplitter(src, sslRootCaFile string, ns utils.NS) *DocumentSplitt
 	}
 
 	// get total count
-	count, err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Count()
-	if err != nil {
+	var res struct {
+		Count       int64   `bson:"count"`
+		Size        float64 `bson:"size"`
+		StorageSize float64 `bson:"storageSize"`
+	}
+	if err := ds.conn.Session.DB(ds.ns.Database).Run(bson.M{"collStats": ds.ns.Collection}, &res); err != nil {
 		LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds,
 			utils.BlockMongoUrlPassword(ds.src, "***"), err)
 		return nil
 	}
-	ds.count = uint64(count)
+	ds.count = uint64(res.Count)
+	ds.pieceByteSize = uint64(res.Size / float64(conf.Options.FullSyncReaderParallelThread))
+	if ds.pieceByteSize > 8*utils.GB {
+		// at most 8GB per chunk
+		ds.pieceByteSize = 8 * utils.GB
+	}
 
-	if ds.pieceSize <= 0 {
-		ds.pieceNumber = 1
+	if conf.Options.FullSyncReaderParallelThread <= 1 {
 		ds.readerChan = make(chan *DocumentReader, 1)
 	} else {
-		ds.pieceNumber = int(math.Ceil(float64(ds.count) / float64(ds.pieceSize)))
-		ds.readerChan = make(chan *DocumentReader, SpliterReader)
+		ds.readerChan = make(chan *DocumentReader, 4196)
 	}
 
 	go func() {
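A note on the sizing logic above: collStats reports the logical collection size in bytes, each of the full_sync.reader.parallel_thread readers gets an equal share, and the share is capped at 8GB before being handed to splitVector as maxChunkSize (in MB). A worked example as a standalone Go sketch; the helper name and the sample sizes are illustrative, not part of this patch:

    package main

    import "fmt"

    const (
        MB = 1024 * 1024
        GB = 1024 * MB
    )

    // pieceByteSize mirrors the computation above: every reader thread gets an
    // equal share of the collStats "size" value, bounded by an 8GB ceiling.
    func pieceByteSize(collSizeBytes float64, parallelThread int) uint64 {
        size := uint64(collSizeBytes / float64(parallelThread))
        if size > 8*GB {
            size = 8 * GB // at most 8GB per chunk, matching the cap above
        }
        return size
    }

    func main() {
        fmt.Println(pieceByteSize(40*GB, 8) / GB) // 5 -> 40GB / 8 threads = 5GB pieces
        fmt.Println(pieceByteSize(40*GB, 2) / GB) // 8 -> the 20GB share is capped at 8GB
    }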
@@ -77,8 +81,8 @@ func (ds *DocumentSplitter) Close() {
 }
 
 func (ds *DocumentSplitter) String() string {
-	return fmt.Sprintf("DocumentSplitter src[%s] ns[%s] count[%v] pieceSize[%v] pieceNumber[%v]",
-		utils.BlockMongoUrlPassword(ds.src, "***"), ds.ns, ds.count, ds.pieceSize, ds.pieceNumber)
+	return fmt.Sprintf("DocumentSplitter src[%s] ns[%s] count[%v] pieceByteSize[%v MB] pieceNumber[%v]",
+		utils.BlockMongoUrlPassword(ds.src, "***"), ds.ns, ds.count, ds.pieceByteSize/utils.MB, ds.pieceNumber)
 }
 
 // TODO, need add retry
@@ -87,54 +91,92 @@ func (ds *DocumentSplitter) Run() error {
 	defer close(ds.readerChan)
 
 	// disable split
-	if ds.pieceNumber == 1 {
+	if conf.Options.FullSyncReaderParallelThread <= 1 {
 		LOG.Info("splitter[%s] disable split or no need", ds)
-		ds.readerChan <- NewDocumentReader(ds.src, ds.ns, nil, nil, ds.sslRootCaFile)
+		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
 		LOG.Info("splitter[%s] exits", ds)
 		return nil
 	}
 
-	LOG.Info("splitter[%s] enable split: piece size[%v], count[%v]", ds, ds.pieceSize, ds.count)
-
-	var start interface{}
-	// cut into piece
-	cnt := ds.count
-	for i := 0; cnt > 0; i++ {
-		result := make(bson.M)
-		// current window size
-		windowSize := ds.pieceSize
-		if cnt < windowSize {
-			windowSize = cnt
-		}
+	LOG.Info("splitter[%s] enable split, waiting for splitVector to return...", ds)
 
-		query := make(bson.M)
-		if start != nil {
-			query["_id"] = bson.M{"$gt": start}
-		}
+	var res bson.M
+	err := ds.conn.Session.DB(ds.ns.Database).Run(bson.D{
+		{"splitVector", ds.ns.Str()},
+		{"keyPattern", bson.M{conf.Options.FullSyncReaderParallelIndex: 1}},
+		// {"maxSplitPoints", ds.pieceNumber - 1},
+		{"maxChunkSize", ds.pieceByteSize / utils.MB},
+	}, &res)
+	// if it fails, do not panic, fall back to single-thread fetching
+	if err != nil {
+		LOG.Warn("splitter[%s] run splitVector failed[%v], give up parallel fetching", ds, err)
+		ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
+		LOG.Info("splitter[%s] exits", ds)
+		return nil
+	}
 
-		// find the right boundary. the parameter of Skip() is int, what if overflow?
-		err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Find(query).Sort("_id").
-			Skip(int(windowSize - 1)).Limit(1).One(&result)
-		if err != nil {
-			return fmt.Errorf("splitter[%s] piece[%d] with query[%v] and skip[%v] fetch boundary failed[%v]",
-				ds, i, query, windowSize-1, err)
+	LOG.Info("splitter[%s] run splitVector result: %v", ds, res)
+
+	if splitKeys, ok := res["splitKeys"]; ok {
+		if splitKeysList, ok := splitKeys.([]interface{}); ok && len(splitKeysList) > 0 {
+			// returned list is sorted
+			ds.pieceNumber = len(splitKeysList) + 1
+
+			var start interface{}
+			cnt := 0
+			for i, keyDoc := range splitKeysList {
+				// check key == conf.Options.FullSyncReaderParallelIndex
+				key, val, err := parseDocKeyValue(keyDoc)
+				if err != nil {
+					LOG.Crash("splitter[%s] parse doc key failed: %v", ds, err)
+				}
+				if key != conf.Options.FullSyncReaderParallelIndex {
+					LOG.Crash("splitter[%s] parse doc invalid key: %v", ds, key)
+				}
+
+				LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, cnt, start, val)
+				// inject new DocumentReader into channel
+				ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, val, ds.sslRootCaFile)
+
+				// new start
+				start = val
+				cnt++
+
+				// last one
+				if i == len(splitKeysList)-1 {
+					LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, INF)", ds, cnt, start)
+					// inject new DocumentReader into channel
+					ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, nil, ds.sslRootCaFile)
+				}
+			}
+
+			return nil
+		} else {
+			LOG.Warn("splitter[%s] run splitVector return empty result[%v]", ds, res)
 		}
-
-		end := result["_id"]
-
-		LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, i, start, end)
-		// inject new DocumentReader into channel
-		ds.readerChan <- NewDocumentReader(ds.src, ds.ns, start, end, ds.sslRootCaFile)
-
-		// new start
-		start = end
-		cnt -= windowSize
+	} else {
+		LOG.Warn("splitter[%s] run splitVector return null result[%v]", ds, res)
 	}
 
+	LOG.Warn("splitter[%s] give up parallel fetching", ds)
+	ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
+
 	LOG.Info("splitter[%s] exits", ds)
 	return nil
 }
 
+func parseDocKeyValue(x interface{}) (string, interface{}, error) {
+	keyDocM, ok := x.(bson.M)
+	if !ok || len(keyDocM) != 1 {
+		return "", nil, fmt.Errorf("invalid key doc[%v]", x)
+	}
+	var key string
+	var val interface{}
+	for key, val = range keyDocM {
+	}
+	return key, val, nil
+}
+
 // @deprecated
 func (ds *DocumentSplitter) GetIndexes() ([]mgo.Index, error) {
 	return ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Indexes()
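For context, the splitVector round trip above looks roughly like the following when run in isolation (a sketch with the mgo-style driver; the URL, database, and namespace are placeholders, not values from this patch). For a collection split at _id values 1000 and 2000 the command returns something like {"splitKeys": [{"_id": 1000}, {"_id": 2000}], "ok": 1}, and N split keys always yield N+1 reader ranges:

    package main

    import (
        "fmt"

        mgo "gopkg.in/mgo.v2"
        "gopkg.in/mgo.v2/bson"
    )

    func main() {
        // placeholder source; MongoShake reads the real URL from its config
        session, err := mgo.Dial("mongodb://127.0.0.1:27017")
        if err != nil {
            panic(err)
        }
        defer session.Close()

        var res bson.M
        // same command shape as DocumentSplitter.Run above
        err = session.DB("test").Run(bson.D{
            {Name: "splitVector", Value: "test.coll"},     // full "db.collection" namespace
            {Name: "keyPattern", Value: bson.M{"_id": 1}}, // index to split on
            {Name: "maxChunkSize", Value: 32},             // target piece size in MB
        }, &res)
        if err != nil {
            // e.g. the user lacks the splitVector privilege; the patch falls
            // back to a single reader in this case instead of failing the sync
            panic(err)
        }
        fmt.Println(res["splitKeys"]) // sorted boundary docs, e.g. [map[_id:1000] map[_id:2000]]
    }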
@@ -161,10 +204,12 @@ type DocumentReader struct {
 
 	// query statement and current max cursor
 	query bson.M
+	key   string
+	id    int
 }
 
 // NewDocumentReader creates reader with mongodb url
-func NewDocumentReader(src string, ns utils.NS, start, end interface{}, sslRootCaFile string) *DocumentReader {
+func NewDocumentReader(id int, src string, ns utils.NS, key string, start, end interface{}, sslRootCaFile string) *DocumentReader {
 	q := make(bson.M)
 	if start != nil || end != nil {
 		innerQ := make(bson.M)
@@ -174,22 +219,24 @@ func NewDocumentReader(src string, ns utils.NS, start, end interface{}, sslRootC
 		if end != nil {
 			innerQ["$lte"] = end
 		}
-		q["_id"] = innerQ
+		q[key] = innerQ
 	}
 
 	ctx := context.Background()
 
 	return &DocumentReader{
+		id:            id,
 		src:           src,
 		ns:            ns,
 		sslRootCaFile: sslRootCaFile,
 		query:         q,
+		key:           key,
 		ctx:           ctx,
 	}
 }
 
 func (reader *DocumentReader) String() string {
-	ret := fmt.Sprintf("DocumentReader src[%v] ns[%s] query[%v]",
+	ret := fmt.Sprintf("DocumentReader id[%v] src[%v] ns[%s] query[%v]", reader.id,
 		utils.BlockMongoUrlPassword(reader.src, "***"), reader.ns, reader.query)
 	if reader.docCursor != nil {
 		ret = fmt.Sprintf("%s docCursorId[%v]", ret, reader.docCursor.ID())
diff --git a/collector/docsyncer/doc_syncer.go b/collector/docsyncer/doc_syncer.go
index 7a478cd9..34a4b6ff 100644
--- a/collector/docsyncer/doc_syncer.go
+++ b/collector/docsyncer/doc_syncer.go
@@ -22,7 +22,6 @@ import (
 
 const (
 	MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
-	SpliterReader        = 4
 )
 
 func IsShardingToSharding(fromIsSharding bool, toConn *utils.MongoConn) bool {
@@ -433,28 +432,24 @@ func (syncer *DBSyncer) collectionSync(collExecutorId int, ns utils.NS, toNS uti
 
 	// run in several pieces
 	var wg sync.WaitGroup
-	wg.Add(splitter.pieceNumber)
-	readerCnt := SpliterReader
-	if readerCnt > splitter.pieceNumber {
-		readerCnt = splitter.pieceNumber
-	}
-	for i := 0; i < readerCnt; i++ {
+	wg.Add(conf.Options.FullSyncReaderParallelThread)
+	for i := 0; i < conf.Options.FullSyncReaderParallelThread; i++ {
 		go func() {
+			defer wg.Done()
 			for {
 				reader, ok := <-splitter.readerChan
-				if !ok {
+				if !ok || reader == nil {
 					break
 				}
 				if err := syncer.splitSync(reader, colExecutor, collectionMetric); err != nil {
 					LOG.Crashf("%v", err)
 				}
-
-				wg.Done()
 			}
 		}()
 	}
 	wg.Wait()
+	LOG.Info("%s all readers finished, waiting for all writers to finish", syncer)
 
 	// close writer
 	if err := colExecutor.Wait(); err != nil {
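The rewritten loop above is a fixed-size worker pool: exactly parallel_thread goroutines drain the splitter's channel until it is closed, and each defers wg.Done so the WaitGroup counts workers rather than pieces. A minimal standalone sketch of the same pattern (all names here are illustrative, not the patch's API):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        pieces := make(chan int, 16)
        go func() {
            defer close(pieces) // closing the channel releases idle workers
            for i := 0; i < 10; i++ {
                pieces <- i
            }
        }()

        const parallelThread = 4
        var wg sync.WaitGroup
        wg.Add(parallelThread)
        for i := 0; i < parallelThread; i++ {
            go func(worker int) {
                defer wg.Done() // one Done per worker, not per piece
                for piece := range pieces {
                    fmt.Printf("worker[%d] syncs piece[%d]\n", worker, piece)
                }
            }(i)
        }
        wg.Wait() // all pieces consumed, all workers exited
    }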
diff --git a/common/fcv.go b/common/fcv.go
index 96f90e3e..29a05475 100644
--- a/common/fcv.go
+++ b/common/fcv.go
@@ -6,8 +6,8 @@ var (
 		FeatureCompatibleVersion: 1,
 	}
 	FcvConfiguration = Configuration{
-		CurrentVersion:           9,
-		FeatureCompatibleVersion: 3,
+		CurrentVersion:           10,
+		FeatureCompatibleVersion: 10,
 	}
 
 	LowestCheckpointVersion = map[int]string{
@@ -16,16 +16,17 @@ var (
 		2: "2.4.6", // change sharding checkpoint position from cs to mongos
 	}
 	LowestConfigurationVersion = map[int]string{
-		0: "1.0.0",
-		1: "2.4.0",
-		2: "2.4.1",
-		3: "2.4.3",
-		4: "2.4.6",  // add incr_sync.target_delay
-		5: "2.4.7",  // add full_sync.reader.read_document_count
-		6: "2.4.12", // add incr_sync.shard_by_object_id_whitelist
-		7: "2.4.17", // add filter.oplog.gids
-		8: "2.4.20", // add special.source.db.flag
-		9: "2.4.21", // remove incr_sync.worker.oplog_compressor; add incr_sync.tunnel.write_thread, tunnel.kafka.partition_number
+		0:  "1.0.0",
+		1:  "2.4.0",
+		2:  "2.4.1",
+		3:  "2.4.3",
+		4:  "2.4.6",  // add incr_sync.target_delay
+		5:  "2.4.7",  // add full_sync.reader.read_document_count
+		6:  "2.4.12", // add incr_sync.shard_by_object_id_whitelist
+		7:  "2.4.17", // add filter.oplog.gids
+		8:  "2.4.20", // add special.source.db.flag
+		9:  "2.4.21", // remove incr_sync.worker.oplog_compressor; add incr_sync.tunnel.write_thread, tunnel.kafka.partition_number
+		10: "2.6.4",  // remove full_sync.reader.read_document_count; add full_sync.reader.parallel_thread
 	}
 )
diff --git a/conf/collector.conf b/conf/collector.conf
index 70b6258a..b1a352fd 100644
--- a/conf/collector.conf
+++ b/conf/collector.conf
@@ -5,7 +5,7 @@
 
 # current configuration version, do not modify.
 # 当前配置文件的版本号,请不要修改该值。
-conf.version = 9
+conf.version = 10
 
 # --------------------------- global configuration ---------------------------
 # collector name
@@ -194,12 +194,15 @@ full_sync.reader.write_document_parallel = 8
 # number of documents in a batch insert in a document concurrence
 # 目的端写入的batch大小,例如,128表示一个线程将会一次聚合128个文档然后再写入。
 full_sync.reader.document_batch_size = 128
-# number of documents reading in single reader thread.
-# do not enable when the _id has more than one type: e.g., ObjectId, string.
-# 用于单表倾斜的优化,单个拉取线程读取的最多的文档数,默认0表示拉取是单线程拉取,非0情况下必须>=10000。
-# 例如,表内有50000文档,设置10000则读取段拉取为5个线程(建议并发在1-32个线程)。
-# 注意:对单个表来说,仅支持_id对应的value是同种类型,如果有不同类型请勿启用该配置项!
-full_sync.reader.read_document_count = 0
+# max number of fetching threads per table. the default of 1 means single-thread
+# fetching; a value greater than 1 requires the splitVector privilege on the source.
+full_sync.reader.parallel_thread = 1
+# the index used for parallel fetching; must be set together with
+# full_sync.reader.parallel_thread, and the index can only contain one field.
+# all values of that field must be of the same type; do not enable parallel
+# fetching for mixed-type fields. recommended: _id for a replica set, the
+# shard key for a sharded cluster.
+full_sync.reader.parallel_index = _id
 
 # drop the same name of collection in dest mongodb in full synchronization
 # 同步时如果目的库存在,是否先删除目的库再进行同步,true表示先删除再同步,false表示不删除。
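As an illustrative excerpt only (hypothetical values, not defaults shipped by this patch): a sharded source whose shard key is uid, fetched with eight reader threads per collection, would combine the two new options like this, assuming the connecting user has been granted the splitVector privilege:

    full_sync.reader.parallel_thread = 8
    full_sync.reader.parallel_index = uid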