Skip to content

Commit

Permalink
[controller] remove keyword - reference in init.sql
Browse files Browse the repository at this point in the history
- reference not supported in OceanBase DB
- modify dsn to connector, because ":" in username don't support in dsn
- fix bug in master stop
  • Loading branch information
SongZhen0704 committed May 28, 2024
1 parent 3a588b1 commit 541f07f
Show file tree
Hide file tree
Showing 10 changed files with 167 additions and 78 deletions.
57 changes: 38 additions & 19 deletions server/controller/db/mysql/common/gorm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
package common

import (
"database/sql"
"database/sql/driver"
"fmt"
l "log"
"os"
"time"

mysql_driver "github.com/go-sql-driver/mysql"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"gorm.io/gorm/logger"
Expand All @@ -31,33 +34,49 @@ import (
)

func GetSession(cfg config.MySqlConfig) (*gorm.DB, error) {
dsn := GenerateDSN(cfg, true, cfg.TimeOut, false)
return InitSession(dsn)
connector, err := GetConnector(cfg, true, cfg.TimeOut, false)
if err != nil {
return nil, err
}
return InitSession(connector)
}

func GenerateDSN(cfg config.MySqlConfig, useDatabase bool, timeout uint32, multiStatements bool) string {
func GetConnector(cfg config.MySqlConfig, useDatabase bool, timeout uint32, multiStatements bool) (driver.Connector, error) {
var database string
if useDatabase {
database = cfg.Database
}
dsn := fmt.Sprintf(
"%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=%ds",
cfg.UserName,
cfg.UserPassword,
cfg.Host,
cfg.Port,
database,
timeout,
)
if multiStatements {
dsn += "&multiStatements=true"

location, err := time.LoadLocation("Local")
if err != nil {
log.Error("Get location failed with error: %v", err.Error())
return nil, err
}

config := mysql_driver.Config{
User: cfg.UserName,
Passwd: cfg.UserPassword,
Net: "tcp",
Addr: fmt.Sprintf("%s:%d", cfg.Host, cfg.Port),
DBName: database,
AllowNativePasswords: true,
Loc: location,
Timeout: time.Duration(timeout) * time.Second,
ParseTime: true,
MultiStatements: multiStatements,
Params: map[string]string{"charset": "utf8mb4"},
}
connector, err := mysql_driver.NewConnector(&config)
if err != nil {
log.Error("Get database(%s) connector failed with error: %v", database, err.Error())
return nil, err
}
return dsn
return connector, nil
}

func InitSession(dsn string) (*gorm.DB, error) {
func InitSession(connector driver.Connector) (*gorm.DB, error) {
db, err := gorm.Open(mysql.New(mysql.Config{
DSN: dsn, // DSN data source name
Conn: sql.OpenDB(connector),
DefaultStringSize: 256, // string 类型字段的默认长度
DisableDatetimePrecision: true, // 禁用 datetime 精度,MySQL 5.6 之前的数据库不支持
DontSupportRenameIndex: true, // 重命名索引时采用删除并新建的方式,MySQL 5.7 之前的数据库和 MariaDB 不支持重命名索引
Expand All @@ -75,10 +94,10 @@ func InitSession(dsn string) (*gorm.DB, error) {
}), // 配置log
})
if err != nil {
log.Errorf("failed to initialize session: %v, dsn: %s", err.Error(), dsn)
log.Errorf("failed to initialize session: %v", err.Error())
return nil, err
}
log.Infof("initialized mysql session successfully, dsn: %s", dsn)
log.Infof("initialized mysql session successfully")

sqlDB, _ := db.DB()
// 限制最大空闲连接数、最大连接数和连接的生命周期
Expand Down
6 changes: 3 additions & 3 deletions server/controller/db/mysql/migration/rawsql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ CREATE TABLE IF NOT EXISTS vl2_net (
id INTEGER NOT NULL AUTO_INCREMENT,
prefix CHAR(64) DEFAULT '',
netmask CHAR(64) DEFAULT '',
vl2id INTEGER REFERENCES vl2(id),
vl2id INTEGER DEFAULT 0,
net_index INTEGER DEFAULT 0,
name VARCHAR(256) DEFAULT '',
label VARCHAR(64) DEFAULT '',
Expand Down Expand Up @@ -361,12 +361,12 @@ CREATE TABLE IF NOT EXISTS vinterface_ip (
netmask CHAR(64) DEFAULT '',
gateway CHAR(64) DEFAULT '',
create_method INTEGER DEFAULT 0 COMMENT '0.learning 1.user_defined',
vl2id INTEGER REFERENCES vl2(id),
vl2id INTEGER DEFAULT 0,
vl2_net_id INTEGER DEFAULT 0,
net_index INTEGER DEFAULT 0,
sub_domain CHAR(64) DEFAULT '',
domain CHAR(64) DEFAULT '',
vifid INTEGER REFERENCES vinterface(id),
vifid INTEGER DEFAULT 0,
isp INTEGER DEFAULT 0 COMMENT 'Used for multi-ISP access',
lcuuid CHAR(64) DEFAULT '',
PRIMARY KEY (id)
Expand Down
14 changes: 10 additions & 4 deletions server/controller/db/mysql/migrator/common/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,18 @@ import (
)

func GetSessionWithoutName(cfg config.MySqlConfig) (*gorm.DB, error) {
dsn := common.GenerateDSN(cfg, false, cfg.TimeOut, false)
return common.InitSession(dsn)
connector, err := common.GetConnector(cfg, false, cfg.TimeOut, false)
if err != nil {
return nil, err
}
return common.InitSession(connector)
}

func GetSessionWithName(cfg config.MySqlConfig) (*gorm.DB, error) {
// set multiStatements=true in dsn only when migrating MySQL
dsn := common.GenerateDSN(cfg, true, cfg.TimeOut*2, true)
return common.InitSession(dsn)
connector, err := common.GetConnector(cfg, true, cfg.TimeOut*2, true)
if err != nil {
return nil, err
}
return common.InitSession(connector)
}
14 changes: 11 additions & 3 deletions server/controller/http/service/resource/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,9 +1035,17 @@ func (c *DomainChecker) Stop() {

func (c *DomainChecker) CheckRegularly() {
go func() {
for range time.Tick(time.Duration(5) * time.Minute) {
for _, db := range mysql.GetDBs().All() {
c.checkAndAllocateController(db)
ticker := time.NewTicker(time.Duration(5) * time.Minute)
defer ticker.Stop()
LOOP:
for {
select {
case <-ticker.C:
for _, db := range mysql.GetDBs().All() {
c.checkAndAllocateController(db)
}
case <-c.ctx.Done():
break LOOP
}
}
}()
Expand Down
42 changes: 29 additions & 13 deletions server/controller/monitor/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,39 @@ func NewAnalyzerCheck(cfg *config.ControllerConfig, ctx context.Context) *Analyz
func (c *AnalyzerCheck) Start() {
log.Info("analyzer check start")
go func() {
for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) {
c.SyncDefaultOrgData()
ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second)
defer ticker.Stop()
LOOP1:
for {
select {
case <-ticker.C:
c.SyncDefaultOrgData()
case <-c.cCtx.Done():
break LOOP1
}
}
}()

go func() {
for range time.Tick(time.Duration(c.cfg.HealthCheckInterval) * time.Second) {
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
// 数据节点健康检查
c.healthCheck(db)
// 检查没有分配数据节点的采集器,并进行分配
c.vtapAnalyzerCheck(db)
// check az_analyzer_connection, delete unused item
c.azConnectionCheck(db)
return nil
}); err != nil {
log.Error(err)
ticker := time.NewTicker(time.Duration(c.cfg.HealthCheckInterval) * time.Second)
defer ticker.Stop()
LOOP2:
for {
select {
case <-ticker.C:
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
// 数据节点健康检查
c.healthCheck(db)
// 检查没有分配数据节点的采集器,并进行分配
c.vtapAnalyzerCheck(db)
// check az_analyzer_connection, delete unused item
c.azConnectionCheck(db)
return nil
}); err != nil {
log.Error(err)
}
case <-c.cCtx.Done():
break LOOP2
}
}
}()
Expand Down
48 changes: 29 additions & 19 deletions server/controller/monitor/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,29 +64,39 @@ func NewControllerCheck(cfg *config.ControllerConfig, ctx context.Context) *Cont
func (c *ControllerCheck) Start() {
log.Info("controller check start")
go func() {
for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) {
c.SyncDefaultOrgData()
}
}()

go func() {
for range time.Tick(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second) {
c.SyncDefaultOrgData()
ticker := time.NewTicker(time.Duration(c.cfg.SyncDefaultORGDataInterval) * time.Second)
defer ticker.Stop()
LOOP1:
for {
select {
case <-ticker.C:
c.SyncDefaultOrgData()
case <-c.cCtx.Done():
break LOOP1
}
}
}()

go func() {
for range time.Tick(time.Duration(c.cfg.HealthCheckInterval) * time.Second) {
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
// 控制器健康检查
c.healthCheck(db)
// 检查没有分配控制器的采集器,并进行分配
c.vtapControllerCheck(db)
// check az_controller_connection, delete unused item
c.azConnectionCheck(db)
return nil
}); err != nil {
log.Error(err)
ticker := time.NewTicker(time.Duration(c.cfg.HealthCheckInterval) * time.Second)
defer ticker.Stop()
LOOP2:
for {
select {
case <-ticker.C:
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
// 控制器健康检查
c.healthCheck(db)
// 检查没有分配控制器的采集器,并进行分配
c.vtapControllerCheck(db)
// check az_controller_connection, delete unused item
c.azConnectionCheck(db)
return nil
}); err != nil {
log.Error(err)
}
case <-c.cCtx.Done():
break LOOP2
}
}
}()
Expand Down
20 changes: 14 additions & 6 deletions server/controller/monitor/license/pseudo_license.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,20 @@ func NewVTapLicenseAllocation(cfg config.MonitorConfig, ctx context.Context) *VT
func (v *VTapLicenseAllocation) Start() {
log.Info("vtap license allocation and check start")
go func() {
for range time.Tick(time.Duration(v.cfg.LicenseCheckInterval) * time.Second) {
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
v.allocLicense(db)
return nil
}); err != nil {
log.Error(err)
ticker := time.NewTicker(time.Duration(v.cfg.LicenseCheckInterval) * time.Second)
defer ticker.Stop()
LOOP:
for {
select {
case <-ticker.C:
if err := mysql.GetDBs().DoOnAllDBs(func(db *mysql.DB) error {
v.allocLicense(db)
return nil
}); err != nil {
log.Error(err)
}
case <-v.vCtx.Done():
break LOOP
}
}
}()
Expand Down
36 changes: 27 additions & 9 deletions server/controller/monitor/vtap/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,20 @@ func (r *RebalanceCheck) Start() {
return
}

for range time.Tick(time.Duration(r.cfg.RebalanceCheckInterval) * time.Second) {
for _, db := range mysql.GetDBs().All() {
r.controllerRebalance(db)
if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT {
r.analyzerRebalance(db)
ticker := time.NewTicker(time.Duration(r.cfg.RebalanceCheckInterval) * time.Second)
defer ticker.Stop()
LOOP:
for {
select {
case <-ticker.C:
for _, db := range mysql.GetDBs().All() {
r.controllerRebalance(db)
if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_AGENT_COUNT {
r.analyzerRebalance(db)
}
}
case <-r.vCtx.Done():
break LOOP
}
}
}()
Expand All @@ -65,11 +73,21 @@ func (r *RebalanceCheck) Start() {
return
}

if r.cfg.IngesterLoadBalancingConfig.Algorithm == common.ANALYZER_ALLOC_BY_INGESTED_DATA {
duration := r.cfg.IngesterLoadBalancingConfig.DataDuration
r.analyzerRebalanceByTraffic(duration)
for range time.Tick(time.Duration(r.cfg.IngesterLoadBalancingConfig.RebalanceInterval) * time.Second) {
if r.cfg.IngesterLoadBalancingConfig.Algorithm != common.ANALYZER_ALLOC_BY_INGESTED_DATA {
return
}

duration := r.cfg.IngesterLoadBalancingConfig.DataDuration
r.analyzerRebalanceByTraffic(duration)

ticker := time.NewTicker(time.Duration(r.cfg.IngesterLoadBalancingConfig.RebalanceInterval) * time.Second)
LOOP:
for {
select {
case <-ticker.C:
r.analyzerRebalanceByTraffic(duration)
case <-r.vCtx.Done():
break LOOP
}
}
}()
Expand Down
3 changes: 2 additions & 1 deletion server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ require (
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/DataDog/zstd v1.4.1 // indirect
github.com/bytedance/sonic v1.11.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down Expand Up @@ -189,7 +190,7 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.19.0 // indirect
github.com/go-sql-driver/mysql v1.6.0 // indirect
github.com/go-sql-driver/mysql v1.8.1
github.com/goccy/go-json v0.10.2
github.com/golang/snappy v0.0.4
github.com/google/gnostic v0.5.7-v3refs // indirect
Expand Down
5 changes: 4 additions & 1 deletion server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ cloud.google.com/go/compute v1.23.3 h1:6sVlXXBmbd7jNX0Ipq0trII3e4n1/MsADLK6a+aiV
cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY=
cloud.google.com/go/iam v0.3.0/go.mod h1:XzJPvDayI+9zsASAFO68Hk07u3z+f+JrT2xXNdp4bnY=
cloud.google.com/go/storage v1.23.0/go.mod h1:vOEEDNFnciUMhBeT6hsJIn3ieU5cFRmzeLgDvXzfIXc=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
Expand Down Expand Up @@ -288,8 +290,9 @@ github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksD
github.com/go-redis/redis/v9 v9.0.0-rc.2/go.mod h1:cgBknjwcBJa2prbnuHH/4k/Mlj4r0pWNV2HBanHujfY=
github.com/go-resty/resty/v2 v2.1.1-0.20191201195748-d7b97669fe48 h1:JVrqSeQfdhYRFk24TvhTZWU0q8lfCojxZQFi3Ou7+uY=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-zookeeper/zk v1.0.2 h1:4mx0EYENAdX/B/rbunjlt5+4RTA/a9SMHBRuSKdGxPM=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
Expand Down

0 comments on commit 541f07f

Please sign in to comment.