Skip to content

Commit

Permalink
feat: opti compare range stats, close #44
Browse files Browse the repository at this point in the history
  • Loading branch information
wentaojin committed Aug 9, 2024
1 parent 1e82a7d commit b63a396
Show file tree
Hide file tree
Showing 13 changed files with 564 additions and 563 deletions.
15 changes: 8 additions & 7 deletions component/cli/migrate/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ type CompareConfig struct {
}

type DataCompareRule struct {
TableNameS string `toml:"table-name-s" json:"tableNameS"`
CompareConditionField string `toml:"compare-condition-field" json:"compareConditionField"`
CompareConditionRange string `toml:"compare-condition-range" json:"compareConditionRange"`
IgnoreSelectFields []string `toml:"ignore-select-fields" json:"ignoreSelectFields"`
IgnoreConditionFields []string `toml:"ignore-condition-fields" json:"ignoreConditionFields"`
SqlHintS string `toml:"sql-hint-s" json:"sqlHintS"`
SqlHintT string `toml:"sql-hint-t" json:"sqlHintT"`
TableNameS string `toml:"table-name-s" json:"tableNameS"`
CompareConditionField string `toml:"compare-condition-field" json:"compareConditionField"`
CompareConditionRangeS string `toml:"compare-condition-range-s" json:"compareConditionRangeS"`
CompareConditionRangeT string `toml:"compare-condition-range-t" json:"compareConditionRangeT"`
IgnoreSelectFields []string `toml:"ignore-select-fields" json:"ignoreSelectFields"`
IgnoreConditionFields []string `toml:"ignore-condition-fields" json:"ignoreConditionFields"`
SqlHintS string `toml:"sql-hint-s" json:"sqlHintS"`
SqlHintT string `toml:"sql-hint-t" json:"sqlHintT"`
}

type DataCompareParam struct {
Expand Down
14 changes: 8 additions & 6 deletions database/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type IDatabaseDataCompare interface {
// IDataCompareRuleInitializer is the interface implemented by database table rule initializers.
type IDataCompareRuleInitializer interface {
GenSchemaTableCompareMethodRule() string
GenSchemaTableCustomRule() (string, string, []string, string, string, error)
GenSchemaTableCustomRule() (string, string, string, []string, string, string, error)
IDatabaseSchemaTableRule
}

Expand All @@ -51,15 +51,16 @@ type DataCompareAttributesRule struct {
ColumnDetailTO string `json:"columnDetailTO"`
CompareMethod string `json:"compareMethod"`
ColumnNameRouteRule map[string]string `json:"columnNameRouteRule"` // keep the column name upstream and downstream mapping, a -> b
CompareConditionFieldC string `json:"compareConditionFieldC"`
CompareConditionRangeC string `json:"compareConditionRangeC"`
CompareConditionFieldS string `json:"compareConditionFieldS"`
CompareConditionRangeS string `json:"compareConditionRangeS"`
CompareConditionRangeT string `json:"compareConditionRangeT"`
IgnoreConditionFields []string `json:"ignoreConditionFields"`
SqlHintS string `json:"sqlHintS"`
SqlHintT string `json:"sqlHintT"`
}

func IDataCompareAttributesRule(i IDataCompareRuleInitializer) (*DataCompareAttributesRule, error) {
columnFields, compareRange, ignoreConditionFields, sqlHintS, sqlHintT, err := i.GenSchemaTableCustomRule()
columnFields, compareRangeS, compareRangeT, ignoreConditionFields, sqlHintS, sqlHintT, err := i.GenSchemaTableCustomRule()
if err != nil {
return &DataCompareAttributesRule{}, err
}
Expand Down Expand Up @@ -90,8 +91,9 @@ func IDataCompareAttributesRule(i IDataCompareRuleInitializer) (*DataCompareAttr
ColumnDetailTO: targetColumnTO,
ColumnDetailT: targetColumnT,
CompareMethod: i.GenSchemaTableCompareMethodRule(),
CompareConditionFieldC: columnFields,
CompareConditionRangeC: compareRange,
CompareConditionFieldS: columnFields,
CompareConditionRangeS: compareRangeS,
CompareConditionRangeT: compareRangeT,
IgnoreConditionFields: ignoreConditionFields,
ColumnNameRouteRule: columnRouteRule,
SqlHintS: sqlHintS,
Expand Down
30 changes: 18 additions & 12 deletions database/processor/data_compare_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,32 +536,38 @@ func (r *DataCompareRule) GenSchemaTableCompareMethodRule() string {
return constant.DataCompareMethodDatabaseCheckCRC32
}

func (r *DataCompareRule) GenSchemaTableCustomRule() (string, string, []string, string, string, error) {
compareRule, err := model.GetIDataCompareRuleRW().GetDataCompareRule(r.Ctx, &rule.DataCompareRule{
func (r *DataCompareRule) GenSchemaTableCustomRule() (string, string, string, []string, string, string, error) {
cr, err := model.GetIDataCompareRuleRW().GetDataCompareRule(r.Ctx, &rule.DataCompareRule{
TaskName: r.TaskName, SchemaNameS: r.SchemaNameS, TableNameS: r.TableNameS})
if err != nil {
return "", "", nil, "", "", err
return "", "", "", nil, "", "", err
}
if !strings.EqualFold(compareRule.IgnoreSelectFields, "") {
r.IgnoreSelectFields = stringutil.StringSplit(compareRule.IgnoreSelectFields, constant.StringSeparatorComma)
if !strings.EqualFold(cr.IgnoreSelectFields, "") {
r.IgnoreSelectFields = stringutil.StringSplit(cr.IgnoreSelectFields, constant.StringSeparatorComma)
}
var ignoreColumnConditionFields []string
if !strings.EqualFold(compareRule.IgnoreConditionFields, "") {
ignoreColumnConditionFields = stringutil.StringSplit(compareRule.IgnoreConditionFields, constant.StringSeparatorComma)
if !strings.EqualFold(cr.IgnoreConditionFields, "") {
ignoreColumnConditionFields = stringutil.StringSplit(cr.IgnoreConditionFields, constant.StringSeparatorComma)
} else {
ignoreColumnConditionFields = r.GlobalIgnoreConditionFields
}

var sqlHintS, sqlHintT string
if strings.EqualFold(compareRule.SqlHintS, "") {
if strings.EqualFold(cr.SqlHintS, "") {
sqlHintS = r.GlobalSqlHintS
} else {
sqlHintS = compareRule.SqlHintS
sqlHintS = cr.SqlHintS
}
if strings.EqualFold(compareRule.SqlHintT, "") {
if strings.EqualFold(cr.SqlHintT, "") {
sqlHintT = r.GlobalSqlHintT
} else {
sqlHintT = compareRule.SqlHintT
sqlHintT = cr.SqlHintT
}
return compareRule.CompareConditionField, compareRule.CompareConditionRange, ignoreColumnConditionFields, sqlHintS, sqlHintT, nil

// fall back to the upstream range when no downstream range (compareConditionRangeT) is configured
if strings.EqualFold(cr.CompareConditionRangeT, "") {
cr.CompareConditionRangeT = cr.CompareConditionRangeS
}

return cr.CompareConditionField, cr.CompareConditionRangeS, cr.CompareConditionRangeT, ignoreColumnConditionFields, sqlHintS, sqlHintT, nil
}
94 changes: 31 additions & 63 deletions database/taskflow/data_compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,65 +679,8 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
zap.String("task_flow", dmt.Task.TaskFlow),
zap.String("schema_name_s", attsRule.SchemaNameS),
zap.String("table_name_s", attsRule.TableNameS))
// optimizer
if !strings.EqualFold(attsRule.CompareConditionRangeC, "") {
encChunk := snappy.Encode(nil, []byte(attsRule.CompareConditionRangeC))
encryptChunk, err := stringutil.Encrypt(stringutil.BytesToString(encChunk), []byte(constant.DefaultDataEncryptDecryptKey))
if err != nil {
return err
}
err = model.Transaction(gCtx, func(txnCtx context.Context) error {
_, err = model.GetIDataCompareTaskRW().CreateDataCompareTask(txnCtx, &task.DataCompareTask{
TaskName: dmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
TableTypeS: attsRule.TableTypeS,
SnapshotPointS: globalScnS,
SnapshotPointT: globalScnT,
CompareMethod: attsRule.CompareMethod,
ColumnDetailSO: attsRule.ColumnDetailSO,
ColumnDetailS: attsRule.ColumnDetailS,
ChunkID: uuid.New().String(),
ColumnDetailTO: attsRule.ColumnDetailTO,
ColumnDetailT: attsRule.ColumnDetailT,
SqlHintS: attsRule.SqlHintS,
SqlHintT: attsRule.SqlHintT,
ChunkDetailS: encryptChunk,
ChunkDetailArgS: "",
ChunkDetailT: encryptChunk,
ChunkDetailArgT: "",
ConsistentReadS: strconv.FormatBool(dmt.TaskParams.EnableConsistentRead),
TaskStatus: constant.TaskDatabaseStatusWaiting,
})
if err != nil {
return err
}
_, err = model.GetIDataCompareSummaryRW().CreateDataCompareSummary(txnCtx, &task.DataCompareSummary{
TaskName: dmt.Task.TaskName,
SchemaNameS: attsRule.SchemaNameS,
TableNameS: attsRule.TableNameS,
SchemaNameT: attsRule.SchemaNameT,
TableNameT: attsRule.TableNameT,
SnapshotPointS: globalScnS,
SnapshotPointT: globalScnT,
TableRowsS: tableRows,
TableSizeS: tableSize,
ChunkTotals: 1,
})
if err != nil {
return err
}
return nil
})
if err != nil {
return err
}
return nil
}

upstreamCons, err := databaseS.GetDatabaseTableHighestSelectivityIndex(attsRule.SchemaNameS, attsRule.TableNameS, attsRule.CompareConditionFieldC, attsRule.IgnoreConditionFields)
upstreamCons, err := databaseS.GetDatabaseTableHighestSelectivityIndex(attsRule.SchemaNameS, attsRule.TableNameS, attsRule.CompareConditionFieldS, attsRule.IgnoreConditionFields)
if err != nil {
return err
}
Expand Down Expand Up @@ -766,16 +709,32 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
}

if len(upstreamBuckets) == 0 {
var encChunkS, encChunkT []byte
if !strings.EqualFold(attsRule.CompareConditionRangeS, "") {
encChunkS = snappy.Encode(nil, []byte(attsRule.CompareConditionRangeS))
} else {
encChunkS = snappy.Encode(nil, []byte("1 = 1"))
}
if !strings.EqualFold(attsRule.CompareConditionRangeT, "") {
encChunkT = snappy.Encode(nil, []byte(attsRule.CompareConditionRangeT))
} else {
encChunkT = snappy.Encode(nil, []byte("1 = 1"))
}

logger.Warn("data compare task init table chunk",
zap.String("task_name", dmt.Task.TaskName),
zap.String("task_mode", dmt.Task.TaskMode),
zap.String("task_flow", dmt.Task.TaskFlow),
zap.String("schema_name_s", attsRule.SchemaNameS),
zap.String("table_name_s", attsRule.TableNameS),
zap.Any("upstream bucket new", upstreamConsNew),
zap.Any("upstream bucket range", "1 = 1"))
encChunk := snappy.Encode(nil, []byte("1 = 1"))
encryptChunk, err := stringutil.Encrypt(stringutil.BytesToString(encChunk), []byte(constant.DefaultDataEncryptDecryptKey))
zap.Any("upstream bucket range", string(encChunkS)))

encryptChunkS, err := stringutil.Encrypt(stringutil.BytesToString(encChunkS), []byte(constant.DefaultDataEncryptDecryptKey))
if err != nil {
return err
}
encryptChunkT, err := stringutil.Encrypt(stringutil.BytesToString(encChunkT), []byte(constant.DefaultDataEncryptDecryptKey))
if err != nil {
return err
}
Expand All @@ -797,9 +756,9 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
SqlHintS: attsRule.SqlHintS,
SqlHintT: attsRule.SqlHintT,
ChunkID: uuid.New().String(),
ChunkDetailS: encryptChunk,
ChunkDetailS: encryptChunkS,
ChunkDetailArgS: "",
ChunkDetailT: encryptChunk,
ChunkDetailT: encryptChunkT,
ChunkDetailArgT: "",
ConsistentReadS: strconv.FormatBool(dmt.TaskParams.EnableConsistentRead),
TaskStatus: constant.TaskDatabaseStatusWaiting,
Expand Down Expand Up @@ -827,6 +786,7 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
if err != nil {
return err
}

return nil
}

Expand Down Expand Up @@ -885,6 +845,14 @@ func (dmt *DataCompareTask) InitDataCompareTask(databaseS, databaseT database.ID
for i, r := range upstreamBuckets {
toStringS, toStringArgsS := r.ToString()
toStringT, toStringArgsT := downstreamBuckets[i].ToString()

if !strings.EqualFold(attsRule.CompareConditionRangeS, "") {
toStringS = fmt.Sprintf("%s AND (%s)", toStringS, attsRule.CompareConditionRangeS)
}
if !strings.EqualFold(attsRule.CompareConditionRangeT, "") {
toStringT = fmt.Sprintf("%s AND (%s)", toStringT, attsRule.CompareConditionRangeT)
}

encChunkS := snappy.Encode(nil, []byte(toStringS))
encChunkT := snappy.Encode(nil, []byte(toStringT))

Expand Down
8 changes: 5 additions & 3 deletions example/data_compare_task.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ column-route-rules = {}
[[data-compare-rules]]
table-name-s = ""
compare-condition-field = ""
# 指定检查数据范围或者查询条件
# compare-condition-range 优先级高于 compare-condition-field
compare-condition-range = ""
# 指定检查数据范围或者查询条件 upstream
compare-condition-range-s = ""
# 指定检查数据范围或者查询条件 downstream
# 如果没有填写,则取 compare-condition-range-s 值,主要用于上下游字段名不一致的场景
compare-condition-range-t = ""
# 忽略对比字段列
ignore-select-fields = []
# 禁止条件查询选择字段
Expand Down
8 changes: 5 additions & 3 deletions example/data_compare_test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ column-route-rules = {}
[[data-compare-rules]]
table-name-s = ""
compare-condition-field = ""
# 指定检查数据范围或者查询条件
# compare-condition-range 优先级高于 compare-condition-field
compare-condition-range = ""
# 指定检查数据范围或者查询条件 upstream
compare-condition-range-s = ""
# 指定检查数据范围或者查询条件 downstream
# 如果没有填写,则取 compare-condition-range-s 值,主要用于上下游字段名不一致的场景
compare-condition-range-t = ""
# 忽略对比字段列
ignore-select-fields = []
# 禁止条件查询选择字段
Expand Down
11 changes: 6 additions & 5 deletions master/openapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,11 +535,12 @@ func (s *Server) upsertDataCompareTask(ctx context.Context, req openapi.APIPutDa

for _, r := range *req.DataCompareRules {
compareRules = append(compareRules, &pb.DataCompareRule{
TableNameS: *r.TableNameS,
CompareConditionField: *r.CompareConditionField,
CompareConditionRange: *r.CompareConditionRange,
IgnoreConditionFields: *r.IgnoreConditionFields,
IgnoreSelectFields: *r.IgnoreSelectFields,
TableNameS: *r.TableNameS,
CompareConditionField: *r.CompareConditionField,
CompareConditionRangeS: *r.CompareConditionRangeS,
CompareConditionRangeT: *r.CompareConditionRangeT,
IgnoreConditionFields: *r.IgnoreConditionFields,
IgnoreSelectFields: *r.IgnoreSelectFields,
})
}

Expand Down
21 changes: 11 additions & 10 deletions model/rule/rule_entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,17 @@ type DataMigrateRule struct {
}

type DataCompareRule struct {
ID uint64 `gorm:"primary_key;autoIncrement;comment:id" json:"id"`
TaskName string `gorm:"type:varchar(300);not null;uniqueIndex:uniq_schema_table_name;comment:migrate task datasource name" json:"taskName"`
SchemaNameS string `gorm:"type:varchar(120);not null;uniqueIndex:uniq_schema_table_name;comment:source schema name" json:"schemaNameS"`
TableNameS string `gorm:"type:varchar(120);not null;uniqueIndex:uniq_schema_table_name;comment:source table name" json:"tableNameS"`
CompareConditionField string `gorm:"type:varchar(120);comment:compare filed" json:"compareConditionField"`
CompareConditionRange string `gorm:"type:varchar(120);comment:source sql query where" json:"compareConditionRange"`
IgnoreSelectFields string `gorm:"type:text;comment:ignore select filed" json:"ignoreSelectFields"`
IgnoreConditionFields string `gorm:"type:text;comment:ignore condition filed" json:"ignoreConditionFields"`
SqlHintS string `gorm:"type:varchar(120);comment:source sql query hint" json:"sqlHintS"`
SqlHintT string `gorm:"type:varchar(120);comment:target sql query hint" json:"sqlHintT"`
ID uint64 `gorm:"primary_key;autoIncrement;comment:id" json:"id"`
TaskName string `gorm:"type:varchar(300);not null;uniqueIndex:uniq_schema_table_name;comment:migrate task datasource name" json:"taskName"`
SchemaNameS string `gorm:"type:varchar(120);not null;uniqueIndex:uniq_schema_table_name;comment:source schema name" json:"schemaNameS"`
TableNameS string `gorm:"type:varchar(120);not null;uniqueIndex:uniq_schema_table_name;comment:source table name" json:"tableNameS"`
CompareConditionField string `gorm:"type:varchar(120);comment:compare filed" json:"compareConditionField"`
CompareConditionRangeS string `gorm:"type:varchar(120);comment:source sql query where" json:"compareConditionRangeS"`
CompareConditionRangeT string `gorm:"type:varchar(120);comment:target sql query where" json:"compareConditionRangeT"`
IgnoreSelectFields string `gorm:"type:text;comment:ignore select filed" json:"ignoreSelectFields"`
IgnoreConditionFields string `gorm:"type:text;comment:ignore condition filed" json:"ignoreConditionFields"`
SqlHintS string `gorm:"type:varchar(120);comment:source sql query hint" json:"sqlHintS"`
SqlHintT string `gorm:"type:varchar(120);comment:target sql query hint" json:"sqlHintT"`
*common.Entity
}

Expand Down
Loading

0 comments on commit b63a396

Please sign in to comment.