Skip to content

Commit

Permalink
feat: genesis add attribute isMaster
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie committed Feb 13, 2025
1 parent ecd7c25 commit 407f1ec
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestKubernetes(t *testing.T) {
})
defer k8sInfoPatch.Reset()

g := genesis.NewGenesis(context.Background(), &config.ControllerConfig{})
g := genesis.NewGenesis(context.Background(), true, &config.ControllerConfig{})
vJsonData, _ := os.ReadFile("./testfiles/vinterfaces.json")
var vData gcommon.GenesisSyncDataResponse
json.Unmarshal(vJsonData, &vData)
Expand Down
4 changes: 2 additions & 2 deletions server/controller/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func Start(ctx context.Context, configPath, serverLogFile string, shared *server
}

// 初始化Redis
if cfg.RedisCfg.Enabled && cfg.TrisolarisCfg.NodeType == "master" {
if cfg.RedisCfg.Enabled {
router.SetInitStageForHealthChecker("Redis init")

err := redis.Init(ctx, cfg.RedisCfg)
Expand All @@ -124,7 +124,7 @@ func Start(ctx context.Context, configPath, serverLogFile string, shared *server

router.SetInitStageForHealthChecker("Genesis init")
// 启动genesis
g := genesis.NewGenesis(ctx, cfg)
g := genesis.NewGenesis(ctx, isMasterController, cfg)

// start tagrecorder before manager to prevent recorder from publishing message when tagrecorder is not ready
router.SetInitStageForHealthChecker("TagRecorder init")
Expand Down
2 changes: 1 addition & 1 deletion server/controller/genesis/common/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package common

const (
SYNC_TYPE_FORMAT = "%v-%v-%v-%v" // region-orgID-type-vtapID
SYNC_TYPE_FORMAT = "%v-%v-%v" // orgID-type-vtapID
)

const (
Expand Down
4 changes: 2 additions & 2 deletions server/controller/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ type Genesis struct {
Synchronizer *grpc.SynchronizerServer
}

func NewGenesis(ctx context.Context, config *config.ControllerConfig) *Genesis {
func NewGenesis(ctx context.Context, isMaster bool, config *config.ControllerConfig) *Genesis {
syncQueue := queue.NewOverwriteQueue("genesis-sync-data", config.GenesisCfg.QueueLengths)
kubernetesQueue := queue.NewOverwriteQueue("genesis-k8s-data", config.GenesisCfg.QueueLengths)

genesisSync := sstore.NewGenesisSync(ctx, syncQueue, config)
genesisSync := sstore.NewGenesisSync(ctx, isMaster, syncQueue, config)
genesisSync.Start()

genesisK8S := kstore.NewGenesisKubernetes(ctx, kubernetesQueue, config)
Expand Down
24 changes: 13 additions & 11 deletions server/controller/genesis/store/sync/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,25 @@ import (
var log = logger.MustGetLogger("genesis.store.sync")

type GenesisSync struct {
data atomic.Value
ctx context.Context
cancel context.CancelFunc
queue queue.QueueReader
config *config.ControllerConfig
isMaster bool
data atomic.Value
ctx context.Context
cancel context.CancelFunc
queue queue.QueueReader
config *config.ControllerConfig
}

func NewGenesisSync(ctx context.Context, queue queue.QueueReader, config *config.ControllerConfig) *GenesisSync {
func NewGenesisSync(ctx context.Context, isMaster bool, queue queue.QueueReader, config *config.ControllerConfig) *GenesisSync {
var data atomic.Value
data.Store(common.GenesisSyncData{})
ctx, cancel := context.WithCancel(ctx)
return &GenesisSync{
ctx: ctx,
cancel: cancel,
data: data,
queue: queue,
config: config,
isMaster: isMaster,
ctx: ctx,
cancel: cancel,
data: data,
queue: queue,
config: config,
}
}

Expand Down

0 comments on commit 407f1ec

Please sign in to comment.