Skip to content

Commit

Permalink
release v2.6.2: support ssl。 #236
Browse files Browse the repository at this point in the history
  • Loading branch information
vinllen committed Apr 8, 2021
1 parent c912a37 commit ffa4486
Show file tree
Hide file tree
Showing 30 changed files with 223 additions and 136 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ logs
tags
*.bak
*.tar.gz
*.pem
run_sys_test_dir

dump.data
Expand Down
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
2021-04-08 Alibaba Cloud.
* version: 2.6.2
* IMPROVE: support ssl.

2021-03-22 Alibaba Cloud.
* version: 2.6.1
* BUGFIX: singleWriter with wrong type 'objectId' error. #570
Expand Down
1 change: 1 addition & 0 deletions cmd/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {

utils.Welcome()

utils.Mkdirs(conf.Options.LogDirectory)
// get exclusive process lock and write pid
if utils.WritePidById(conf.Options.LogDirectory, conf.Options.Id) {
startup()
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/sanitize.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func checkConnection() error {
// check mongo_urls
for _, mongo := range conf.Options.MongoUrls {
_, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
if err != nil {
return fmt.Errorf("connect source mongodb[%v] failed[%v]", mongo, err)
}
Expand All @@ -269,7 +269,7 @@ func checkConnection() error {
// check mongo_cs_url
if conf.Options.MongoCsUrl != "" {
_, err := utils.NewMongoConn(conf.Options.MongoCsUrl, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
if err != nil {
return fmt.Errorf("connect config-server[%v] failed[%v]", conf.Options.MongoCsUrl, err)
}
Expand All @@ -282,7 +282,7 @@ func checkConnection() error {
!conf.Options.IncrSyncExecutorDebug {
for i, mongo := range conf.Options.TunnelAddress {
targetConn, err := utils.NewMongoConn(mongo, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.TunnelMongoSslRootCaFile)
if err != nil {
return fmt.Errorf("connect target tunnel mongodb[%v] failed[%v]", mongo, err)
}
Expand All @@ -302,7 +302,7 @@ func checkConnection() error {
source = conf.Options.MongoUrls[0]
}
sourceConn, _ := utils.NewMongoConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
// ignore error
conf.Options.SourceDBVersion, _ = utils.GetDBVersion(sourceConn.Session)
if ok, err := utils.GetAndCompareVersion(sourceConn.Session, "2.6.0", conf.Options.SourceDBVersion); err != nil {
Expand Down Expand Up @@ -418,7 +418,7 @@ func checkConflict() error {
}

conn, err := utils.NewMongoConn(source, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
if err != nil {
return fmt.Errorf("connect source[%v] failed[%v]", source, err)
}
Expand Down
3 changes: 2 additions & 1 deletion collector/ckpt/ckpt_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
conf "github.com/alibaba/MongoShake/v2/collector/configure"
"io/ioutil"
"net/http"

Expand Down Expand Up @@ -67,7 +68,7 @@ func (ckpt *MongoCheckpoint) ensureNetwork() bool {
// make connection if we haven't already established one
if ckpt.Conn == nil {
if conn, err := utils.NewMongoConn(ckpt.URL, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority); err == nil {
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, conf.Options.CheckpointStorageUrlMongoSslRootCaFile); err == nil {
ckpt.Conn = conn
ckpt.QueryHandle = conn.Session.DB(ckpt.DB).C(ckpt.Table)
} else {
Expand Down
10 changes: 5 additions & 5 deletions collector/ckpt/ckpt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestMongoCheckpoint(t *testing.T) {

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, "")
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand All @@ -58,7 +58,7 @@ func TestMongoCheckpoint(t *testing.T) {

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, "")
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -105,7 +105,7 @@ func TestMongoCheckpoint(t *testing.T) {

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, "")
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMongoCheckpoint(t *testing.T) {

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, "")
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestMongoCheckpoint(t *testing.T) {

name := "ut_tet"
conn, err := utils.NewMongoConn(testUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernMajority, "")
assert.Equal(t, nil, err, "should be equal")

// drop test db
Expand Down
64 changes: 34 additions & 30 deletions collector/configure/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,40 @@ type Configuration struct {
ConfVersion uint `config:"conf.version"` // do not modify the tag name

// 1. global
Id string `config:"id"`
MasterQuorum bool `config:"master_quorum"`
FullSyncHTTPListenPort int `config:"full_sync.http_port"`
IncrSyncHTTPListenPort int `config:"incr_sync.http_port"`
SystemProfilePort int `config:"system_profile_port"`
LogLevel string `config:"log.level"`
LogDirectory string `config:"log.dir"`
LogFileName string `config:"log.file"`
LogFlush bool `config:"log.flush"`
SyncMode string `config:"sync_mode"`
MongoUrls []string `config:"mongo_urls"`
MongoCsUrl string `config:"mongo_cs_url"`
MongoSUrl string `config:"mongo_s_url"`
MongoConnectMode string `config:"mongo_connect_mode"`
Tunnel string `config:"tunnel"`
TunnelAddress []string `config:"tunnel.address"`
TunnelMessage string `config:"tunnel.message"`
TunnelKafkaPartitionNumber int `config:"tunnel.kafka.partition_number"` // add v2.4.21
TunnelJsonFormat string `config:"tunnel.json.format"`
FilterNamespaceBlack []string `config:"filter.namespace.black"`
FilterNamespaceWhite []string `config:"filter.namespace.white"`
FilterPassSpecialDb []string `config:"filter.pass.special.db"`
FilterDDLEnable bool `config:"filter.ddl_enable"`
FilterOplogGids bool `config:"filter.oplog.gids"` // add v2.4.17
CheckpointStorageUrl string `config:"checkpoint.storage.url"`
CheckpointStorageDb string `config:"checkpoint.storage.db"`
CheckpointStorageCollection string `config:"checkpoint.storage.collection"`
CheckpointStartPosition int64 `config:"checkpoint.start_position" type:"date"`
TransformNamespace []string `config:"transform.namespace"`
SpecialSourceDBFlag string `config:"special.source.db.flag" type:"string"` // add v2.4.20
Id string `config:"id"`
MasterQuorum bool `config:"master_quorum"`
FullSyncHTTPListenPort int `config:"full_sync.http_port"`
IncrSyncHTTPListenPort int `config:"incr_sync.http_port"`
SystemProfilePort int `config:"system_profile_port"`
LogLevel string `config:"log.level"`
LogDirectory string `config:"log.dir"`
LogFileName string `config:"log.file"`
LogFlush bool `config:"log.flush"`
SyncMode string `config:"sync_mode"`
MongoUrls []string `config:"mongo_urls"`
MongoCsUrl string `config:"mongo_cs_url"`
MongoSUrl string `config:"mongo_s_url"`
MongoSslRootCaFile string `config:"mongo_ssl_root_ca_file"` // add v2.6.2
MongoSslClientCaFile string `config:"mongo_ssl_root_ca_file"`
MongoConnectMode string `config:"mongo_connect_mode"`
Tunnel string `config:"tunnel"`
TunnelAddress []string `config:"tunnel.address"`
TunnelMessage string `config:"tunnel.message"`
TunnelKafkaPartitionNumber int `config:"tunnel.kafka.partition_number"` // add v2.4.21
TunnelJsonFormat string `config:"tunnel.json.format"`
TunnelMongoSslRootCaFile string `config:"tunnel.mongo_ssl_root_ca_file"` // add v2.6.2
FilterNamespaceBlack []string `config:"filter.namespace.black"`
FilterNamespaceWhite []string `config:"filter.namespace.white"`
FilterPassSpecialDb []string `config:"filter.pass.special.db"`
FilterDDLEnable bool `config:"filter.ddl_enable"`
FilterOplogGids bool `config:"filter.oplog.gids"` // add v2.4.17
CheckpointStorageUrl string `config:"checkpoint.storage.url"`
CheckpointStorageDb string `config:"checkpoint.storage.db"`
CheckpointStorageCollection string `config:"checkpoint.storage.collection"`
CheckpointStorageUrlMongoSslRootCaFile string `config:"checkpoint.storage.url.mongo_ssl_root_ca_file"` // add v2.6.2
CheckpointStartPosition int64 `config:"checkpoint.start_position" type:"date"`
TransformNamespace []string `config:"transform.namespace"`
SpecialSourceDBFlag string `config:"special.source.db.flag" type:"string"` // add v2.4.20

// 2. full sync
FullSyncReaderCollectionParallel int `config:"full_sync.reader.collection_parallel"`
Expand Down
2 changes: 1 addition & 1 deletion collector/coordinator/extra_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (cui *CheckUniqueIndexExistsJob) Run() {
conns := make([]*utils.MongoConn, len(cui.urls))
for i, source := range cui.urls {
conns[i], err = utils.NewMongoConn(source.URL, utils.VarMongoConnectModeSecondaryPreferred, true,
utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault)
utils.ReadWriteConcernMajority, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile)
if err != nil {
LOG.Error("extra job[%s] connect source[%v] failed: %v", cui.Name(), source.URL, err)
return
Expand Down
8 changes: 4 additions & 4 deletions collector/coordinator/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func fetchChunkMap(isSharding bool) (sharding.ShardingChunkMap, error) {
return nil, nil
}

func getTimestampMap(sources []*utils.MongoSource) (map[string]utils.TimestampNode, error) {
func getTimestampMap(sources []*utils.MongoSource, sslRootFile string) (map[string]utils.TimestampNode, error) {
// no need to fetch if sync mode is full only
if conf.Options.SyncMode == utils.VarSyncModeFull {
return nil, nil
Expand All @@ -51,7 +51,7 @@ func getTimestampMap(sources []*utils.MongoSource) (map[string]utils.TimestampNo
var ckptMap map[string]utils.TimestampNode
var err error

ckptMap, _, _, _, _, err = utils.GetAllTimestamp(sources)
ckptMap, _, _, _, _, err = utils.GetAllTimestamp(sources, sslRootFile)
if err != nil {
return nil, fmt.Errorf("fetch source all timestamp failed: %v", err)
}
Expand Down Expand Up @@ -97,7 +97,7 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
var ckptMap map[string]utils.TimestampNode
if conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
// get current newest timestamp
ckptMap, err = getTimestampMap(coordinator.MongoD)
ckptMap, err = getTimestampMap(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
if err != nil {
return err
}
Expand All @@ -108,7 +108,7 @@ func (coordinator *ReplicationCoordinator) startDocumentReplication() error {
var toConn *utils.MongoConn
if !conf.Options.FullSyncExecutorDebug {
if toConn, err = utils.NewMongoConn(toUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault); err != nil {
utils.ReadWriteConcernLocal, utils.ReadWriteConcernDefault, conf.Options.TunnelMongoSslRootCaFile); err != nil {
return err
}
defer toConn.Close()
Expand Down
6 changes: 3 additions & 3 deletions collector/coordinator/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {
// try to connect CheckpointStorage
checkpointStorageUrl := conf.Options.CheckpointStorageUrl
if conn, err = utils.NewMongoConn(checkpointStorageUrl, utils.VarMongoConnectModePrimary, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.CheckpointStorageUrlMongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect checkpointStorageUrl[%v] error[%v]. Please add primary node into 'mongo_urls' "+
"if 'context.storage.url' is empty", checkpointStorageUrl, err)
return err
Expand All @@ -127,7 +127,7 @@ func (coordinator *ReplicationCoordinator) sanitizeMongoDB() error {

for i, src := range coordinator.MongoD {
if conn, err = utils.NewMongoConn(src.URL, conf.Options.MongoConnectMode, true,
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault); conn == nil || !conn.IsGood() || err != nil {
utils.ReadWriteConcernDefault, utils.ReadWriteConcernDefault, conf.Options.MongoSslRootCaFile); conn == nil || !conn.IsGood() || err != nil {
LOG.Critical("Connect mongo server error. %v, url : %s. See https://github.com/alibaba/MongoShake/wiki/FAQ#q-how-to-solve-the-oplog-tailer-initialize-failed-no-reachable-servers-error", err, src.URL)
return err
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (coordinator *ReplicationCoordinator) serializeDocumentOplog(fullBeginTs in
// get current newest timestamp
var fullFinishTs, oldestTs bson.MongoTimestamp
if conf.Options.SpecialSourceDBFlag != utils.VarSpecialSourceDBFlagAliyunServerless {
_, fullFinishTs, _, oldestTs, _, err = utils.GetAllTimestamp(coordinator.MongoD)
_, fullFinishTs, _, oldestTs, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
if err != nil {
return fmt.Errorf("get full sync finish timestamp failed[%v]", err)
}
Expand Down
2 changes: 1 addition & 1 deletion collector/coordinator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (coordinator *ReplicationCoordinator) compareCheckpointAndDbTs(syncModeAll
tsMap, _, smallestNew, _, _, err = utils.GetAllTimestampInUT()
case false:
// smallestNew is the smallest of the all newest timestamp
tsMap, _, smallestNew, _, _, err = utils.GetAllTimestamp(coordinator.MongoD)
tsMap, _, smallestNew, _, _, err = utils.GetAllTimestamp(coordinator.MongoD, conf.Options.MongoSslRootCaFile)
if err != nil {
return 0, nil, false, fmt.Errorf("get all timestamp failed: %v", err)
}
Expand Down
20 changes: 10 additions & 10 deletions collector/coordinator/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -328,7 +328,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -403,7 +403,7 @@ func TestSelectSyncMode(t *testing.T) {
// test on checkpoint set

// drop old table
conn, err = utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err = utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -455,7 +455,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -535,7 +535,7 @@ func TestSelectSyncMode(t *testing.T) {
// test on checkpoint set

// drop old table
conn, err = utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err = utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -581,7 +581,7 @@ func TestSelectSyncMode(t *testing.T) {
}

// drop old table
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrl, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -643,7 +643,7 @@ func TestSelectSyncMode(t *testing.T) {
testReplicaName := "mockReplicaSet"

// drop old table
conn, err := utils.NewMongoConn(testUrlServerless, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrlServerless, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down Expand Up @@ -682,7 +682,7 @@ func TestSelectSyncMode(t *testing.T) {
testReplicaName := "mockReplicaSet"

// drop old table
conn, err := utils.NewMongoConn(testUrlServerless, "primary", true, "", "")
conn, err := utils.NewMongoConn(testUrlServerless, "primary", true, "", "", "")
assert.Equal(t, nil, err, "should be equal")

conn.Session.DB(testDb).C(testCollection).DropCollection()
Expand Down
Loading

0 comments on commit ffa4486

Please sign in to comment.