Skip to content

Commit

Permalink
fix: rebalance with multi disks
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Mar 8, 2024
1 parent d9a1311 commit 7ba18e7
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 66 deletions.
3 changes: 0 additions & 3 deletions common/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,6 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C
Username: opt.User,
Password: opt.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
Protocol: opt.Protocol,
DialTimeout: time.Duration(10) * time.Second,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
Expand Down
12 changes: 9 additions & 3 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,19 +1780,25 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
return err
}
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.TmpTable); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP InsertPlan")
if err = rebalancer.InsertPlan(); err != nil {
return errors.Wrapf(err, "table %s.%s rebalance failed, data can be corrupted, please move back from temp table[%s] manually", rebalancer.Database, rebalancer.Table, rebalancer.TmpTable)
}
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
time.Sleep(5 * time.Second)
if err = rebalancer.CheckCounts(rebalancer.Table); err != nil {
return err
}
}
log.Logger.Info("[rebalance] STEP Cleanup")
rebalancer.Cleanup()

log.Logger.Infof("[rebalance] DONE, Elapsed: %v sec", time.Since(start).Seconds())
log.Logger.Infof("[rebalance] DONE, Total counts: %d, Elapsed: %v sec", rebalancer.OriCount, time.Since(start).Seconds())
return nil
}

Expand Down
105 changes: 45 additions & 60 deletions service/clickhouse/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ package clickhouse
import (
"fmt"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"sync"

"github.com/housepower/ckman/common"
"github.com/housepower/ckman/log"
"github.com/housepower/ckman/model"
"github.com/housepower/ckman/repository"
"github.com/k0kubun/pp"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -429,7 +426,6 @@ func (r *CKRebalance) CheckCounts(tableName string) error {
}

func (r *CKRebalance) InsertPlan() error {
max_insert_threads := runtime.NumCPU()*3/4 + 1 // add 1 to ensure threads not zero
var lastError error
var wg sync.WaitGroup
for idx, host := range r.Hosts {
Expand All @@ -445,14 +441,38 @@ func (r *CKRebalance) InsertPlan() error {
lastError = errors.Wrap(err, host)
return
}

query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE %s %% %d = %d SETTINGS max_insert_threads=%d, max_execution_time=0",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, ShardingFunc(r.Shardingkey), len(r.Hosts), idx, max_insert_threads)
//手动触发merge,使part稳定
query = fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.TmpTable)
_ = conn.Exec(query)
query = fmt.Sprintf(`SELECT distinct name FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1`, r.Cluster, r.Database, r.TmpTable)
log.Logger.Debugf("[%s]%s", host, query)
if err := conn.Exec(query); err != nil {
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts = append(parts, name)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, parts)

for i, part := range parts {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _part = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, part, ShardingFunc(r.Shardingkey), len(r.Hosts), idx)
log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(parts), query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
}

})
}
Expand All @@ -461,10 +481,6 @@ func (r *CKRebalance) InsertPlan() error {
}

func (r *CKRebalance) MoveBackup() error {
conf, err := repository.Ps.GetClusterbyName(r.Cluster)
if err != nil {
return err
}
var wg sync.WaitGroup
var lastError error
for _, host := range r.Hosts {
Expand All @@ -473,67 +489,36 @@ func (r *CKRebalance) MoveBackup() error {
_ = common.Pool.Submit(func() {
defer wg.Done()
conn := common.GetConnection(host)
// copy data
cmd := fmt.Sprintf("ls -l %sclickhouse/data/%s/%s/ |grep -v total |awk '{print $9}'", r.DataDir, r.Database, r.Table)
sshOpts := common.SshOptions{
User: conf.SshUser,
Password: conf.SshPassword,
Port: conf.SshPort,
Host: host,
NeedSudo: conf.NeedSudo,
AuthenticateType: conf.AuthenticateType,
}
out, err := common.RemoteExecute(sshOpts, cmd)
// 手动触发merge,使part稳定
query := fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.Table)
_ = conn.Exec(query)
query = fmt.Sprintf(`SELECT distinct name FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1`, r.Database, r.Table)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
for _, file := range strings.Split(out, "\n") {
file = strings.TrimSpace(strings.TrimSuffix(file, "\r"))
reg, err := regexp.Compile(`[^_]+(_\d+){3,}$`) //parts name
for rows.Next() {
var name string
err = rows.Scan(&name)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
if reg.MatchString(file) && !strings.HasPrefix(file, "tmp_merge") {
parts = append(parts, file)
}
parts = append(parts, name)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, parts)
var cmds []string
for _, part := range parts {
cmds = append(cmds, fmt.Sprintf("cp -prf %sclickhouse/data/%s/%s/%s %sclickhouse/data/%s/%s/detached/", r.DataDir, r.Database, r.Table, part, r.DataDir, r.Database, r.TmpTable))
}
if len(cmds) > 0 {
log.Logger.Debugf("host:[%s], cmds: %v", host, cmds)
_, err = common.RemoteExecute(sshOpts, strings.Join(cmds, ";"))
if err != nil {
lastError = errors.Wrap(err, host)
return
}
}

var failedParts []string
for _, part := range parts {
query := fmt.Sprintf("ALTER TABLE `%s`.`%s` ATTACH PART '%s' settings mutations_sync=1", r.Database, r.TmpTable, part)
log.Logger.Debugf("[%s]%s", host, query)
for idx, part := range parts {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.TmpTable, r.Database, r.Table, part)
log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(parts), query)
if err = conn.Exec(query); err != nil {
failedParts = append(failedParts, part)
continue
}
}

if len(failedParts) > 0 {
max_insert_threads := runtime.NumCPU()*3/4 + 1
log.Logger.Infof("[%s]failed parts: %v, retry again", host, failedParts)
for _, part := range failedParts {
query := fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS max_insert_threads=%d, max_execution_time=0", r.Database, r.TmpTable, r.Database, r.Table, part, max_insert_threads)
log.Logger.Debugf("[%s]%s", host, query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
lastError = errors.Wrap(err, host)
return
}
}
})
Expand Down

0 comments on commit 7ba18e7

Please sign in to comment.