use splitVector command to split table and then support parallel fetching in full sync stage. remove 'full_sync.reader.read_document_count' in configuration file.
vinllen committed Apr 14, 2021
1 parent fb86488 commit fc7462c
Showing 7 changed files with 143 additions and 85 deletions.
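
For context on the technique this commit adopts: MongoDB's splitVector command asks the server to compute split points over an index so that each resulting key range covers at most a given number of megabytes, and the full-sync readers can then fetch those ranges in parallel. The following is a minimal sketch, not code from this commit, of what such a call looks like through an mgo-style driver; the connection string, the namespace "test.coll", and the 32 MB chunk size are illustrative assumptions.

package main

import (
	"fmt"

	mgo "gopkg.in/mgo.v2" // assumption: any mgo-compatible driver exposes the same Run API
	"gopkg.in/mgo.v2/bson"
)

func main() {
	session, err := mgo.Dial("mongodb://127.0.0.1:27017")
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// Ask the server for split points on test.coll over the _id index,
	// limiting each chunk to roughly 32 MB. Field order matters for the
	// command document, hence bson.D rather than bson.M.
	var res bson.M
	err = session.DB("test").Run(bson.D{
		{Name: "splitVector", Value: "test.coll"},
		{Name: "keyPattern", Value: bson.M{"_id": 1}},
		{Name: "maxChunkSize", Value: 32},
	}, &res)
	if err != nil {
		// splitVector needs an index on the key pattern and sufficient
		// privileges; the commit falls back to single-threaded fetching
		// when the command fails.
		panic(err)
	}

	// res["splitKeys"] is a sorted list of single-field documents such as
	// [{_id: k1}, {_id: k2}, ...]; consecutive keys bound the ranges
	// (-inf, k1], (k1, k2], ..., (kn, +inf) that readers fetch in parallel.
	fmt.Println(res["splitKeys"])
}
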
4 changes: 4 additions & 0 deletions ChangeLog
@@ -2,6 +2,10 @@
* version: 2.6.4
* IMPROVE: parse configuration failed because of column length is bigger
than 4096. #582
* IMPROVE: use splitVector command to split table and then support
parallel fetching in full sync stage. remove
'full_sync.reader.read_document_count' in configuration file.

2021-04-12 Alibaba Cloud.
* version: 2.6.3
* BUGFIX: ignore duplicate key error when in full sync stage. #579
11 changes: 7 additions & 4 deletions cmd/collector/sanitize.go
@@ -141,10 +141,13 @@ func checkDefaultValue() error {
if conf.Options.FullSyncReaderWriteDocumentParallel <= 0 {
conf.Options.FullSyncReaderWriteDocumentParallel = 8
}
if conf.Options.FullSyncReaderReadDocumentCount < 0 {
conf.Options.FullSyncReaderReadDocumentCount = 0
} else if conf.Options.FullSyncReaderReadDocumentCount > 0 && conf.Options.FullSyncReaderReadDocumentCount < 10000 {
return fmt.Errorf("full_sync.reader.read_document_count should == 0 or >= 10000")
if conf.Options.FullSyncReaderParallelThread <= 0 {
conf.Options.FullSyncReaderParallelThread = 1
} else if conf.Options.FullSyncReaderParallelThread > 128 {
return fmt.Errorf("full_sync.reader.parallel_thread should <= 128")
}
if conf.Options.FullSyncReaderParallelIndex == "" {
conf.Options.FullSyncReaderParallelIndex = "_id"
}
if conf.Options.FullSyncReaderDocumentBatchSize <= 0 {
conf.Options.FullSyncReaderDocumentBatchSize = 128
7 changes: 6 additions & 1 deletion collector/configure/configure.go
@@ -49,8 +49,9 @@ type Configuration struct {
// 2. full sync
FullSyncReaderCollectionParallel int `config:"full_sync.reader.collection_parallel"`
FullSyncReaderWriteDocumentParallel int `config:"full_sync.reader.write_document_parallel"`
FullSyncReaderReadDocumentCount uint64 `config:"full_sync.reader.read_document_count"`
FullSyncReaderDocumentBatchSize int `config:"full_sync.reader.document_batch_size"`
FullSyncReaderParallelThread int `config:"full_sync.reader.parallel_thread"` // add v2.6.4
FullSyncReaderParallelIndex string `config:"full_sync.reader.parallel_index"` // add v2.6.4
FullSyncCollectionDrop bool `config:"full_sync.collection_exist_drop"`
FullSyncCreateIndex string `config:"full_sync.create_index"`
FullSyncReaderOplogStoreDisk bool `config:"full_sync.reader.oplog_store_disk"`
@@ -103,6 +104,10 @@ type Configuration struct {
IncrSyncTunnelMessage string `config:"incr_sync.tunnel.message"` // deprecate since v2.4.1
HTTPListenPort int `config:"http_profile"` // deprecate since v2.4.1
SystemProfile int `config:"system_profile"` // deprecate since v2.4.1

/*---------------------------------------------------------*/
// removed variables
// FullSyncReaderReadDocumentCount uint64 `config:"full_sync.reader.read_document_count"` // remove since v2.6.4
}

func (configuration *Configuration) IsShardCluster() bool {
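
Going by the struct tags just above and the defaults enforced in cmd/collector/sanitize.go, the two new options would be set in the collector configuration roughly as follows. This is an illustrative sketch that assumes MongoShake's usual key = value conf format; the shipped default is 1 reader thread on the _id index, and values above 128 threads are rejected.

# number of concurrent range readers per collection in the full sync stage
# (default 1, i.e. no splitting; must be <= 128)
full_sync.reader.parallel_thread = 4

# index field passed to splitVector as the keyPattern (default "_id")
full_sync.reader.parallel_index = _id
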
149 changes: 98 additions & 51 deletions collector/docsyncer/doc_reader.go
@@ -1,12 +1,10 @@
package docsyncer

import (
"context"
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
utils "github.com/alibaba/MongoShake/v2/common"
"math"

"context"
"sync/atomic"

LOG "github.com/vinllen/log4go"
@@ -24,7 +22,7 @@ type DocumentSplitter struct {
ns utils.NS // namespace
conn *utils.MongoConn // connection
readerChan chan *DocumentReader // reader chan
pieceSize uint64 // each piece max size
pieceByteSize uint64 // each piece max byte size
count uint64 // total document number
pieceNumber int // how many piece
}
@@ -34,7 +32,6 @@ func NewDocumentSplitter(src, sslRootCaFile string, ns utils.NS) *DocumentSplitt
src: src,
sslRootCaFile: sslRootCaFile,
ns: ns,
pieceSize: conf.Options.FullSyncReaderReadDocumentCount,
}

// create connection
@@ -48,20 +45,27 @@
}

// get total count
count, err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Count()
if err != nil {
var res struct {
Count int64 `bson:"count"`
Size float64 `bson:"size"`
StorageSize float64 `bson:"storageSize"`
}
if err := ds.conn.Session.DB(ds.ns.Database).Run(bson.M{"collStats": ds.ns.Collection}, &res); err != nil {
LOG.Error("splitter[%s] connection mongo[%v] failed[%v]", ds,
utils.BlockMongoUrlPassword(ds.src, "***"), err)
return nil
}
ds.count = uint64(count)
ds.count = uint64(res.Count)
ds.pieceByteSize = uint64(res.Size / float64(conf.Options.FullSyncReaderParallelThread))
if ds.pieceByteSize > 8*utils.GB {
// at most 8GB per chunk
ds.pieceByteSize = 8 * utils.GB
}

if ds.pieceSize <= 0 {
ds.pieceNumber = 1
if conf.Options.FullSyncReaderParallelThread <= 1 {
ds.readerChan = make(chan *DocumentReader, 1)
} else {
ds.pieceNumber = int(math.Ceil(float64(ds.count) / float64(ds.pieceSize)))
ds.readerChan = make(chan *DocumentReader, SpliterReader)
ds.readerChan = make(chan *DocumentReader, 4196)
}

go func() {
@@ -77,8 +81,8 @@ func (ds *DocumentSplitter) Close() {
}

func (ds *DocumentSplitter) String() string {
return fmt.Sprintf("DocumentSplitter src[%s] ns[%s] count[%v] pieceSize[%v] pieceNumber[%v]",
utils.BlockMongoUrlPassword(ds.src, "***"), ds.ns, ds.count, ds.pieceSize, ds.pieceNumber)
return fmt.Sprintf("DocumentSplitter src[%s] ns[%s] count[%v] pieceByteSize[%v MB] pieceNumber[%v]",
utils.BlockMongoUrlPassword(ds.src, "***"), ds.ns, ds.count, ds.pieceByteSize/utils.MB, ds.pieceNumber)
}

// TODO, need add retry
@@ -87,54 +91,93 @@ func (ds *DocumentSplitter) Run() error {
defer close(ds.readerChan)

// disable split
if ds.pieceNumber == 1 {
if conf.Options.FullSyncReaderParallelThread <= 1 {
LOG.Info("splitter[%s] disable split or no need", ds)
ds.readerChan <- NewDocumentReader(ds.src, ds.ns, nil, nil, ds.sslRootCaFile)
ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
LOG.Info("splitter[%s] exits", ds)
return nil
}

LOG.Info("splitter[%s] enable split: piece size[%v], count[%v]", ds, ds.pieceSize, ds.count)

var start interface{}
// cut into piece
cnt := ds.count
for i := 0; cnt > 0; i++ {
result := make(bson.M)
// current window size
windowSize := ds.pieceSize
if cnt < windowSize {
windowSize = cnt
}
LOG.Info("splitter[%s] enable split, waiting splitVector return...", ds)

query := make(bson.M)
if start != nil {
query["_id"] = bson.M{"$gt": start}
}
var res bson.M
err := ds.conn.Session.DB(ds.ns.Database).Run(bson.D{
{"splitVector", ds.ns.Str()},
{"keyPattern", bson.M{conf.Options.FullSyncReaderParallelIndex: 1}},
// {"maxSplitPoints", ds.pieceNumber - 1},
{"maxChunkSize", ds.pieceByteSize / utils.MB},
}, &res)
// if failed, do not panic, run single thread fetching
if err != nil {
LOG.Warn("splitter[%s] run splitVector failed[%v], give up parallel fetching", ds, err)
ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
LOG.Info("splitter[%s] exits", ds)
return nil
}

// find the right boundary. the parameter of Skip() is int, what if overflow?
err := ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Find(query).Sort("_id").
Skip(int(windowSize - 1)).Limit(1).One(&result)
if err != nil {
return fmt.Errorf("splitter[%s] piece[%d] with query[%v] and skip[%v] fetch boundary failed[%v]",
ds, i, query, windowSize-1, err)
LOG.Info("splitter[%s] run splitVector result: %v", ds, res)

if splitKeys, ok := res["splitKeys"]; ok {
if splitKeysList, ok := splitKeys.([]interface{}); ok && len(splitKeysList) > 0 {
// return list is sorted
ds.pieceNumber = len(splitKeysList) + 1

var start interface{}
cnt := 0
for i, keyDoc := range splitKeysList {
// check key == conf.Options.FullSyncReaderParallelIndex
key, val, err := parseDocKeyValue(keyDoc)
if err != nil {
LOG.Crash("splitter[%s] parse doc key failed: %v", ds, err)
}
if key != conf.Options.FullSyncReaderParallelIndex {
LOG.Crash("splitter[%s] parse doc invalid key: %v", ds, key)
}

LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, cnt, start, val)
// inject new DocumentReader into channel
ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, val, ds.sslRootCaFile)

// new start
start = val
cnt++

// last one
if i == len(splitKeysList)-1 {
LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, INF)", ds, cnt, start)
// inject new DocumentReader into channel
ds.readerChan <- NewDocumentReader(cnt, ds.src, ds.ns, key, start, nil, ds.sslRootCaFile)
}
}

return nil
} else {
LOG.Warn("splitter[%s] run splitVector return empty result[%v]", ds, res)
}

end := result["_id"]

LOG.Info("splitter[%s] piece[%d] create reader with boundary(%v, %v]", ds, i, start, end)
// inject new DocumentReader into channel
ds.readerChan <- NewDocumentReader(ds.src, ds.ns, start, end, ds.sslRootCaFile)

// new start
start = end
cnt -= windowSize
} else {
LOG.Warn("splitter[%s] run splitVector return null result[%v]", ds, res)
}

LOG.Warn("splitter[%s] give up parallel fetching", ds, err)
ds.readerChan <- NewDocumentReader(0, ds.src, ds.ns, "", nil, nil, ds.sslRootCaFile)
LOG.Info("splitter[%s] exits", ds)

LOG.Info("splitter[%s] exits", ds)
return nil
}

func parseDocKeyValue(x interface{}) (string, interface{}, error) {
keyDocM := x.(bson.M)
if len(keyDocM) > 1 {
return "", nil, fmt.Errorf("invalid key doc[%v]", keyDocM)
}
var key string
var val interface{}
for key, val = range keyDocM {
}
return key, val, nil
}

// @deprecated
func (ds *DocumentSplitter) GetIndexes() ([]mgo.Index, error) {
return ds.conn.Session.DB(ds.ns.Database).C(ds.ns.Collection).Indexes()
@@ -161,10 +204,12 @@ type DocumentReader struct {

// query statement and current max cursor
query bson.M
key string
id int
}

// NewDocumentReader creates reader with mongodb url
func NewDocumentReader(src string, ns utils.NS, start, end interface{}, sslRootCaFile string) *DocumentReader {
func NewDocumentReader(id int, src string, ns utils.NS, key string, start, end interface{}, sslRootCaFile string) *DocumentReader {
q := make(bson.M)
if start != nil || end != nil {
innerQ := make(bson.M)
@@ -174,22 +219,24 @@ func NewDocumentReader(src string, ns utils.NS, start, end interface{}, sslRootC
if end != nil {
innerQ["$lte"] = end
}
q["_id"] = innerQ
q[key] = innerQ
}

ctx := context.Background()

return &DocumentReader{
id: id,
src: src,
ns: ns,
sslRootCaFile: sslRootCaFile,
query: q,
key: key,
ctx: ctx,
}
}

func (reader *DocumentReader) String() string {
ret := fmt.Sprintf("DocumentReader src[%v] ns[%s] query[%v]",
ret := fmt.Sprintf("DocumentReader id[%v], src[%v] ns[%s] query[%v]", reader.id,
utils.BlockMongoUrlPassword(reader.src, "***"), reader.ns, reader.query)
if reader.docCursor != nil {
ret = fmt.Sprintf("%s docCursorId[%v]", ret, reader.docCursor.ID())
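
To make the boundary bookkeeping in Run() above easier to follow, here is a minimal, self-contained sketch, not code from the commit, of how a sorted splitKeys list of length n becomes n+1 half-open ranges: the first reader is unbounded below, the last unbounded above, and each range is later translated into a $gt/$lte query by NewDocumentReader.

package main

import "fmt"

// boundaries mirrors the loop in DocumentSplitter.Run(): each split key
// closes the previous range and opens the next one.
func boundaries(splitKeys []interface{}) [][2]interface{} {
	var out [][2]interface{}
	var start interface{} // nil lower bound means "from the beginning"
	for _, key := range splitKeys {
		out = append(out, [2]interface{}{start, key}) // range (start, key]
		start = key
	}
	out = append(out, [2]interface{}{start, nil}) // final range (start, +inf)
	return out
}

func main() {
	// suppose splitVector returned the keys 100 and 200 on _id
	for i, b := range boundaries([]interface{}{100, 200}) {
		fmt.Printf("reader %d: (%v, %v]\n", i, b[0], b[1])
	}
	// reader 0: (<nil>, 100]
	// reader 1: (100, 200]
	// reader 2: (200, <nil>]
}
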
15 changes: 5 additions & 10 deletions collector/docsyncer/doc_syncer.go
@@ -22,7 +22,6 @@ import (

const (
MAX_BUFFER_BYTE_SIZE = 12 * 1024 * 1024
SpliterReader = 4
)

func IsShardingToSharding(fromIsSharding bool, toConn *utils.MongoConn) bool {
@@ -433,28 +432,24 @@ func (syncer *DBSyncer) collectionSync(collExecutorId int, ns utils.NS, toNS uti

// run in several pieces
var wg sync.WaitGroup
wg.Add(splitter.pieceNumber)
readerCnt := SpliterReader
if readerCnt > splitter.pieceNumber {
readerCnt = splitter.pieceNumber
}
for i := 0; i < readerCnt; i++ {
wg.Add(conf.Options.FullSyncReaderParallelThread)
for i := 0; i < conf.Options.FullSyncReaderParallelThread; i++ {
go func() {
defer wg.Done()
for {
reader, ok := <-splitter.readerChan
if !ok {
if !ok || reader == nil {
break
}

if err := syncer.splitSync(reader, colExecutor, collectionMetric); err != nil {
LOG.Crashf("%v", err)
}

wg.Done()
}
}()
}
wg.Wait()
LOG.Info("%s all readers finish, wait all writers finish", syncer)

// close writer
if err := colExecutor.Wait(); err != nil {
25 changes: 13 additions & 12 deletions common/fcv.go
@@ -6,8 +6,8 @@ var (
FeatureCompatibleVersion: 1,
}
FcvConfiguration = Configuration{
CurrentVersion: 9,
FeatureCompatibleVersion: 3,
CurrentVersion: 10,
FeatureCompatibleVersion: 10,
}

LowestCheckpointVersion = map[int]string{
@@ -16,16 +16,17 @@
2: "2.4.6", // change sharding checkpoint position from cs to mongos
}
LowestConfigurationVersion = map[int]string{
0: "1.0.0",
1: "2.4.0",
2: "2.4.1",
3: "2.4.3",
4: "2.4.6", // add incr_sync.target_delay
5: "2.4.7", // add full_sync.reader.read_document_count
6: "2.4.12", // add incr_sync.shard_by_object_id_whitelist
7: "2.4.17", // add filter.oplog.gids
8: "2.4.20", // add special.source.db.flag
9: "2.4.21", // remove incr_sync.worker.oplog_compressor; add incr_sync.tunnel.write_thread, tunnel.kafka.partition_number
0: "1.0.0",
1: "2.4.0",
2: "2.4.1",
3: "2.4.3",
4: "2.4.6", // add incr_sync.target_delay
5: "2.4.7", // add full_sync.reader.read_document_count
6: "2.4.12", // add incr_sync.shard_by_object_id_whitelist
7: "2.4.17", // add filter.oplog.gids
8: "2.4.20", // add special.source.db.flag
9: "2.4.21", // remove incr_sync.worker.oplog_compressor; add incr_sync.tunnel.write_thread, tunnel.kafka.partition_number
10: "2.6.4", // remove full_sync.reader.read_document_count; add full_sync.reader.parallel_thread
}
)
