diff --git a/common/ck.go b/common/ck.go index 4abb1495..4771a124 100644 --- a/common/ck.go +++ b/common/ck.go @@ -56,9 +56,6 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C Username: opt.User, Password: opt.Password, }, - Settings: clickhouse.Settings{ - "max_execution_time": 60, - }, Protocol: opt.Protocol, DialTimeout: time.Duration(10) * time.Second, ConnOpenStrategy: clickhouse.ConnOpenInOrder, diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go index 32909697..5c771c42 100644 --- a/service/clickhouse/clickhouse_service.go +++ b/service/clickhouse/clickhouse_service.go @@ -1780,19 +1780,25 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb return err } if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil { - return err + time.Sleep(5 * time.Second) + if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil { + return err + } } log.Logger.Info("[rebalance] STEP InsertPlan") if err = rebalancer.InsertPlan(); err != nil { return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, rebalancer.TmpTable) } if err = rebalancer.CheckCounts(rebalancer.Table); err != nil { - return err + time.Sleep(5 * time.Second) + if err = rebalancer.CheckCounts(rebalancer.Table); err != nil { + return err + } } log.Logger.Info("[rebalance] STEP Cleanup") rebalancer.Cleanup() - log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds()) + log.Logger.Infof("[rebalance] DONE, Total counts: %d, Elapsed: %v sec", rebalancer.OriCount, time.Since(start).Seconds()) return nil } diff --git a/service/clickhouse/rebalance.go b/service/clickhouse/rebalance.go index b6918629..38b23c02 100644 --- a/service/clickhouse/rebalance.go +++ b/service/clickhouse/rebalance.go @@ -3,8 +3,6 @@ package clickhouse import ( "fmt" "path/filepath" - "regexp" - "runtime" "sort" "strings" "sync" @@ -12,7 +10,6 @@ import ( "github.com/housepower/ckman/common" "github.com/housepower/ckman/log" "github.com/housepower/ckman/model" - "github.com/housepower/ckman/repository" "github.com/k0kubun/pp" "github.com/pkg/errors" ) @@ -429,7 +426,6 @@ func (r *CKRebalance) CheckCounts(tableName string) error { } func (r *CKRebalance) InsertPlan() error { - max_insert_threads := runtime.NumCPU()*3/4 + 1 // add 1 to ensure threads not zero var lastError error var wg sync.WaitGroup for idx, host := range r.Hosts { @@ -445,14 +441,38 @@ func (r *CKRebalance) InsertPlan() error { lastError = errors.Wrap(err, host) return } - - query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0", - r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads) + //手动触发merge,使part稳定 + query = fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.TmpTable) + _ = conn.Exec(query) + query = fmt.Sprintf(`SELECT distinct name FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1`, r.Cluster, r.Database, r.TmpTable) log.Logger.Debugf("[%s]%s", host, query) - if err := conn.Exec(query); err != nil { + rows, err := conn.Query(query) + if err != nil { lastError = errors.Wrap(err, host) return } + parts := make([]string, 0) + for rows.Next() { + var name string + err = rows.Scan(&name) + if err != nil { + lastError = errors.Wrap(err, host) + return + } + parts = append(parts, name) + } + rows.Close() + log.Logger.Debugf("host:[%s], parts: %v", host, parts) + + for i, part := range parts { + query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _part = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8", + r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, part, ShardingFunc(r.Shardingkey), len(r.Hosts), idx) + log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(parts), query) + if err = conn.Exec(query); err != nil { + lastError = errors.Wrap(err, host) + return + } + } }) } @@ -461,10 +481,6 @@ func (r *CKRebalance) InsertPlan() error { } func (r *CKRebalance) MoveBackup() error { - conf, err := repository.Ps.GetClusterbyName(r.Cluster) - if err != nil { - return err - } var wg sync.WaitGroup var lastError error for _, host := range r.Hosts { @@ -473,67 +489,36 @@ func (r *CKRebalance) MoveBackup() error { _ = common.Pool.Submit(func() { defer wg.Done() conn := common.GetConnection(host) - // copy data - cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, r.Table) - sshOpts := common.SshOptions{ - User: conf.SshUser, - Password: conf.SshPassword, - Port: conf.SshPort, - Host: host, - NeedSudo: conf.NeedSudo, - AuthenticateType: conf.AuthenticateType, - } - out, err := common.RemoteExecute(sshOpts, cmd) + // 手动触发merge,使part稳定 + query := fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.Table) + _ = conn.Exec(query) + query = fmt.Sprintf(`SELECT distinct name FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1`, r.Database, r.Table) + log.Logger.Debugf("[%s]%s", host, query) + rows, err := conn.Query(query) if err != nil { lastError = errors.Wrap(err, host) return } parts := make([]string, 0) - for _, file := range strings.Split(out, "\n") { - file = strings.TrimSpace(strings.TrimSuffix(file, "\r")) - reg, err := regexp.Compile(`[^_]+(_\d+){3,}$`) //parts name + for rows.Next() { + var name string + err = rows.Scan(&name) if err != nil { lastError = errors.Wrap(err, host) return } - if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") { - parts = append(parts, file) - } + parts = append(parts, name) } + rows.Close() log.Logger.Debugf("host:[%s], parts: %v", host, parts) - var cmds []string - for _, part := range parts { - cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, r.Table, part, r.DataDir, r.Database, r.TmpTable)) - } - if len(cmds) > 0 { - log.Logger.Debugf("host:[%s], cmds: %v", host, cmds) - _, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";")) - if err != nil { - lastError = errors.Wrap(err, host) - return - } - } - var failedParts []string - for _, part := range parts { - query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s' settings mutations_sync=1", r.Database, r.TmpTable, part) - log.Logger.Debugf("[%s]%s", host, query) + for idx, part := range parts { + query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8", + r.Database, r.TmpTable, r.Database, r.Table, part) + log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(parts), query) if err = conn.Exec(query); err != nil { - failedParts = append(failedParts, part) - continue - } - } - - if len(failedParts) > 0 { - max_insert_threads := runtime.NumCPU()*3/4 + 1 - log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts) - for _, part := range failedParts { - query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.TmpTable, r.Database, r.Table, part, max_insert_threads) - log.Logger.Debugf("[%s]%s", host, query) - if err = conn.Exec(query); err != nil { - lastError = errors.Wrap(err, host) - return - } + lastError = errors.Wrap(err, host) + return } } })