diff --git a/server/controller/db/mysql/common/gorm.go b/server/controller/db/mysql/common/gorm.go index 2f07d2bdfb9..ad296ba1714 100644 --- a/server/controller/db/mysql/common/gorm.go +++ b/server/controller/db/mysql/common/gorm.go @@ -17,11 +17,14 @@ package common import ( + "database/sql" + "database/sql/driver" "fmt" l "log" "os" "time" + mysql_driver "github.com/go-sql-driver/mysql" "gorm.io/driver/mysql" "gorm.io/gorm" "gorm.io/gorm/logger" @@ -31,33 +34,49 @@ import ( ) func GetSession(cfg config.MySqlConfig) (*gorm.DB, error) { - dsn := GenerateDSN(cfg, true, cfg.TimeOut, false) - return InitSession(dsn) + connector, err := GetConnector(cfg, true, cfg.TimeOut, false) + if err != nil { + return nil, err + } + return InitSession(connector) } -func GenerateDSN(cfg config.MySqlConfig, useDatabase bool, timeout uint32, multiStatements bool) string { +func GetConnector(cfg config.MySqlConfig, useDatabase bool, timeout uint32, multiStatements bool) (driver.Connector, error) { var database string if useDatabase { database = cfg.Database } - dsn := fmt.Sprintf( - "%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=%ds", - cfg.UserName, - cfg.UserPassword, - cfg.Host, - cfg.Port, - database, - timeout, - ) - if multiStatements { - dsn += "&multiStatements=true" + + location, err := time.LoadLocation("Local") + if err != nil { + log.Error("Get location failed with error: %v", err.Error()) + return nil, err + } + + config := mysql_driver.Config{ + User: cfg.UserName, + Passwd: cfg.UserPassword, + Net: "tcp", + Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port), + DBName: database, + AllowNativePasswords: true, + Loc: location, + Timeout: time.Duration(timeout) * time.Second, + ParseTime: true, + MultiStatements: multiStatements, + Params: map[string]string{"charset": "utf8mb4"}, + } + connector, err := mysql_driver.NewConnector(&config) + if err != nil { + log.Error("Get database(%s) connector failed with error: %v", database, err.Error()) + return nil, err } - return dsn + return connector, nil } -func InitSession(dsn string) (*gorm.DB, error) { +func InitSession(connector driver.Connector) (*gorm.DB, error) { db, err := gorm.Open(mysql.New(mysql.Config{ - DSN: dsn, // DSN data source name + Conn: sql.OpenDB(connector), DefaultStringSize: 256, // string 类型字段的默认长度 DisableDatetimePrecision: true, // 禁用 datetime 精度,MySQL 5.6 之前的数据库不支持 DontSupportRenameIndex: true, // 重命名索引时采用删除并新建的方式,MySQL 5.7 之前的数据库和 MariaDB 不支持重命名索引 @@ -75,10 +94,10 @@ func InitSession(dsn string) (*gorm.DB, error) { }), // 配置log }) if err != nil { - log.Errorf("failed to initialize session: %v, dsn: %s", err.Error(), dsn) + log.Errorf("failed to initialize session: %v", err.Error()) return nil, err } - log.Infof("initialized mysql session successfully, dsn: %s", dsn) + log.Infof("initialized mysql session successfully") sqlDB, _ := db.DB() // 限制最大空闲连接数、最大连接数和连接的生命周期 diff --git a/server/controller/db/mysql/migration/rawsql/init.sql b/server/controller/db/mysql/migration/rawsql/init.sql index a4b9464f64a..973e165bbfc 100644 --- a/server/controller/db/mysql/migration/rawsql/init.sql +++ b/server/controller/db/mysql/migration/rawsql/init.sql @@ -287,7 +287,7 @@ CREATE TABLE IF NOT EXISTS vl2_net ( id INTEGER NOT NULL AUTO_INCREMENT, prefix CHAR(64) DEFAULT '', netmask CHAR(64) DEFAULT '', - vl2id INTEGER REFERENCES vl2(id), + vl2id INTEGER DEFAULT 0, net_index INTEGER DEFAULT 0, name VARCHAR(256) DEFAULT '', label VARCHAR(64) DEFAULT '', @@ -361,12 +361,12 @@ CREATE TABLE IF NOT EXISTS vinterface_ip ( netmask CHAR(64) DEFAULT '', gateway CHAR(64) DEFAULT '', create_method INTEGER DEFAULT 0 COMMENT '0.learning 1.user_defined', - vl2id INTEGER REFERENCES vl2(id), + vl2id INTEGER DEFAULT 0, vl2_net_id INTEGER DEFAULT 0, net_index INTEGER DEFAULT 0, sub_domain CHAR(64) DEFAULT '', domain CHAR(64) DEFAULT '', - vifid INTEGER REFERENCES vinterface(id), + vifid INTEGER DEFAULT 0, isp INTEGER DEFAULT 0 COMMENT 'Used for multi-ISP access', lcuuid CHAR(64) DEFAULT '', PRIMARY KEY (id) diff --git a/server/controller/db/mysql/migrator/common/db.go b/server/controller/db/mysql/migrator/common/db.go index cd8afb35088..d2790358b18 100644 --- a/server/controller/db/mysql/migrator/common/db.go +++ b/server/controller/db/mysql/migrator/common/db.go @@ -24,12 +24,18 @@ import ( ) func GetSessionWithoutName(cfg config.MySqlConfig) (*gorm.DB, error) { - dsn := common.GenerateDSN(cfg, false, cfg.TimeOut, false) - return common.InitSession(dsn) + connector, err := common.GetConnector(cfg, false, cfg.TimeOut, false) + if err != nil { + return nil, err + } + return common.InitSession(connector) } func GetSessionWithName(cfg config.MySqlConfig) (*gorm.DB, error) { // set multiStatements=true in dsn only when migrating MySQL - dsn := common.GenerateDSN(cfg, true, cfg.TimeOut*2, true) - return common.InitSession(dsn) + connector, err := common.GetConnector(cfg, true, cfg.TimeOut*2, true) + if err != nil { + return nil, err + } + return common.InitSession(connector) } diff --git a/server/controller/http/service/resource/domain.go b/server/controller/http/service/resource/domain.go index 99f62d8af10..260da446f17 100644 --- a/server/controller/http/service/resource/domain.go +++ b/server/controller/http/service/resource/domain.go @@ -1035,9 +1035,17 @@ func (c *DomainChecker) Stop() { func (c *DomainChecker) CheckRegularly() { go func() { - for range time.Tick(time.Duration(5) * time.Minute) { - for _, db := range mysql.GetDBs().All() { - c.checkAndAllocateController(db) + ticker := time.NewTicker(time.Duration(5) * time.Minute) + defer ticker.Stop() + LOOP: + for { + select { + case <-ticker.C: + for _, db := range mysql.GetDBs().All() { + c.checkAndAllocateController(db) + } + case <-c.ctx.Done(): + break LOOP } } }() diff --git a/server/controller/monitor/analyzer.go b/server/controller/monitor/analyzer.go index b81e79c1cc2..ba54799fc64 100644 --- a/server/controller/monitor/analyzer.go +++ b/server/controller/monitor/analyzer.go @@ -59,23 +59,39 @@ func NewAnalyzerCheck(cfg *config.ControllerConfig, ctx context.Context) *Analyz func (c *AnalyzerCheck) Start() { log.Info("analyzer check start") go func() { - for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) { - c.SyncDefaultOrgData() + ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) + defer ticker.Stop() + LOOP1: + for { + select { + case <-ticker.C: + c.SyncDefaultOrgData() + case <-c.cCtx.Done(): + break LOOP1 + } } }() go func() { - for range time.Tick(time.Duration(c.cfg.HealthCheckInterval) * time.Second) { - if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { - // 数据节点健康检查 - c.healthCheck(db) - // 检查没有分配数据节点的采集器,并进行分配 - c.vtapAnalyzerCheck(db) - // check az_analyzer_connection, delete unused item - c.azConnectionCheck(db) - return nil - }); err != nil { - log.Error(err) + ticker := time.NewTicker(time.Duration(c.cfg.HealthCheckInterval) * time.Second) + defer ticker.Stop() + LOOP2: + for { + select { + case <-ticker.C: + if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { + // 数据节点健康检查 + c.healthCheck(db) + // 检查没有分配数据节点的采集器,并进行分配 + c.vtapAnalyzerCheck(db) + // check az_analyzer_connection, delete unused item + c.azConnectionCheck(db) + return nil + }); err != nil { + log.Error(err) + } + case <-c.cCtx.Done(): + break LOOP2 } } }() diff --git a/server/controller/monitor/controller.go b/server/controller/monitor/controller.go index adc873b6a2b..4e41895a6d5 100644 --- a/server/controller/monitor/controller.go +++ b/server/controller/monitor/controller.go @@ -64,29 +64,39 @@ func NewControllerCheck(cfg *config.ControllerConfig, ctx context.Context) *Cont func (c *ControllerCheck) Start() { log.Info("controller check start") go func() { - for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) { - c.SyncDefaultOrgData() - } - }() - - go func() { - for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) { - c.SyncDefaultOrgData() + ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) + defer ticker.Stop() + LOOP1: + for { + select { + case <-ticker.C: + c.SyncDefaultOrgData() + case <-c.cCtx.Done(): + break LOOP1 + } } }() go func() { - for range time.Tick(time.Duration(c.cfg.HealthCheckInterval) * time.Second) { - if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { - // 控制器健康检查 - c.healthCheck(db) - // 检查没有分配控制器的采集器,并进行分配 - c.vtapControllerCheck(db) - // check az_controller_connection, delete unused item - c.azConnectionCheck(db) - return nil - }); err != nil { - log.Error(err) + ticker := time.NewTicker(time.Duration(c.cfg.HealthCheckInterval) * time.Second) + defer ticker.Stop() + LOOP2: + for { + select { + case <-ticker.C: + if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { + // 控制器健康检查 + c.healthCheck(db) + // 检查没有分配控制器的采集器,并进行分配 + c.vtapControllerCheck(db) + // check az_controller_connection, delete unused item + c.azConnectionCheck(db) + return nil + }); err != nil { + log.Error(err) + } + case <-c.cCtx.Done(): + break LOOP2 } } }() diff --git a/server/controller/monitor/license/pseudo_license.go b/server/controller/monitor/license/pseudo_license.go index 1e1161e37fe..3fca48aabbb 100644 --- a/server/controller/monitor/license/pseudo_license.go +++ b/server/controller/monitor/license/pseudo_license.go @@ -59,12 +59,20 @@ func NewVTapLicenseAllocation(cfg config.MonitorConfig, ctx context.Context) *VT func (v *VTapLicenseAllocation) Start() { log.Info("vtap license allocation and check start") go func() { - for range time.Tick(time.Duration(v.cfg.LicenseCheckInterval) * time.Second) { - if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { - v.allocLicense(db) - return nil - }); err != nil { - log.Error(err) + ticker := time.NewTicker(time.Duration(v.cfg.LicenseCheckInterval) * time.Second) + defer ticker.Stop() + LOOP: + for { + select { + case <-ticker.C: + if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error { + v.allocLicense(db) + return nil + }); err != nil { + log.Error(err) + } + case <-v.vCtx.Done(): + break LOOP } } }() diff --git a/server/controller/monitor/vtap/rebalance.go b/server/controller/monitor/vtap/rebalance.go index d02cc17ae77..7ea0e787cc4 100644 --- a/server/controller/monitor/vtap/rebalance.go +++ b/server/controller/monitor/vtap/rebalance.go @@ -50,12 +50,20 @@ func (r *RebalanceCheck) Start() { return } - for range time.Tick(time.Duration(r.cfg.RebalanceCheckInterval) * time.Second) { - for _, db := range mysql.GetDBs().All() { - r.controllerRebalance(db) - if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT { - r.analyzerRebalance(db) + ticker := time.NewTicker(time.Duration(r.cfg.RebalanceCheckInterval) * time.Second) + defer ticker.Stop() + LOOP: + for { + select { + case <-ticker.C: + for _, db := range mysql.GetDBs().All() { + r.controllerRebalance(db) + if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT { + r.analyzerRebalance(db) + } } + case <-r.vCtx.Done(): + break LOOP } } }() @@ -65,11 +73,21 @@ func (r *RebalanceCheck) Start() { return } - if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_INGESTED_DATA { - duration := r.cfg.IngesterLoadBalancingConfig.DataDuration - r.analyzerRebalanceByTraffic(duration) - for range time.Tick(time.Duration(r.cfg.IngesterLoadBalancingConfig.RebalanceInterval) * time.Second) { + if r.cfg.IngesterLoadBalancingConfig.Algorithm != common.ANALYZER_ALLOC_BY_INGESTED_DATA { + return + } + + duration := r.cfg.IngesterLoadBalancingConfig.DataDuration + r.analyzerRebalanceByTraffic(duration) + + ticker := time.NewTicker(time.Duration(r.cfg.IngesterLoadBalancingConfig.RebalanceInterval) * time.Second) + LOOP: + for { + select { + case <-ticker.C: r.analyzerRebalanceByTraffic(duration) + case <-r.vCtx.Done(): + break LOOP } } }() diff --git a/server/go.mod b/server/go.mod index 7058959e5c8..8e981193453 100644 --- a/server/go.mod +++ b/server/go.mod @@ -114,6 +114,7 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/DataDog/zstd v1.4.1 // indirect github.com/bytedance/sonic v1.11.2 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect @@ -189,7 +190,7 @@ require ( github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect github.com/go-playground/validator/v10 v10.19.0 // indirect - github.com/go-sql-driver/mysql v1.6.0 // indirect + github.com/go-sql-driver/mysql v1.8.1 github.com/goccy/go-json v0.10.2 github.com/golang/snappy v0.0.4 github.com/google/gnostic v0.5.7-v3refs // indirect diff --git a/server/go.sum b/server/go.sum index 45bac3685b0..b2438fba28f 100644 --- a/server/go.sum +++ b/server/go.sum @@ -12,6 +12,8 @@ cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiV cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY= cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= @@ -288,8 +290,9 @@ github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksD github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY= github.com/go-resty/resty/v2 v2.1.1-0.20191201195748-d7b97669fe48 h1:JVrqSeQfdhYRFk24TvhTZWU0q8lfCojxZQFi3Ou7+uY= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE= github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM= github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=