Skip to content

Commit

Permalink
fix: rebalance by shardingkey, checkcounts incorrect
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Mar 11, 2024
1 parent 7ba18e7 commit f0df3a0
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 55 deletions.
4 changes: 2 additions & 2 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1744,7 +1744,7 @@ func getShardingType(key *model.RebalanceShardingkey, conn *common.Conn) error {

func RebalanceByPartition(conf *model.CKManClickHouseConfig, rebalancer *CKRebalance) error {
var err error
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(false); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
Expand All @@ -1767,7 +1767,7 @@ func RebalanceByShardingkey(conf *model.CKManClickHouseConfig, rebalancer *CKReb
var err error
start := time.Now()
log.Logger.Info("[rebalance] STEP InitCKConns")
if err = rebalancer.InitCKConns(); err != nil {
if err = rebalancer.InitCKConns(true); err != nil {
log.Logger.Errorf("got error %+v", err)
return err
}
Expand Down
128 changes: 75 additions & 53 deletions service/clickhouse/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type CKRebalance struct {
Engine string
EngineFull string
OriCount uint64
SortingKey []string
}

// TblPartitions is the partition status of a host. A host never moves partitions out and in within the same iteration.
Expand All @@ -51,7 +52,7 @@ type TblPartitions struct {
ToMoveIn bool // plan to move some partitions in
}

func (r *CKRebalance) InitCKConns() (err error) {
func (r *CKRebalance) InitCKConns(withShardingkey bool) (err error) {
locks = make(map[string]*sync.Mutex)
for _, host := range r.Hosts {
_, err = common.ConnectClickHouse(host, model.ClickHouseDefaultDB, r.ConnOpt)
Expand All @@ -62,32 +63,56 @@ func (r *CKRebalance) InitCKConns() (err error) {
locks[host] = &sync.Mutex{}
}

conn := common.GetConnection(r.Hosts[0])
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
if withShardingkey {
conn := common.GetConnection(r.Hosts[0])
// get engine
query := fmt.Sprintf("SELECT engine, engine_full FROM system.tables WHERE database = '%s' AND table = '%s'", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.Engine, &r.EngineFull)
if err != nil {
return
}
}
}
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
rows.Close()
log.Logger.Infof("table: %s.%s, engine: %s, engine_full:%s", r.Database, r.Table, r.Engine, r.EngineFull)

//get sortingkey
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT name FROM system.columns WHERE (database = '%s') AND (table = '%s') AND (is_in_sorting_key = 1)", r.Database, r.Table)
log.Logger.Debugf("query:%s", query)
rows, _ := conn.Query(query)
for rows.Next() {
var sortingkey string
err = rows.Scan(&sortingkey)
if err != nil {
return
}
r.SortingKey = append(r.SortingKey, sortingkey)
}
rows.Close()
log.Logger.Infof("table: %s.%s, sortingkey:%s", r.Database, r.Table, r.SortingKey)

}

//get original count
if strings.Contains(r.Engine, "Replacing") {
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
rows, _ = conn.Query(query)
for rows.Next() {
err = rows.Scan(&r.OriCount)
if err != nil {
return
}
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
}
log.Logger.Infof("table: %s.%s, count: %d", r.Database, r.Table, r.OriCount)
rows.Close()
return
}

Expand Down Expand Up @@ -399,9 +424,11 @@ func (r *CKRebalance) CreateTemporaryTable() error {
}

func (r *CKRebalance) CheckCounts(tableName string) error {
query := fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, tableName)
var query string
if strings.Contains(r.Engine, "Replacing") {
query += " FINAL"
query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
} else {
query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
}
log.Logger.Debugf("query: %s", query)
conn := common.GetConnection(r.Hosts[0])
Expand All @@ -425,6 +452,7 @@ func (r *CKRebalance) CheckCounts(tableName string) error {
return nil
}

// InsertPlan moves the data back from the temporary table to the original table after rehashing.
func (r *CKRebalance) InsertPlan() error {
var lastError error
var wg sync.WaitGroup
Expand All @@ -441,45 +469,42 @@ func (r *CKRebalance) InsertPlan() error {
lastError = errors.Wrap(err, host)
return
}
// Manually trigger a merge so that the parts become stable.
query = fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.TmpTable)
_ = conn.Exec(query)
query = fmt.Sprintf(`SELECT distinct name FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1`, r.Cluster, r.Database, r.TmpTable)
query = fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id`, r.Cluster, r.Database, r.TmpTable)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
partitions := make([]string, 0)
for rows.Next() {
var name string
err = rows.Scan(&name)
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts = append(parts, name)
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, parts)
log.Logger.Debugf("host:[%s], parts: %v", host, partitions)

for i, part := range parts {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _part = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, part, ShardingFunc(r.Shardingkey), len(r.Hosts), idx)
log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(parts), query)
for i, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM cluster('%s', '%s.%s') WHERE _partition_id = '%s' AND %s %% %d = %d SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.Table, r.Cluster, r.Database, r.TmpTable, partition, ShardingFunc(r.Shardingkey), len(r.Hosts), idx)
log.Logger.Debugf("[%s](%d/%d) %s", host, i+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
}
}

})
}
wg.Wait()
return lastError
}

// MoveBackup backs up the data from the original table to the temporary table.
func (r *CKRebalance) MoveBackup() error {
var wg sync.WaitGroup
var lastError error
Expand All @@ -489,33 +514,30 @@ func (r *CKRebalance) MoveBackup() error {
_ = common.Pool.Submit(func() {
defer wg.Done()
conn := common.GetConnection(host)
// Manually trigger a merge so that the parts become stable.
query := fmt.Sprintf("OPTIMIZE TABLE `%s`.`%s`", r.Database, r.Table)
_ = conn.Exec(query)
query = fmt.Sprintf(`SELECT distinct name FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1`, r.Database, r.Table)
query := fmt.Sprintf(`SELECT distinct partition_id FROM system.parts WHERE database = '%s' AND table = '%s' AND active=1 order by partition_id`, r.Database, r.Table)
log.Logger.Debugf("[%s]%s", host, query)
rows, err := conn.Query(query)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts := make([]string, 0)
partitions := make([]string, 0)
for rows.Next() {
var name string
err = rows.Scan(&name)
var partitionId string
err = rows.Scan(&partitionId)
if err != nil {
lastError = errors.Wrap(err, host)
return
}
parts = append(parts, name)
partitions = append(partitions, partitionId)
}
rows.Close()
log.Logger.Debugf("host:[%s], parts: %v", host, parts)
log.Logger.Debugf("host:[%s], partitions: %v", host, partitions)

for idx, part := range parts {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _part = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.TmpTable, r.Database, r.Table, part)
log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(parts), query)
for idx, partition := range partitions {
query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
r.Database, r.TmpTable, r.Database, r.Table, partition)
log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(partitions), query)
if err = conn.Exec(query); err != nil {
lastError = errors.Wrap(err, host)
return
Expand Down

0 comments on commit f0df3a0

Please sign in to comment.