Skip to content

Commit

Permalink
feat: 新增_客户端权限克隆ip模糊匹配未匹配任何权限时bug处理_空闲协程过多问题处理_的功能 #6476
Browse files Browse the repository at this point in the history
  • Loading branch information
fanfanyangyang authored and iSecloud committed Sep 2, 2024
1 parent ab416a1 commit 0807e08
Show file tree
Hide file tree
Showing 19 changed files with 210 additions and 158 deletions.
15 changes: 7 additions & 8 deletions dbm-services/mysql/db-partition/service/check_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,17 @@ func CheckPartitionConfigs(configs []*PartitionConfig, dbtype string, splitCnt i
tokenBucket := make(chan int, 10) // 最大并行度

for _, config := range configs {
err := limiter.Wait(context.Background())
if err != nil {
checkFailSet.Mu.Lock()
checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()})
checkFailSet.Mu.Unlock()
continue
}
wg.Add(1)
tokenBucket <- 0

ctx, cancel := context.WithTimeout(context.Background(), 180*time.Second)
go func(config *PartitionConfig) {
err := limiter.Wait(context.Background())
if err != nil {
checkFailSet.Mu.Lock()
checkFailSet.IdLogs = append(checkFailSet.IdLogs, IdLog{(*config).ID, err.Error()})
checkFailSet.Mu.Unlock()
return
}
CheckOnePartitionConfig(ctx, cancel, *config, &wg, &sqlSet, &nothingToDoSet, &checkFailSet, dbtype, splitCnt,
fromCron, host, &tokenBucket)
}(config)
Expand Down
30 changes: 15 additions & 15 deletions dbm-services/mysql/db-partition/service/cron_basic_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,20 +440,20 @@ func DownLoadFilesCreateTicketByMachine(cloudMachineList map[int][]string, machi
for cloud, machines := range cloudMachineList {
tmp := util.SplitArray(machines, 20)
for _, ips := range tmp {
errLimiter := limiter.Wait(context.Background())
if errLimiter != nil {
msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error"
SendMonitor(msg, errLimiter)
slog.Error("msg", msg, errLimiter)
continue
}
wg.Add(1)
go func(cloud int, ips []string) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
msg := "dbmeta/apis/v1/flow/scene/download_dbactor/ error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
return
}
// 按照机器下载好dbactor
err = DownloadDbactor(cloud, ips)
err := DownloadDbactor(cloud, ips)
// dbactor下载失败,可以继续执行分区的单据,机器上可能已经存在dbactor
if err != nil {
dimension := monitor.NewDeveloperEventDimension(Scheduler, monitor.PartitionCron)
Expand Down Expand Up @@ -502,19 +502,19 @@ func DownLoadFilesCreateTicketByCluster(clusterIps map[string][]string, machineF
vcluster := strings.Split(cluster, "|")
domain := vcluster[0]
cloud, _ := strconv.Atoi(vcluster[2])
err := limiter.Wait(context.Background())
if err != nil {
msg := "get token error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
continue
}
wg.Add(1)
var clusterFiles []Info
go func(domain string, cloud int, machines []string) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
msg := "get token error"
SendMonitor(msg, err)
slog.Error("msg", msg, err)
return
}
tmp := util.SplitArray(machines, 20)
for _, ips := range tmp {
// 按照机器下载好dbactor
Expand Down
15 changes: 11 additions & 4 deletions dbm-services/mysql/db-priv/handler/account_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (m *PrivService) GetAccountRuleList(c *gin.Context) {
func (m *PrivService) AddAccountRule(c *gin.Context) {
slog.Info("do AddAccountRule!")
var input service.AccountRulePara
var rules []service.TbAccountRules
ticket := strings.TrimPrefix(c.FullPath(), "/priv/")

body, err := ioutil.ReadAll(c.Request.Body)
Expand All @@ -55,13 +56,19 @@ func (m *PrivService) AddAccountRule(c *gin.Context) {
SendResponse(c, errno.ErrBind, err)
return
}

if *input.ClusterType == "mongodb" {
err = input.MongoDBAddAccountRule(string(body), ticket)
rules, err = input.MongoDBAddAccountRule(string(body), ticket)
} else {
err = input.AddAccountRule(string(body), ticket)
rules, err = input.AddAccountRule(string(body), ticket)
}
SendResponse(c, err, nil)
if err != nil {
SendResponse(c, err, nil)
return
}
SendResponse(c, err, ListResponse{
Count: len(rules),
Items: rules,
})
return
}

Expand Down
4 changes: 4 additions & 0 deletions dbm-services/mysql/db-priv/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"dbm-services/mysql/priv-service/util"
"io"
"log/slog"
"net/http"
Expand Down Expand Up @@ -41,6 +42,9 @@ func main() {
}
}

util.DbmetaClient = util.NewClientByHosts(viper.GetString("dbmeta"))
util.DrsClient = util.NewClientByHosts(viper.GetString("dbRemoteService"))

// 注册服务
gin.SetMode(gin.ReleaseMode)
engine := gin.New()
Expand Down
16 changes: 9 additions & 7 deletions dbm-services/mysql/db-priv/service/account_rule_mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import (
)

// MongoDBAddAccountRule 新增账号规则
func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) error {
func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) {
var (
accountRule TbAccountRules
dbs []string
allTypePriv string
userPriv string
managerPriv string
err error
rules []TbAccountRules
)
// mongo_user: read readWrite readAnyDatabase readWriteAnyDatabase
// mongo_manager: dbAdmin backup restore userAdmin clusterAdmin
Expand All @@ -24,17 +25,17 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string)

err = m.ParaPreCheck()
if err != nil {
return err
return nil, err
}

dbs, err = util.String2Slice(m.Dbname)
if err != nil {
return err
return nil, err
}

_, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
if err != nil {
return err
return nil, err
}

for _, _type := range ConstPrivType {
Expand Down Expand Up @@ -62,15 +63,16 @@ func (m *AccountRulePara) MongoDBAddAccountRule(jsonPara string, ticket string)
err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error
if err != nil {
tx.Rollback()
return err
return nil, err
}
rules = append(rules, accountRule)
}
err = tx.Commit().Error
if err != nil {
return err
return nil, err
}
log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime}
AddPrivLog(log)

return nil
return rules, nil
}
17 changes: 9 additions & 8 deletions dbm-services/mysql/db-priv/service/accout_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,14 +142,15 @@ func (m *QueryRulePara) QueryAccountRule() ([]*AccountRuleSplitUser, int, error)
}

// AddAccountRule 新增账号规则
func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {
func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) ([]TbAccountRules, error) {
var (
accountRule TbAccountRules
dbs []string
allTypePriv string
dmlDdlPriv string
globalPriv string
err error
rules []TbAccountRules
)
// dml: select,insert,update,delete
// ddl: create,alter,drop,index,execute,create view
Expand All @@ -168,17 +169,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {

err = m.ParaPreCheck()
if err != nil {
return err
return nil, err
}

dbs, err = util.String2Slice(m.Dbname)
if err != nil {
return err
return nil, err
}

_, err = AccountRulePreCheck(m.BkBizId, m.AccountId, *m.ClusterType, dbs, false)
if err != nil {
return err
return nil, err
}

for _, _type := range ConstPrivType {
Expand All @@ -205,17 +206,17 @@ func (m *AccountRulePara) AddAccountRule(jsonPara string, ticket string) error {
err = tx.Debug().Model(&TbAccountRules{}).Create(&accountRule).Error
if err != nil {
tx.Rollback()
return err
return nil, err
}
rules = append(rules, accountRule)
}
err = tx.Commit().Error
if err != nil {
return err
return nil, err
}
log := PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: vtime}
AddPrivLog(log)

return nil
return rules, nil
}

// AddAccountRuleDryRun 新增账号规则检查
Expand Down
20 changes: 8 additions & 12 deletions dbm-services/mysql/db-priv/service/add_priv.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ import (
"golang.org/x/time/rate"

"dbm-services/common/go-pubpkg/errno"
"dbm-services/mysql/priv-service/util"

"github.com/spf13/viper"
)

// AddPrivDryRun 使用账号规则,新增权限预检查
Expand Down Expand Up @@ -80,8 +77,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
return errno.ClusterTypeIsEmpty
}
AddPrivLog(PrivLog{BkBizId: m.BkBizId, Ticket: ticket, Operator: m.Operator, Para: jsonPara, Time: time.Now()})
client := util.NewClientByHosts(viper.GetString("dbmeta"))
limit := rate.Every(time.Millisecond * 200) // QPS:5
limit := rate.Every(time.Millisecond * 100) // QPS:10
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)
for _, rule := range m.AccoutRules { // 添加权限,for acccountRuleList;for instanceList; do create a routine
Expand All @@ -91,6 +87,12 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
continue
}
for _, dns := range m.TargetInstances {
errLimiter := limiter.Wait(context.Background())
if errLimiter != nil {
slog.Error("limiter.Wait", "error", errLimiter, "dns", dns)
AddErrorOnly(&errMsg, errors.New(errLimiter.Error()))
continue
}
wg.Add(1)
go func(dns string) {
defer func() {
Expand All @@ -111,13 +113,7 @@ func (m *PrivTaskPara) AddPriv(jsonPara string, ticket string) error {
successInfo = fmt.Sprintf(`%s,授权成功。`, baseInfo)
failInfo = fmt.Sprintf(`%s,授权失败:`, baseInfo)

err = limiter.Wait(context.Background())
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
return
}

instance, err = GetCluster(client, m.ClusterType, Domain{EntryName: dns})
instance, err = GetCluster(m.ClusterType, Domain{EntryName: dns})
if err != nil {
AddErrorOnly(&errMsg, errors.New(failInfo+sep+err.Error()))
return
Expand Down
5 changes: 1 addition & 4 deletions dbm-services/mysql/db-priv/service/add_priv_base_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/asaskevich/govalidator"
"github.com/jinzhu/gorm"
"github.com/spf13/viper"
)

// GetAccountRuleInfo 根据账号名获取账号信息,根据账号 id 以及授权数据库获取账号规则
Expand Down Expand Up @@ -489,8 +488,6 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri
UniqMap = make(map[string]struct{})
err error
)

client := util.NewClientByHosts(viper.GetString("dbmeta"))
for _, instance := range instances {
instance = strings.Trim(strings.TrimSpace(instance), ".")
if !govalidator.IsDNSName(instance) {
Expand All @@ -499,7 +496,7 @@ func DeduplicationTargetInstance(instances []string, clusterType string) ([]stri
continue
}
dns = Domain{EntryName: instance}
_, err = GetCluster(client, clusterType, dns)
_, err = GetCluster(clusterType, dns)
if err != nil {
errMsg = append(errMsg, err.Error())
continue
Expand Down
7 changes: 1 addition & 6 deletions dbm-services/mysql/db-priv/service/add_priv_for_sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ import (
"log/slog"
"strings"
"time"

"dbm-services/mysql/priv-service/util"

"github.com/spf13/viper"
)

// AddPriv 使用账号规则,新增权限
Expand Down Expand Up @@ -37,11 +33,10 @@ func (m *PrivTaskPara) AddPrivForSqlserver(jsonPara string) error {
rules = append(rules, accountRule)
}

client := util.NewClientByHosts(viper.GetString("dbmeta"))
for _, dns := range m.TargetInstances {
// 获取集群相关信息
dns = strings.Trim(strings.TrimSpace(dns), ".")
cluster, err := GetCluster(client, m.ClusterType, Domain{EntryName: dns})
cluster, err := GetCluster(m.ClusterType, Domain{EntryName: dns})
if err != nil {
return err
}
Expand Down
12 changes: 6 additions & 6 deletions dbm-services/mysql/db-priv/service/admin_password.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error)
var passwordInput string
var errCheck error

limit := rate.Every(time.Millisecond * 200) // QPS:5
limit := rate.Every(time.Millisecond * 100) // QPS:10
burst := 10 // 桶容量 10
limiter := rate.NewLimiter(limit, burst)

Expand Down Expand Up @@ -377,16 +377,16 @@ func (m *ModifyAdminUserPasswordPara) ModifyAdminPassword() (BatchResult, error)
slog.Error("SM4Encrypt", "error", errOuter)
return batch, errOuter
}
err := limiter.Wait(context.Background())
if err != nil {
AddError(&errMsg, "get parallel resource", err)
continue
}
wg.Add(1)
go func(psw, encrypt string, cluster OneCluster) {
defer func() {
wg.Done()
}()
err := limiter.Wait(context.Background())
if err != nil {
AddError(&errMsg, "get parallel resource", err)
return
}
// 如果是sqlserver授权,走sqlserver授权通道
if m.Component == "sqlserver" {
m.ModifyAdminPasswordForSqlserver(
Expand Down
Loading

0 comments on commit 0807e08

Please sign in to comment.