From c7b038ee7adf684670f02f860b1deb81830b5bfc Mon Sep 17 00:00:00 2001 From: wcy00000000000000 <2269766985@qq.com> Date: Tue, 30 Jan 2024 14:41:46 +0800 Subject: [PATCH] feat: move migration logics to tool --- scripts/init_db.sh | 14 +- src/apimachinery/adminserver/adminserver.go | 1 - src/apimachinery/adminserver/api.go | 24 -- .../admin_server/service/migrate.go | 275 ------------ .../admin_server/service/service.go | 2 - .../{ => upgrader/imports}/imports.go | 2 +- .../{ => upgrader/imports}/imports_history.go | 2 +- src/test/test.go | 21 +- src/tools/cmdb_ctl/cmd/migrate.go | 407 ++++++++++++++++++ src/tools/cmdb_ctl/readme.md | 37 ++ 10 files changed, 466 insertions(+), 319 deletions(-) rename src/scene_server/admin_server/{ => upgrader/imports}/imports.go (99%) rename src/scene_server/admin_server/{ => upgrader/imports}/imports_history.go (99%) create mode 100644 src/tools/cmdb_ctl/cmd/migrate.go diff --git a/scripts/init_db.sh b/scripts/init_db.sh index 1aafbeaa73..aa097365b8 100755 --- a/scripts/init_db.sh +++ b/scripts/init_db.sh @@ -2,16 +2,4 @@ #!/bin/bash set -e -# get local IP. -localIp=`python ip.py` - -# 判断是否为IPV6,是则在地址两端加中括号 -if [[ ${localIp} =~ ":" ]] -then - localIp="[${localIp}]" -fi -echo "localIp:${localIp}" - -curl -X POST -H 'Content-Type:application/json' -H 'BK_USER:migrate' -H 'HTTP_BLUEKING_SUPPLIER_ID:0' http://${localIp}:60004/migrate/v3/migrate/community/0 - -echo "" +./tool_ctl/tool_ctl migrate db --config-path=cmdb_adminserver/configures/migrate.yaml --enable-auth=false \ No newline at end of file diff --git a/src/apimachinery/adminserver/adminserver.go b/src/apimachinery/adminserver/adminserver.go index b2963d61d8..88540eae87 100644 --- a/src/apimachinery/adminserver/adminserver.go +++ b/src/apimachinery/adminserver/adminserver.go @@ -27,7 +27,6 @@ import ( type AdminServerClientInterface interface { ClearDatabase(ctx context.Context, h http.Header) (resp *metadata.Response, err error) Set(ctx context.Context, ownerID string, h http.Header) (resp *metadata.Response, err error) - Migrate(ctx context.Context, ownerID string, distribution string, h http.Header) error RunSyncDBIndex(ctx context.Context, h http.Header) error } diff --git a/src/apimachinery/adminserver/api.go b/src/apimachinery/adminserver/api.go index 39f7f4b622..97a6de5ac2 100644 --- a/src/apimachinery/adminserver/api.go +++ b/src/apimachinery/adminserver/api.go @@ -50,30 +50,6 @@ func (a *adminServer) Set(ctx context.Context, ownerID string, h http.Header) (r return } -// Migrate TODO -func (a *adminServer) Migrate(ctx context.Context, ownerID string, distribution string, h http.Header) error { - resp := new(metadata.Response) - subPath := "/migrate/%s/%s" - - err := a.client.Post(). - WithContext(ctx). - Body(nil). - SubResourcef(subPath, distribution, ownerID). - WithHeaders(h). - Do(). - Into(resp) - - if err != nil { - return errors.CCHttpError - } - - if err = resp.CCError(); err != nil { - return err - } - - return nil -} - // RunSyncDBIndex TODO func (a *adminServer) RunSyncDBIndex(ctx context.Context, h http.Header) error { resp := new(metadata.Response) diff --git a/src/scene_server/admin_server/service/migrate.go b/src/scene_server/admin_server/service/migrate.go index e677100433..7eb1639e04 100644 --- a/src/scene_server/admin_server/service/migrate.go +++ b/src/scene_server/admin_server/service/migrate.go @@ -17,275 +17,16 @@ import ( "fmt" "net/http" "path/filepath" - "time" "configcenter/src/common" "configcenter/src/common/blog" - "configcenter/src/common/mapstr" "configcenter/src/common/metadata" "configcenter/src/common/types" "configcenter/src/common/util" - "configcenter/src/common/version" - "configcenter/src/common/watch" - "configcenter/src/scene_server/admin_server/upgrader" - "configcenter/src/source_controller/cacheservice/event" - daltypes "configcenter/src/storage/dal/types" - streamtypes "configcenter/src/storage/stream/types" "github.com/emicklei/go-restful/v3" - "go.mongodb.org/mongo-driver/bson" ) -func (s *Service) migrate(req *restful.Request, resp *restful.Response) { - rHeader := req.Request.Header - rid := util.GetHTTPCCRequestID(rHeader) - defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader)) - ownerID := common.BKDefaultOwnerID - updateCfg := &upgrader.Config{ - OwnerID: ownerID, - User: common.CCSystemOperatorUserName, - } - - if err := s.createWatchDBChainCollections(rid); err != nil { - blog.Errorf("create watch db chain collections failed, err: %v, rid: %s", err, rid) - result := &metadata.RespError{ - Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()), - } - resp.WriteError(http.StatusInternalServerError, result) - return - } - - preVersion, finishedVersions, err := upgrader.Upgrade(s.ctx, s.db, s.cache, s.iam, updateCfg) - if err != nil { - blog.Errorf("db upgrade failed, err: %+v, rid: %s", err, rid) - result := &metadata.RespError{ - Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()), - } - resp.WriteError(http.StatusInternalServerError, result) - return - } - - currentVersion := preVersion - if len(finishedVersions) > 0 { - currentVersion = finishedVersions[len(finishedVersions)-1] - } - - result := MigrationResponse{ - BaseResp: metadata.BaseResp{ - Result: true, - Code: 0, - ErrMsg: "", - Permissions: nil, - }, - Data: "migrate success", - PreVersion: preVersion, - CurrentVersion: currentVersion, - FinishedVersions: finishedVersions, - } - resp.WriteEntity(result) -} - -// dbChainTTLTime the ttl time seconds of the db event chain, used to set the ttl index of mongodb -const dbChainTTLTime = 5 * 24 * 60 * 60 - -func (s *Service) createWatchDBChainCollections(rid string) error { - // create watch token table to store the last watch token info for every collections - exists, err := s.watchDB.HasTable(s.ctx, common.BKTableNameWatchToken) - if err != nil { - blog.Errorf("check if table %s exists failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid) - return err - } - - if !exists { - err = s.watchDB.CreateTable(s.ctx, common.BKTableNameWatchToken) - if err != nil && !s.watchDB.IsDuplicatedError(err) { - blog.Errorf("create table %s failed, err: %v, rid: %s", common.BKTableNameWatchToken, err, rid) - return err - } - } - - // create watch chain node table and init the last token info as empty for all collections - cursorTypes := watch.ListCursorTypes() - for _, cursorType := range cursorTypes { - key, err := event.GetResourceKeyWithCursorType(cursorType) - if err != nil { - blog.Errorf("get resource key with cursor type %s failed, err: %v, rid: %s", cursorType, err, rid) - return err - } - - exists, err := s.watchDB.HasTable(s.ctx, key.ChainCollection()) - if err != nil { - blog.Errorf("check if table %s exists failed, err: %v, rid: %s", key.ChainCollection(), err, rid) - return err - } - - if !exists { - err = s.watchDB.CreateTable(s.ctx, key.ChainCollection()) - if err != nil && !s.watchDB.IsDuplicatedError(err) { - blog.Errorf("create table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid) - return err - } - } - - if err = s.createWatchIndexes(cursorType, key, rid); err != nil { - return err - } - - if err = s.createWatchToken(key); err != nil { - return err - } - } - return nil -} - -func (s *Service) createWatchIndexes(cursorType watch.CursorType, key event.Key, rid string) error { - indexes := []daltypes.Index{ - {Name: "index_id", Keys: bson.D{{common.BKFieldID, -1}}, Background: true, Unique: true}, - {Name: "index_cursor", Keys: bson.D{{common.BKCursorField, -1}}, Background: true, Unique: true}, - {Name: "index_cluster_time", Keys: bson.D{{common.BKClusterTimeField, -1}}, Background: true, - ExpireAfterSeconds: dbChainTTLTime}, - } - - if cursorType == watch.ObjectBase || cursorType == watch.MainlineInstance || cursorType == watch.InstAsst { - subResourceIndex := daltypes.Index{ - Name: "index_sub_resource", Keys: bson.D{{common.BKSubResourceField, 1}}, Background: true, - } - indexes = append(indexes, subResourceIndex) - } - - existIndexArr, err := s.watchDB.Table(key.ChainCollection()).Indexes(s.ctx) - if err != nil { - blog.Errorf("get exist indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid) - return err - } - - existIdxMap := make(map[string]bool) - for _, index := range existIndexArr { - existIdxMap[index.Name] = true - } - - for _, index := range indexes { - if _, exist := existIdxMap[index.Name]; exist { - continue - } - - err = s.watchDB.Table(key.ChainCollection()).CreateIndex(s.ctx, index) - if err != nil && !s.watchDB.IsDuplicatedError(err) { - blog.Errorf("create indexes for table %s failed, err: %v, rid: %s", key.ChainCollection(), err, rid) - return err - } - } - return nil -} - -func (s *Service) createWatchToken(key event.Key) error { - filter := map[string]interface{}{ - "_id": key.Collection(), - } - - count, err := s.watchDB.Table(common.BKTableNameWatchToken).Find(filter).Count(s.ctx) - if err != nil { - blog.Errorf("check if last watch token exists failed, err: %v, filter: %+v", err, filter) - return err - } - - if count > 0 { - return nil - } - - if key.Collection() == event.HostIdentityKey.Collection() { - // host identity's watch token is different with other identity. - // only set coll is ok, the other fields is useless - data := mapstr.MapStr{ - "_id": key.Collection(), - common.BKTableNameBaseHost: watch.LastChainNodeData{Coll: common.BKTableNameBaseHost}, - common.BKTableNameModuleHostConfig: watch.LastChainNodeData{Coll: common.BKTableNameModuleHostConfig}, - common.BKTableNameBaseProcess: watch.LastChainNodeData{Coll: common.BKTableNameBaseProcess}, - } - if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil { - blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data) - return err - } - return nil - } - - if key.Collection() == event.BizSetRelationKey.Collection() { - // biz set relation's watch token is generated in the same way with the host identity's watch token - data := mapstr.MapStr{ - "_id": key.Collection(), - common.BKTableNameBaseApp: watch.LastChainNodeData{Coll: common.BKTableNameBaseApp}, - common.BKTableNameBaseBizSet: watch.LastChainNodeData{Coll: common.BKTableNameBaseBizSet}, - common.BKFieldID: 0, - common.BKTokenField: "", - } - if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil { - blog.Errorf("init last biz set relation watch token failed, err: %v, data: %+v", err, data) - return err - } - return nil - } - - data := watch.LastChainNodeData{ - Coll: key.Collection(), - Token: "", - StartAtTime: streamtypes.TimeStamp{ - Sec: uint32(time.Now().Unix()), - Nano: 0, - }, - } - if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(s.ctx, data); err != nil { - blog.Errorf("init last watch token failed, err: %v, data: %+v", err, data) - return err - } - return nil -} - -func (s *Service) migrateSpecifyVersion(req *restful.Request, resp *restful.Response) { - rHeader := req.Request.Header - rid := util.GetHTTPCCRequestID(rHeader) - defErr := s.CCErr.CreateDefaultCCErrorIf(util.GetLanguage(rHeader)) - ownerID := common.BKDefaultOwnerID - updateCfg := &upgrader.Config{ - OwnerID: ownerID, - User: common.CCSystemOperatorUserName, - } - - input := new(MigrateSpecifyVersionRequest) - if err := json.NewDecoder(req.Request.Body).Decode(input); err != nil { - blog.Errorf("migrateSpecifyVersion failed, decode body err: %v, body:%+v,rid:%s", err, req.Request.Body, rid) - _ = resp.WriteError(http.StatusOK, &metadata.RespError{Msg: defErr.Error(common.CCErrCommJSONUnmarshalFailed)}) - return - } - - if input.CommitID != version.CCGitHash { - _ = resp.WriteError(http.StatusOK, - &metadata.RespError{Msg: defErr.Errorf(common.CCErrCommParamsInvalid, "commit_id")}) - return - } - - err := upgrader.UpgradeSpecifyVersion(s.ctx, s.db, s.cache, s.iam, updateCfg, input.Version) - if err != nil { - blog.Errorf("db upgrade specify failed, err: %+v, rid: %s", err, rid) - result := &metadata.RespError{ - Msg: defErr.Errorf(common.CCErrCommMigrateFailed, err.Error()), - } - resp.WriteError(http.StatusInternalServerError, result) - return - } - - result := MigrationResponse{ - BaseResp: metadata.BaseResp{ - Result: true, - Code: 0, - ErrMsg: "", - Permissions: nil, - }, - Data: "migrate success. version: " + input.Version, - } - resp.WriteEntity(result) - -} - var allConfigNames = map[string]bool{ "redis": true, "mongodb": true, @@ -350,19 +91,3 @@ func (s *Service) refreshConfig(req *restful.Request, resp *restful.Response) { blog.Infof("refresh config success, input:%#v", input) resp.WriteEntity(metadata.NewSuccessResp("refresh config success")) } - -// MigrationResponse TODO -type MigrationResponse struct { - metadata.BaseResp `json:",inline"` - Data interface{} `json:"data"` - PreVersion string `json:"pre_version"` - CurrentVersion string `json:"current_version"` - FinishedVersions []string `json:"finished_migrations"` -} - -// MigrateSpecifyVersionRequest TODO -type MigrateSpecifyVersionRequest struct { - CommitID string `json:"commit_id"` - TimeStamp int64 `json:"time_stamp"` - Version string `json:"version"` -} diff --git a/src/scene_server/admin_server/service/service.go b/src/scene_server/admin_server/service/service.go index f39ccd74ce..8fb335369b 100644 --- a/src/scene_server/admin_server/service/service.go +++ b/src/scene_server/admin_server/service/service.go @@ -96,7 +96,6 @@ func (s *Service) WebService() *restful.Container { api.Route(api.POST("/authcenter/init").To(s.InitAuthCenter)) api.Route(api.POST("/authcenter/register").To(s.RegisterAuthAccount)) - api.Route(api.POST("/migrate/{distribution}/{ownerID}").To(s.migrate)) api.Route(api.POST("/migrate/system/hostcrossbiz/{ownerID}").To(s.SetSystemConfiguration)) api.Route(api.POST("/migrate/system/user_config/{key}/{can}").To(s.UserConfigSwitch)) api.Route(api.GET("/find/system/config_admin").To(s.SearchConfigAdmin)) @@ -105,7 +104,6 @@ func (s *Service) WebService() *restful.Container { api.Route(api.PUT("/update/system_config/platform_setting").To(s.UpdatePlatformSettingConfig)) api.Route(api.GET("/find/system_config/platform_setting/{type}").To(s.SearchPlatformSettingConfig)) - api.Route(api.POST("/migrate/specify/version/{distribution}/{ownerID}").To(s.migrateSpecifyVersion)) api.Route(api.POST("/migrate/config/refresh").To(s.refreshConfig)) api.Route(api.POST("/migrate/dataid").To(s.migrateDataID)) api.Route(api.POST("/migrate/old/dataid").To(s.migrateOldDataID)) diff --git a/src/scene_server/admin_server/imports.go b/src/scene_server/admin_server/upgrader/imports/imports.go similarity index 99% rename from src/scene_server/admin_server/imports.go rename to src/scene_server/admin_server/upgrader/imports/imports.go index 09ac6507f5..29b48f5992 100644 --- a/src/scene_server/admin_server/imports.go +++ b/src/scene_server/admin_server/upgrader/imports/imports.go @@ -10,7 +10,7 @@ * limitations under the License. */ -package main +package imports import ( diff --git a/src/scene_server/admin_server/imports_history.go b/src/scene_server/admin_server/upgrader/imports/imports_history.go similarity index 99% rename from src/scene_server/admin_server/imports_history.go rename to src/scene_server/admin_server/upgrader/imports/imports_history.go index 76d8786447..3bfa1d1aad 100644 --- a/src/scene_server/admin_server/imports_history.go +++ b/src/scene_server/admin_server/upgrader/imports/imports_history.go @@ -10,7 +10,7 @@ * limitations under the License. */ -package main +package imports import ( _ "configcenter/src/scene_server/admin_server/upgrader/history/v3.0.8" diff --git a/src/test/test.go b/src/test/test.go index ac0d8ccb84..9cf9522afb 100644 --- a/src/test/test.go +++ b/src/test/test.go @@ -15,12 +15,15 @@ import ( "configcenter/src/apimachinery/discovery" "configcenter/src/apimachinery/util" "configcenter/src/common" + "configcenter/src/common/auth" "configcenter/src/common/backbone/service_mange/zk" "configcenter/src/common/mapstr" "configcenter/src/common/metadata" kubetypes "configcenter/src/kube/types" + "configcenter/src/scene_server/admin_server/upgrader" "configcenter/src/storage/dal/mongo" "configcenter/src/storage/dal/mongo/local" + "configcenter/src/storage/dal/redis" "configcenter/src/test/run" testutil "configcenter/src/test/util" @@ -72,12 +75,14 @@ func init() { run.SustainSeconds = tConfig.SustainSeconds run.TotalRequest = tConfig.TotalRequest + err := auth.EnableAuthFlag.Set("false") + Expect(err).Should(BeNil()) + RegisterFailHandler(testutil.Fail) fmt.Println("before suit") js, _ := json.MarshalIndent(tConfig, "", " ") fmt.Printf("test config: %s\n", run.SetRed(string(js))) client := zk.NewZkClient(tConfig.ZkAddr, 40*time.Second) - var err error mongoConfig := local.MongoConf{ MaxOpenConns: mongo.DefaultMaxOpenConns, MaxIdleConns: mongo.MinimumMaxIdleOpenConns, @@ -151,7 +156,19 @@ func ClearDatabase() { } _ = db.Close() - err = adminClient.Migrate(context.Background(), "0", "community", GetHeader()) + redisCfg := redis.Config{ + Address: tConfig.RedisCfg.RedisAddress, + Password: tConfig.RedisCfg.RedisPasswd, + Database: "0", + } + redisCli, err := redis.NewFromConfig(redisCfg) + Expect(err).To(BeNil()) + + updateCfg := &upgrader.Config{ + OwnerID: common.BKDefaultOwnerID, + User: common.CCSystemOperatorUserName, + } + _, _, err = upgrader.Upgrade(context.Background(), db, redisCli, nil, updateCfg) Expect(err).Should(BeNil()) err = adminClient.RunSyncDBIndex(context.Background(), GetHeader()) Expect(err).Should(BeNil()) diff --git a/src/tools/cmdb_ctl/cmd/migrate.go b/src/tools/cmdb_ctl/cmd/migrate.go new file mode 100644 index 0000000000..17947637e2 --- /dev/null +++ b/src/tools/cmdb_ctl/cmd/migrate.go @@ -0,0 +1,407 @@ +/* + * Tencent is pleased to support the open source community by making + * 蓝鲸智云 - 配置平台 (BlueKing - Configuration System) available. + * Copyright (C) 2017 THL A29 Limited, + * a Tencent company. All rights reserved. + * Licensed under the MIT License (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at http://opensource.org/licenses/MIT + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * We undertake not to change the open source license (MIT license) applicable + * to the current version of the project delivered to anyone in the future. + */ + +package cmd + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "configcenter/src/ac/iam" + "configcenter/src/common" + "configcenter/src/common/auth" + cc "configcenter/src/common/backbone/configcenter" + "configcenter/src/common/mapstr" + "configcenter/src/common/metrics" + "configcenter/src/common/types" + "configcenter/src/common/version" + "configcenter/src/common/watch" + "configcenter/src/scene_server/admin_server/upgrader" + // import upgrader + _ "configcenter/src/scene_server/admin_server/upgrader/imports" + "configcenter/src/source_controller/cacheservice/event" + "configcenter/src/storage/dal" + "configcenter/src/storage/dal/mongo/local" + "configcenter/src/storage/dal/redis" + daltypes "configcenter/src/storage/dal/types" + streamtypes "configcenter/src/storage/stream/types" + + "github.com/spf13/cobra" + "go.mongodb.org/mongo-driver/bson" +) + +func init() { + rootCmd.AddCommand(NewMigrateCommand()) +} + +// NewMigrateCommand new tool command for migration +func NewMigrateCommand() *cobra.Command { + var configPath string + + cmd := &cobra.Command{ + Use: "migrate", + Short: "migrate cmdb data", + RunE: func(cmd *cobra.Command, args []string) error { + return cmd.Help() + }, + } + cmd.PersistentFlags().StringVar(&configPath, "config-path", "", "migrate tool config file path") + cmd.PersistentFlags().Var(auth.EnableAuthFlag, "enable-auth", + "The auth center enable status, true for enabled, false for disabled") + + cmd.AddCommand(&cobra.Command{ + Use: "db", + Short: "migrate cmdb data in database", + RunE: func(cmd *cobra.Command, args []string) error { + srv, err := newMigrateService(configPath) + if err != nil { + return err + } + return srv.migrateDB() + }, + }) + + specifyVersionReq := new(MigrateSpecifyVersionRequest) + specifyVersionCmd := &cobra.Command{ + Use: "specify-version", + Short: "migrate cmdb data of specify version", + RunE: func(cmd *cobra.Command, args []string) error { + srv, err := newMigrateService(configPath) + if err != nil { + return err + } + return srv.migrateSpecifyVersion(specifyVersionReq) + }, + } + specifyVersionReq.addFlags(specifyVersionCmd) + + cmd.AddCommand(specifyVersionCmd) + + return cmd +} + +type migrateService struct { + db dal.DB + watchDB dal.DB + cache redis.Client + iam *iam.IAM +} + +func newMigrateService(configPath string) (*migrateService, error) { + if !strings.HasSuffix(configPath, ".yaml") { + return nil, fmt.Errorf("config path %s is invalid, should be a yaml file", configPath) + } + configPath = strings.TrimSuffix(configPath, ".yaml") + + if err := cc.SetMigrateFromFile(configPath); err != nil { + return nil, fmt.Errorf("parse migration config from file[%s] failed, err: %v", configPath, err) + } + + configDir, err := cc.String("confs.dir") + if err != nil { + return nil, fmt.Errorf("get migration config directory from file[%s] failed, err: %v", configPath, err) + } + + // load mongodb, redis and common config from configure directory + mongodbPath := configDir + "/" + types.CCConfigureMongo + if err = cc.SetMongodbFromFile(mongodbPath); err != nil { + return nil, fmt.Errorf("parse mongodb config from file[%s] failed, err: %v", mongodbPath, err) + } + + redisPath := configDir + "/" + types.CCConfigureRedis + if err = cc.SetRedisFromFile(redisPath); err != nil { + return nil, fmt.Errorf("parse redis config from file[%s] failed, err: %v", redisPath, err) + } + + commonPath := configDir + "/" + types.CCConfigureCommon + if err = cc.SetCommonFromFile(commonPath); err != nil { + return nil, fmt.Errorf("parse common config from file[%s] failed, err: %v", commonPath, err) + } + + svc := new(migrateService) + + // new mongodb client + dbConf, err := cc.Mongo("mongodb") + if err != nil { + return nil, fmt.Errorf("get mongodb config failed, err: %v", err) + } + + svc.db, err = local.NewMgo(dbConf.GetMongoConf(), time.Minute) + if err != nil { + return nil, fmt.Errorf("new mongodb client failed, err: %v", err) + } + + // new watch mongodb client + watchDBConf, err := cc.Mongo("watch") + if err != nil { + return nil, fmt.Errorf("get watch mongodb config failed, err: %v", err) + } + + svc.watchDB, err = local.NewMgo(watchDBConf.GetMongoConf(), time.Minute) + if err != nil { + return nil, fmt.Errorf("new watch mongodb client failed, err: %v", err) + } + + // new redis client + redisConf, err := cc.Redis("redis") + if err != nil { + return nil, fmt.Errorf("get redis config failed, err: %v", err) + } + + svc.cache, err = redis.NewFromConfig(redisConf) + if err != nil { + return nil, fmt.Errorf("new redis client failed, err: %v", err) + } + + // new iam client + if auth.EnableAuthorize() { + iamConf, err := iam.ParseConfigFromKV("iam", nil) + if err != nil && auth.EnableAuthorize() { + return nil, fmt.Errorf("parse iam config failed, err: %v", err) + } + + metricService := metrics.NewService(metrics.Config{ProcessName: "migrate_tool"}) + svc.iam, err = iam.NewIAM(iamConf, metricService.Registry()) + if err != nil { + return nil, fmt.Errorf("new iam client failed, err: %v", err) + } + } + + return svc, nil +} + +func (s *migrateService) migrateDB() error { + ctx := context.Background() + if err := s.createWatchDBChainCollections(ctx); err != nil { + return err + } + + updateCfg := &upgrader.Config{ + OwnerID: common.BKDefaultOwnerID, + User: common.CCSystemOperatorUserName, + } + + preVersion, finishedVersions, err := upgrader.Upgrade(ctx, s.db, s.cache, s.iam, updateCfg) + if err != nil { + return fmt.Errorf("db upgrade failed, err: %v", err) + } + + currentVersion := preVersion + if len(finishedVersions) > 0 { + currentVersion = finishedVersions[len(finishedVersions)-1] + } + + result := MigrationResult{ + Data: "migrate success", + PreVersion: preVersion, + CurrentVersion: currentVersion, + FinishedVersions: finishedVersions, + } + + res, err := json.Marshal(result) + if err != nil { + return err + } + + fmt.Printf("%s\n", string(res)) + return nil +} + +// MigrationResult is the migration result +type MigrationResult struct { + Data interface{} `json:"data"` + PreVersion string `json:"pre_version"` + CurrentVersion string `json:"current_version"` + FinishedVersions []string `json:"finished_migrations"` +} + +// dbChainTTLTime the ttl time seconds of the db event chain, used to set the ttl index of mongodb +const dbChainTTLTime = 5 * 24 * 60 * 60 + +func (s *migrateService) createWatchDBChainCollections(ctx context.Context) error { + // create watch token table to store the last watch token info for every collection + exists, err := s.watchDB.HasTable(ctx, common.BKTableNameWatchToken) + if err != nil { + return fmt.Errorf("check if table %s exists failed, err: %v", common.BKTableNameWatchToken, err) + } + + if !exists { + err = s.watchDB.CreateTable(ctx, common.BKTableNameWatchToken) + if err != nil && !s.watchDB.IsDuplicatedError(err) { + return fmt.Errorf("create table %s failed, err: %v", common.BKTableNameWatchToken, err) + } + } + + // create watch chain node table and init the last token info as empty for all collections + cursorTypes := watch.ListCursorTypes() + for _, cursorType := range cursorTypes { + key, err := event.GetResourceKeyWithCursorType(cursorType) + if err != nil { + return fmt.Errorf("get resource key with cursor type %s failed, err: %v", cursorType, err) + } + + exists, err := s.watchDB.HasTable(ctx, key.ChainCollection()) + if err != nil { + return fmt.Errorf("check if table %s exists failed, err: %v", key.ChainCollection(), err) + } + + if !exists { + err = s.watchDB.CreateTable(ctx, key.ChainCollection()) + if err != nil && !s.watchDB.IsDuplicatedError(err) { + return fmt.Errorf("create table %s failed, err: %v", key.ChainCollection(), err) + } + } + + if err = s.createWatchIndexes(ctx, cursorType, key); err != nil { + return err + } + + if err = s.createWatchToken(ctx, key); err != nil { + return err + } + } + return nil +} + +func (s *migrateService) createWatchIndexes(ctx context.Context, cursorType watch.CursorType, key event.Key) error { + indexes := []daltypes.Index{ + {Name: "index_id", Keys: bson.D{{common.BKFieldID, -1}}, Background: true, Unique: true}, + {Name: "index_cursor", Keys: bson.D{{common.BKCursorField, -1}}, Background: true, Unique: true}, + {Name: "index_cluster_time", Keys: bson.D{{common.BKClusterTimeField, -1}}, Background: true, + ExpireAfterSeconds: dbChainTTLTime}, + } + + if cursorType == watch.ObjectBase || cursorType == watch.MainlineInstance || cursorType == watch.InstAsst { + subResourceIndex := daltypes.Index{ + Name: "index_sub_resource", Keys: bson.D{{common.BKSubResourceField, 1}}, Background: true, + } + indexes = append(indexes, subResourceIndex) + } + + existIndexArr, err := s.watchDB.Table(key.ChainCollection()).Indexes(ctx) + if err != nil { + return fmt.Errorf("get exist indexes for table %s failed, err: %v", key.ChainCollection(), err) + } + + existIdxMap := make(map[string]bool) + for _, index := range existIndexArr { + existIdxMap[index.Name] = true + } + + for _, index := range indexes { + if _, exist := existIdxMap[index.Name]; exist { + continue + } + + err = s.watchDB.Table(key.ChainCollection()).CreateIndex(ctx, index) + if err != nil && !s.watchDB.IsDuplicatedError(err) { + return fmt.Errorf("create indexes for table %s failed, err: %v", key.ChainCollection(), err) + } + } + return nil +} + +func (s *migrateService) createWatchToken(ctx context.Context, key event.Key) error { + filter := map[string]interface{}{ + "_id": key.Collection(), + } + + count, err := s.watchDB.Table(common.BKTableNameWatchToken).Find(filter).Count(ctx) + if err != nil { + return fmt.Errorf("check if last watch token exists failed, err: %v, filter: %+v", err, filter) + } + + if count > 0 { + return nil + } + + if key.Collection() == event.HostIdentityKey.Collection() { + // host identity's watch token is different with other identity. + // only set coll is ok, the other fields is useless + data := mapstr.MapStr{ + "_id": key.Collection(), + common.BKTableNameBaseHost: watch.LastChainNodeData{Coll: common.BKTableNameBaseHost}, + common.BKTableNameModuleHostConfig: watch.LastChainNodeData{Coll: common.BKTableNameModuleHostConfig}, + common.BKTableNameBaseProcess: watch.LastChainNodeData{Coll: common.BKTableNameBaseProcess}, + } + if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(ctx, data); err != nil { + return fmt.Errorf("init last watch token failed, err: %v, data: %+v", err, data) + } + return nil + } + + if key.Collection() == event.BizSetRelationKey.Collection() { + // biz set relation's watch token is generated in the same way with the host identity's watch token + data := mapstr.MapStr{ + "_id": key.Collection(), + common.BKTableNameBaseApp: watch.LastChainNodeData{Coll: common.BKTableNameBaseApp}, + common.BKTableNameBaseBizSet: watch.LastChainNodeData{Coll: common.BKTableNameBaseBizSet}, + common.BKFieldID: 0, + common.BKTokenField: "", + } + if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(ctx, data); err != nil { + return fmt.Errorf("init last biz set relation watch token failed, err: %v, data: %+v", err, data) + } + return nil + } + + data := watch.LastChainNodeData{ + Coll: key.Collection(), + Token: "", + StartAtTime: streamtypes.TimeStamp{ + Sec: uint32(time.Now().Unix()), + Nano: 0, + }, + } + if err = s.watchDB.Table(common.BKTableNameWatchToken).Insert(ctx, data); err != nil { + return fmt.Errorf("init last watch token failed, err: %v, data: %+v", err, data) + } + return nil +} + +func (s *migrateService) migrateSpecifyVersion(input *MigrateSpecifyVersionRequest) error { + updateCfg := &upgrader.Config{ + OwnerID: common.BKDefaultOwnerID, + User: common.CCSystemOperatorUserName, + } + + if input.CommitID != version.CCGitHash { + return fmt.Errorf("commit id %s is not the same with current version %s", input.CommitID, version.CCGitHash) + } + + err := upgrader.UpgradeSpecifyVersion(context.Background(), s.db, s.cache, s.iam, updateCfg, input.Version) + if err != nil { + return fmt.Errorf("db upgrade specify failed, err: %v", err) + } + + fmt.Printf("migrate success, version: %s\n", input.Version) + return nil +} + +// MigrateSpecifyVersionRequest migrate specify version request +type MigrateSpecifyVersionRequest struct { + CommitID string `json:"commit_id"` + Version string `json:"version"` +} + +func (m *MigrateSpecifyVersionRequest) addFlags(cmd *cobra.Command) { + cmd.Flags().StringVar(&m.CommitID, "commit-id", "", "the commit id of this tool") + cmd.Flags().StringVar(&m.Version, "version", "", "version to migrate") +} diff --git a/src/tools/cmdb_ctl/readme.md b/src/tools/cmdb_ctl/readme.md index dcf54207f8..dfc7bfbc9b 100644 --- a/src/tools/cmdb_ctl/readme.md +++ b/src/tools/cmdb_ctl/readme.md @@ -416,3 +416,40 @@ denyall配置为false的情况下,limit和ttl配置才能生效 } ] ``` + +### CMDB数据迁移 + +- 使用方式 + ``` + ./tool_ctl migrate [flag] [command] + ``` +- 子命令 + ``` + db migrate cmdb data in database + specify-version migrate cmdb data of specify version + ``` +- 命令行参数 + ``` + --config-path string migrate tool config file path + --enable-auth The auth center enable status, true for enabled, false for disabled (default true) + --commit-id string the commit id of this tool(仅用于specify-version命令) + --version string version to migrate(仅用于specify-version命令) + ``` +- 配置文件示例(参考admin-server的配置文件migrate.yaml) + ``` + # 指定configures的路径,通过这个路径找到其他的配置文件 + confs: + dir: /cmdb/admin_server/configures/ + ``` +- 示例 + ``` + 迁移DB数据操作示例: + ./tool_ctl migrate db --config-path=migrate.yaml --enable-auth=false + 回显样式: + {"data":"migrate success","pre_version":"y3.12.202311061800","current_version":"y3.12.202311061800","finished_migrations":[]} + + 迁移指定版本的DB数据操作示例: + ./tool_ctl migrate specify-version --config-path=migrate.yaml --enable-auth=false --commit-id=759fa78e118faa92fc488fc90e5fffaa6617bd6d --version=y3.12.202311061800 + 回显样式: + migrate success, version: y3.12.202311061800 + ```