From 258638a23d2489362835edd852b06f54f96d5d22 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 20 Nov 2024 11:37:53 +0100 Subject: [PATCH 01/23] Fixed partitioning a non-partitioned table with placement rules --- pkg/ddl/partition.go | 11 ++++++--- pkg/ddl/placement_policy_test.go | 12 ++++++++++ pkg/domain/infosync/placement_manager.go | 30 ++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index f0f210e44d659..c813f00df33a2 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -276,8 +276,13 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addin // tblInfo do not include added partitions, so we should add them first tblInfo = tblInfo.Clone() p := *tblInfo.Partition - p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...) - p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...) + if p.DDLAction == model.ActionAlterTablePartitioning && p.Type == pmodel.PartitionTypeNone { + // skip adding the original table as partition + p.Definitions = []model.PartitionDefinition{} + } else { + p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...) + } + p.Definitions = append(p.Definitions, addingDefinitions...) tblInfo.Partition = &p // bundle for table should be recomputed because it includes some default configs for partitions @@ -3180,6 +3185,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // The partInfo may have been checked against an older schema version for example. // If the check is done here, it does not need to be repeated, since no other // DDL on the same table can be run concurrently. + tblInfo.Partition.DDLAction = job.Type num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions) err = checkAddPartitionTooManyPartitions(uint64(num)) if err != nil { @@ -3357,7 +3363,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64)) job.SchemaState = model.StateDeleteOnly tblInfo.Partition.DDLState = job.SchemaState - tblInfo.Partition.DDLAction = job.Type ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 20ed4212aa532..187750ff0fa1a 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2401,3 +2401,15 @@ func TestRecoverTableWithPlacementPolicy(t *testing.T) { " PARTITION `p2` VALUES LESS THAN (10000))")) checkExistTableBundlesInPD(t, dom, "test", "tp3") } + +func TestPartitionByWithLabels(t *testing.T) { + store, _ := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + + tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1,r2'") + + tk.MustExec(`CREATE TABLE t1 (id INT)`) + tk.MustExec(`ALTER TABLE t1 placement policy pp1`) + tk.MustExec(`ALTER TABLE t1 PARTITION BY HASH (id) PARTITIONS 3`) +} diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 2d9de6a24d3e4..e740257c04ab4 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -16,6 +16,9 @@ package infosync import ( "context" + "errors" + "fmt" + "sort" "sync" 
"github.com/pingcap/tidb/pkg/ddl/placement" @@ -107,11 +110,38 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac m.bundles = make(map[string]*placement.Bundle) } + rules := 0 for _, bundle := range bundles { if bundle.IsEmpty() { delete(m.bundles, bundle.ID) } else { m.bundles[bundle.ID] = bundle + rules += len(bundle.Rules) + } + } + + // Check that no bundles are overlapping + type keyRange struct { + start string + end string + } + keys := make([]keyRange, 0, rules) + for k := range m.bundles { + for _, rule := range m.bundles[k].Rules { + if rule.Role == pd.Leader { + keys = append(keys, keyRange{start: rule.StartKeyHex, end: rule.EndKeyHex}) + } + } + } + sort.Slice(keys, func(i, j int) bool { + if keys[i].start == keys[j].start { + return keys[i].end < keys[j].end + } + return keys[i].start < keys[j].start + }) + for i := 1; i < len(keys); i++ { + if keys[i].start < keys[i-1].end { + return errors.New(fmt.Sprintf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end)) } } From 93a14c9ec58246b32528dc17213bbcd467a1787b Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 20 Nov 2024 11:51:44 +0100 Subject: [PATCH 02/23] Linting --- pkg/domain/infosync/placement_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index e740257c04ab4..2445df07da427 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -16,7 +16,6 @@ package infosync import ( "context" - "errors" "fmt" "sort" "sync" @@ -141,7 +140,7 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac }) for i := 1; i < len(keys); i++ { if keys[i].start < keys[i-1].end { - return errors.New(fmt.Sprintf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end)) + return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) } } From 9210c20556ef8429b735aa86503982b1e20803ab Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Wed, 20 Nov 2024 23:32:49 +0100 Subject: [PATCH 03/23] Removed handling of partition bundles, since it was only using table bundles anyway --- pkg/ddl/partition.go | 13 +++++---- pkg/ddl/placement_policy_test.go | 8 +++++- pkg/domain/infosync/placement_manager.go | 4 +-- pkg/infoschema/builder.go | 35 ++++++------------------ pkg/infoschema/bundle_builder.go | 17 +----------- pkg/infoschema/infoschema_v2.go | 10 +------ 6 files changed, 28 insertions(+), 59 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index c813f00df33a2..281501789d625 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -273,17 +273,20 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) { var bundles []*placement.Bundle + // TODO: Set the new bundles as it should look for the final table! + // And verify that nothing breaks during ApplyDiff etc. 
for the intermediate tableInfo's // tblInfo do not include added partitions, so we should add them first tblInfo = tblInfo.Clone() - p := *tblInfo.Partition + p := tblInfo.Partition if p.DDLAction == model.ActionAlterTablePartitioning && p.Type == pmodel.PartitionTypeNone { - // skip adding the original table as partition + // skip the original table as partition p.Definitions = []model.PartitionDefinition{} - } else { - p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...) } p.Definitions = append(p.Definitions, addingDefinitions...) - tblInfo.Partition = &p + + if p.NewTableID != 0 { + tblInfo.ID = p.NewTableID + } // bundle for table should be recomputed because it includes some default configs for partitions tblBundle, err := placement.NewTableBundle(t, tblInfo) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 187750ff0fa1a..a945e5a3b1830 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2403,7 +2403,7 @@ func TestRecoverTableWithPlacementPolicy(t *testing.T) { } func TestPartitionByWithLabels(t *testing.T) { - store, _ := testkit.CreateMockStoreAndDomain(t) + store, do := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -2412,4 +2412,10 @@ func TestPartitionByWithLabels(t *testing.T) { tk.MustExec(`CREATE TABLE t1 (id INT)`) tk.MustExec(`ALTER TABLE t1 placement policy pp1`) tk.MustExec(`ALTER TABLE t1 PARTITION BY HASH (id) PARTITIONS 3`) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY HASH (`id`) PARTITIONS 3")) + checkExistTableBundlesInPD(t, do, "test", "t1") } diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 2445df07da427..808664df01bb4 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -127,8 +127,8 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac keys := make([]keyRange, 0, rules) for k := range m.bundles { for _, rule := range m.bundles[k].Rules { - if rule.Role == pd.Leader { - keys = append(keys, keyRange{start: rule.StartKeyHex, end: rule.EndKeyHex}) + if rule.Role == pd.Leader && !m.bundles[k].Override { + keys = append(keys, keyRange{start: k + ":" + rule.StartKeyHex, end: k + ":" + rule.EndKeyHex}) } } } diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 5b5c43a5677fd..0010a580a78b9 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -119,8 +119,9 @@ func applyTruncateTableOrPartition(b *Builder, m meta.Reader, diff *model.Schema // bundle ops if diff.Type == model.ActionTruncateTable { b.deleteBundle(b.infoSchema, diff.OldTableID) - b.markTableBundleShouldUpdate(diff.TableID) } + b.markTableBundleShouldUpdate(diff.TableID) + // TODO: check that all partitions are updated to the cache! for _, opt := range diff.AffectedOpts { if diff.Type == model.ActionTruncateTablePartition { @@ -128,7 +129,6 @@ func applyTruncateTableOrPartition(b *Builder, m meta.Reader, diff *model.Schema // While session 1 performs the DML operation associated with partition 1, // the TRUNCATE operation of session 2 on partition 2 does not cause the operation of session 1 to fail. 
tblIDs = append(tblIDs, opt.OldTableID) - b.markPartitionBundleShouldUpdate(opt.TableID) } b.deleteBundle(b.infoSchema, opt.OldTableID) } @@ -155,15 +155,17 @@ func applyReorganizePartition(b *Builder, m meta.Reader, diff *model.SchemaDiff) return nil, errors.Trace(err) } + // The table might have changed TableID + if diff.TableID != diff.OldTableID && diff.OldTableID != 0 { + b.deleteBundle(b.infoSchema, diff.OldTableID) + } + b.markTableBundleShouldUpdate(diff.TableID) + // bundle ops for _, opt := range diff.AffectedOpts { if opt.OldTableID != 0 { b.deleteBundle(b.infoSchema, opt.OldTableID) } - if opt.TableID != 0 { - b.markTableBundleShouldUpdate(opt.TableID) - } - // TODO: Should we also check markPartitionBundleShouldUpdate?!? } return tblIDs, nil } @@ -228,6 +230,7 @@ func applyExchangeTablePartition(b *Builder, m meta.Reader, diff *model.SchemaDi } // partID is the new id for the non-partitioned table! b.markTableBundleShouldUpdate(partID) + b.markTableBundleShouldUpdate(ptID) // Then the partitioned table, will re-read the whole table, including all partitions! currDiff.TableID = ptID currDiff.SchemaID = ptSchemaID @@ -238,7 +241,6 @@ func applyExchangeTablePartition(b *Builder, m meta.Reader, diff *model.SchemaDi return nil, errors.Trace(err) } // ntID is the new id for the partition! - b.markPartitionBundleShouldUpdate(ntID) err = updateAutoIDForExchangePartition(b.Requirement.Store(), ptSchemaID, ptID, ntSchemaID, ntID) if err != nil { return nil, errors.Trace(err) @@ -615,23 +617,6 @@ func (b *Builder) copySortedTablesBucket(bucketIdx int) { b.infoSchema.sortedTablesBuckets[bucketIdx] = newSortedTables } -func (b *Builder) updateBundleForCreateTable(tblInfo *model.TableInfo, tp model.ActionType) { - switch tp { - case model.ActionDropTablePartition: - case model.ActionTruncateTablePartition: - // ReorganizePartition handle the bundles in applyReorganizePartition - case model.ActionReorganizePartition, model.ActionRemovePartitioning, - model.ActionAlterTablePartitioning: - default: - pi := tblInfo.GetPartitionInfo() - if pi != nil { - for _, partition := range pi.Definitions { - b.markPartitionBundleShouldUpdate(partition.ID) - } - } - } -} - func (b *Builder) buildAllocsForCreateTable(tp model.ActionType, dbInfo *model.DBInfo, tblInfo *model.TableInfo, allocs autoid.Allocators) autoid.Allocators { if len(allocs.Allocs) != 0 { tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version) @@ -678,8 +663,6 @@ func applyCreateTable(b *Builder, m meta.Reader, dbInfo *model.DBInfo, tableID i ) } - b.updateBundleForCreateTable(tblInfo, tp) - if tp != model.ActionTruncateTablePartition { affected = appendAffectedIDs(affected, tblInfo) } diff --git a/pkg/infoschema/bundle_builder.go b/pkg/infoschema/bundle_builder.go index 512d8c9431860..40408a0b252ea 100644 --- a/pkg/infoschema/bundle_builder.go +++ b/pkg/infoschema/bundle_builder.go @@ -41,13 +41,10 @@ type bundleInfoBuilder struct { updateTables map[int64]any // all tables or partitions referring these policies should update placement bundle updatePolicies map[int64]any - // partitions that need to update placement bundle - updatePartitions map[int64]any } func (b *bundleInfoBuilder) initBundleInfoBuilder() { b.updateTables = make(map[int64]any) - b.updatePartitions = make(map[int64]any) b.updatePolicies = make(map[int64]any) } @@ -63,10 +60,6 @@ func (b *bundleInfoBuilder) markTableBundleShouldUpdate(tblID int64) { b.updateTables[tblID] = struct{}{} } -func (b *bundleInfoBuilder) markPartitionBundleShouldUpdate(partID int64) { - 
b.updatePartitions[partID] = struct{}{} -} - func (b *bundleInfoBuilder) markBundlesReferPolicyShouldUpdate(policyID int64) { b.updatePolicies[policyID] = struct{}{} } @@ -90,7 +83,7 @@ func (b *bundleInfoBuilder) updateInfoSchemaBundles(is *infoSchema) { } func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) { - if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 { + if len(b.updatePolicies) == 0 { return } @@ -102,14 +95,6 @@ func (b *bundleInfoBuilder) completeUpdateTables(is *infoSchema) { b.markTableBundleShouldUpdate(tblInfo.ID) } } - - if tblInfo.Partition != nil { - for _, par := range tblInfo.Partition.Definitions { - if _, ok := b.updatePartitions[par.ID]; ok { - b.markTableBundleShouldUpdate(tblInfo.ID) - } - } - } } } } diff --git a/pkg/infoschema/infoschema_v2.go b/pkg/infoschema/infoschema_v2.go index 4f732031d94ed..bdd7d6dec20b3 100644 --- a/pkg/infoschema/infoschema_v2.go +++ b/pkg/infoschema/infoschema_v2.go @@ -1428,7 +1428,7 @@ func (b *bundleInfoBuilder) updateInfoSchemaBundlesV2(is *infoschemaV2) { } func (b *bundleInfoBuilder) completeUpdateTablesV2(is *infoschemaV2) { - if len(b.updatePolicies) == 0 && len(b.updatePartitions) == 0 { + if len(b.updatePolicies) == 0 { return } @@ -1441,14 +1441,6 @@ func (b *bundleInfoBuilder) completeUpdateTablesV2(is *infoschemaV2) { b.markTableBundleShouldUpdate(tblInfo.ID) } } - - if tblInfo.Partition != nil { - for _, par := range tblInfo.Partition.Definitions { - if _, ok := b.updatePartitions[par.ID]; ok { - b.markTableBundleShouldUpdate(tblInfo.ID) - } - } - } } } } From f156b899b37f4f4aed4bbcdf1b64633c5a202d7b Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 21 Nov 2024 01:00:42 +0100 Subject: [PATCH 04/23] Fixed update placement bundles for partitions --- pkg/ddl/partition.go | 13 +++++++++++++ pkg/infoschema/builder.go | 4 ++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 281501789d625..34dea5a053395 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -273,6 +273,19 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) { var bundles []*placement.Bundle + // We want to achieve: + // - before we do any reorganization/write to new partitions/global indexes that the placement rules are in-place + // - not removing any placement rules for removed partitions + // - not leaving anything in case of failure/rollback! + // So we will: + // 1) First write the new bundles including both new and old partitions, + // EXCEPT if the old partition is in fact a table, then skip that partition + // This should be done for TRUNCATE/ADD/REORGANIZE PARTITION (incl. REMOVE PARTITIONING and PARTITION BY) + // 2) Then overwrite the bundles with the final partitioning scheme + // This can be done directly for DROP PARTITION. + + // TODO: How to handle labels?!? + // TODO: Handle rollback!! // TODO: Set the new bundles as it should look for the final table! // And verify that nothing breaks during ApplyDiff etc. 
for the intermediate tableInfo's // tblInfo do not include added partitions, so we should add them first diff --git a/pkg/infoschema/builder.go b/pkg/infoschema/builder.go index 0010a580a78b9..cd582a72da0a1 100644 --- a/pkg/infoschema/builder.go +++ b/pkg/infoschema/builder.go @@ -370,7 +370,7 @@ func (b *Builder) getTableIDs(m meta.Reader, diff *model.SchemaDiff) (oldTableID func (b *Builder) updateBundleForTableUpdate(diff *model.SchemaDiff, newTableID, oldTableID int64) { // handle placement rule cache switch diff.Type { - case model.ActionCreateTable: + case model.ActionCreateTable, model.ActionAddTablePartition: b.markTableBundleShouldUpdate(newTableID) case model.ActionDropTable: b.deleteBundle(b.infoSchema, oldTableID) @@ -379,7 +379,7 @@ func (b *Builder) updateBundleForTableUpdate(diff *model.SchemaDiff, newTableID, b.markTableBundleShouldUpdate(newTableID) case model.ActionRecoverTable: b.markTableBundleShouldUpdate(newTableID) - case model.ActionAlterTablePlacement: + case model.ActionAlterTablePlacement, model.ActionAlterTablePartitionPlacement: b.markTableBundleShouldUpdate(newTableID) } } From 58d859da1e81394c9f907b6a660f0a12129f2f94 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 21 Nov 2024 01:30:24 +0100 Subject: [PATCH 05/23] In REORG PARTITION we first register the intermediate state and then the final one. --- pkg/ddl/partition.go | 45 ++++++++++++-------------------- pkg/ddl/placement_policy_test.go | 11 ++++++++ 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 34dea5a053395..276e9822e37ac 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -159,17 +159,12 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in } } - bundles, err := alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions) + _, err = alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions) if err != nil { job.State = model.JobStateCancelled return ver, errors.Trace(err) } - if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } - ids := getIDs([]*model.TableInfo{tblInfo}) for _, p := range tblInfo.Partition.AddingDefinitions { ids = append(ids, p.ID) @@ -270,7 +265,7 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) return false, nil } -func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) { +func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) (bool, error) { var bundles []*placement.Bundle // We want to achieve: @@ -284,11 +279,6 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addin // 2) Then overwrite the bundles with the final partitioning scheme // This can be done directly for DROP PARTITION. - // TODO: How to handle labels?!? - // TODO: Handle rollback!! - // TODO: Set the new bundles as it should look for the final table! - // And verify that nothing breaks during ApplyDiff etc. 
for the intermediate tableInfo's - // tblInfo do not include added partitions, so we should add them first tblInfo = tblInfo.Clone() p := tblInfo.Partition if p.DDLAction == model.ActionAlterTablePartitioning && p.Type == pmodel.PartitionTypeNone { @@ -304,7 +294,7 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addin // bundle for table should be recomputed because it includes some default configs for partitions tblBundle, err := placement.NewTableBundle(t, tblInfo) if err != nil { - return nil, errors.Trace(err) + return false, errors.Trace(err) } if tblBundle != nil { @@ -313,11 +303,14 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addin partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions) if err != nil { - return nil, errors.Trace(err) + return false, errors.Trace(err) } bundles = append(bundles, partitionBundles...) - return bundles, nil + if len(bundles) > 0 { + return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles) + } + return false, nil } // When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules. @@ -3329,7 +3322,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK } - bundles, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions) + changed, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions) if err != nil { if !changesMade { job.State = model.JobStateCancelled @@ -3337,23 +3330,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver } return rollbackReorganizePartitionWithErr(jobCtx, job, err) } - - if len(bundles) > 0 { - if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil { - if !changesMade { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } - return rollbackReorganizePartitionWithErr(jobCtx, job, err) - } - changesMade = true - } + changesMade = changesMade || changed ids := getIDs([]*model.TableInfo{tblInfo}) for _, p := range tblInfo.Partition.AddingDefinitions { ids = append(ids, p.ID) } - changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids) + changed, err = alterTableLabelRule(job.SchemaName, tblInfo, ids) changesMade = changesMade || changed if err != nil { if !changesMade { @@ -3529,6 +3512,12 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver tblInfo.Partition.Columns, tblInfo.Partition.DDLColumns = tblInfo.Partition.DDLColumns, tblInfo.Partition.Columns } + // We need to update the Placement rule bundles with the final partitions. 
+ _, err = alterTablePartitionBundles(metaMut, tblInfo, nil) + if err != nil { + return ver, err + } + failpoint.Inject("reorgPartFail2", func(val failpoint.Value) { if val.(bool) { job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index a945e5a3b1830..7ad112513919e 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -1669,6 +1669,17 @@ func TestAlterTablePartitionPlacement(t *testing.T) { "(PARTITION `p0` VALUES LESS THAN (100),\n" + " PARTITION `p1` VALUES LESS THAN (1000))")) checkExistTableBundlesInPD(t, dom, "test", "tp") + + tk.MustExec(`alter table tp reorganize partition p1 into (partition p1 values less than (750) placement policy p1, partition p2 values less than (1500) placement policy p0)`) + tk.MustQuery("show create table tp").Check(testkit.Rows("" + + "tp CREATE TABLE `tp` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p0` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p0` VALUES LESS THAN (100),\n" + + " PARTITION `p1` VALUES LESS THAN (750) /*T![placement] PLACEMENT POLICY=`p1` */,\n" + + " PARTITION `p2` VALUES LESS THAN (1500) /*T![placement] PLACEMENT POLICY=`p0` */)")) + checkExistTableBundlesInPD(t, dom, "test", "tp") } func TestAddPartitionWithPlacement(t *testing.T) { From 227ccd8602c4b484fde3f8e00d5de24772a93a86 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 21 Nov 2024 23:28:37 +0100 Subject: [PATCH 06/23] Fixed failure and added tests for rollback of reorg partition and placement rules --- pkg/ddl/partition.go | 76 ++++---- pkg/ddl/rollingback.go | 6 +- .../tests/partition/reorg_partition_test.go | 170 ++++++++++++++++-- pkg/domain/infosync/info.go | 1 + 4 files changed, 192 insertions(+), 61 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 276e9822e37ac..d0ee99fb58935 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -265,30 +265,24 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64) return false, nil } -func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) (bool, error) { - var bundles []*placement.Bundle - +func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDefs []model.PartitionDefinition) (bool, error) { // We want to achieve: // - before we do any reorganization/write to new partitions/global indexes that the placement rules are in-place // - not removing any placement rules for removed partitions - // - not leaving anything in case of failure/rollback! // So we will: // 1) First write the new bundles including both new and old partitions, // EXCEPT if the old partition is in fact a table, then skip that partition - // This should be done for TRUNCATE/ADD/REORGANIZE PARTITION (incl. REMOVE PARTITIONING and PARTITION BY) - // 2) Then overwrite the bundles with the final partitioning scheme - // This can be done directly for DROP PARTITION. + // 2) Then overwrite the bundles with the final partitioning scheme (second call in onReorg/ tblInfo = tblInfo.Clone() p := tblInfo.Partition - if p.DDLAction == model.ActionAlterTablePartitioning && p.Type == pmodel.PartitionTypeNone { - // skip the original table as partition - p.Definitions = []model.PartitionDefinition{} - } - p.Definitions = append(p.Definitions, addingDefinitions...) 
- - if p.NewTableID != 0 { - tblInfo.ID = p.NewTableID + if p != nil { + // skip the original table as partition if partitioning a non-partitioned table + if p.DDLAction != model.ActionAlterTablePartitioning || p.Type != pmodel.PartitionTypeNone { + // prepend with existing partitions + addDefs = append(p.Definitions, addDefs...) + } + p.Definitions = addDefs } // bundle for table should be recomputed because it includes some default configs for partitions @@ -297,16 +291,18 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addin return false, errors.Trace(err) } + var bundles []*placement.Bundle if tblBundle != nil { bundles = append(bundles, tblBundle) } - partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions) + partitionBundles, err := placement.NewPartitionListBundles(t, addDefs) if err != nil { return false, errors.Trace(err) } bundles = append(bundles, partitionBundles...) + if len(bundles) > 0 { return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles) } @@ -351,19 +347,15 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode } // rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo. -func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, []*placement.Bundle) { +func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string) { physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions)) partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions)) - rollbackBundles := make([]*placement.Bundle, 0, len(tblInfo.Partition.AddingDefinitions)) for _, one := range tblInfo.Partition.AddingDefinitions { physicalTableIDs = append(physicalTableIDs, one.ID) partNames = append(partNames, one.Name.L) - if one.PlacementPolicyRef != nil { - rollbackBundles = append(rollbackBundles, placement.NewBundle(one.ID)) - } } tblInfo.Partition.AddingDefinitions = nil - return physicalTableIDs, partNames, rollbackBundles + return physicalTableIDs, partNames } // checkAddPartitionValue check add Partition Values, @@ -2161,7 +2153,10 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames // It will drop newly created partitions that has not yet been used, including cleaning // up label rules and bundles as well as changed indexes due to global flag. func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) { - args := jobCtx.jobArgs.(*model.TablePartitionArgs) + args, err := model.GetTablePartitionArgs(job) + if err != nil { + return ver, errors.Trace(err) + } partInfo := args.PartInfo metaMut := jobCtx.metaMut tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID) @@ -2169,12 +2164,7 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( return ver, errors.Trace(err) } tblInfo.Partition.DroppingDefinitions = nil - physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo) - err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles) - if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Wrapf(err, "failed to notify PD the placement rules") - } + physicalTableIDs, pNames := rollbackAddingPartitionInfo(tblInfo) // TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name? 
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames) if err != nil { @@ -2189,7 +2179,9 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( if partInfo.Type != pmodel.PartitionTypeNone { // ALTER TABLE ... PARTITION BY // Also remove anything with the new table id - physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID) + if partInfo.NewTableID != 0 { + physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID) + } // Reset if it was normal table before if tblInfo.Partition.Type == pmodel.PartitionTypeNone || tblInfo.Partition.DDLType == pmodel.PartitionTypeNone { @@ -2212,6 +2204,11 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( tblInfo.Partition.ClearReorgIntermediateInfo() } + _, err = alterTablePartitionBundles(metaMut, tblInfo, nil) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Wrapf(err, "failed to notify PD the placement rules") + } ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -3172,11 +3169,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver jobCtx.jobArgs = args // Handle the rolling back job if job.IsRollingback() { - ver, err := w.rollbackLikeDropPartition(jobCtx, job) - if err != nil { - return ver, errors.Trace(err) - } - return ver, nil + return w.rollbackLikeDropPartition(jobCtx, job) } tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args) @@ -3512,12 +3505,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver tblInfo.Partition.Columns, tblInfo.Partition.DDLColumns = tblInfo.Partition.DDLColumns, tblInfo.Partition.Columns } - // We need to update the Placement rule bundles with the final partitions. - _, err = alterTablePartitionBundles(metaMut, tblInfo, nil) - if err != nil { - return ver, err - } - failpoint.Inject("reorgPartFail2", func(val failpoint.Value) { if val.(bool) { job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 @@ -3624,6 +3611,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver return ver, errors.Trace(err) } } + + // We need to update the Placement rule bundles with the final partitions. 
+ _, err = alterTablePartitionBundles(metaMut, tblInfo, nil) + if err != nil { + return ver, err + } + failpoint.Inject("reorgPartFail5", func(val failpoint.Value) { if val.(bool) { job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 5270437a9b857..da8c1af3850dd 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -453,7 +453,11 @@ func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, job *model.Job, ot pi.DDLState = job.SchemaState } - args := jobCtx.jobArgs.(*model.TablePartitionArgs) + args, err := model.GetTablePartitionArgs(job) + if err != nil { + job.State = model.JobStateCancelled + return ver, errors.Trace(err) + } args.PartNames = partNames job.FillArgs(args) ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) diff --git a/pkg/ddl/tests/partition/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go index 41871048893ad..2bac49a0ed010 100644 --- a/pkg/ddl/tests/partition/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -18,17 +18,21 @@ import ( "context" "encoding/hex" "fmt" + "math" "strconv" "testing" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/logutil" + "github.com/pingcap/tidb/pkg/ddl/util" "github.com/pingcap/tidb/pkg/domain" + "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/meta/model" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessiontxn" + "github.com/pingcap/tidb/pkg/store/gcworker" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" @@ -52,7 +56,7 @@ type allTableData struct { // assumes that tableIDs are only increasing. // To be used during failure testing of ALTER, to make sure cleanup is done. 
func noNewTablesAfter(t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, tbl table.Table, msg string) { - waitForGC := tk.MustQuery(`select start_key, end_key from mysql.gc_delete_range union all select start_key, end_key from mysql.gc_delete_range_done`).Rows() + waitForGC := tk.MustQuery(`select start_key, end_key, "queue" from mysql.gc_delete_range union all select start_key, end_key, "done" from mysql.gc_delete_range_done`).Rows() require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) txn, err := ctx.Txn(true) require.NoError(t, err) @@ -76,10 +80,17 @@ func noNewTablesAfter(t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, for _, rowGC := range waitForGC { logutil.DDLLogger().Info("GC", zap.String("start", fmt.Sprintf("%v", rowGC[0])), - zap.String("end", fmt.Sprintf("%v", rowGC[1]))) + zap.String("end", fmt.Sprintf("%v", rowGC[1])), + zap.String("status", fmt.Sprintf("%s", rowGC[2]))) } ROW: for it.Valid() { + foundTblID := tablecodec.DecodeTableID(it.Key()) + // There are internal table ids starting from MaxInt48 -1 and allocating decreasing ids + // Allow 0xFF of them, See JobTableID, ReorgTableID, HistoryTableID, MDLTableID + if it.Key()[0] == 't' && foundTblID >= 0xFFFFFFFFFF00 { + break + } for _, rowGC := range waitForGC { // OK if queued for range delete / GC startHex := fmt.Sprintf("%v", rowGC[0]) @@ -93,15 +104,14 @@ ROW: require.NoError(t, err) continue ROW } - logutil.DDLLogger().Info("not found in GC", - zap.String("key", keyHex), - zap.String("start", startHex), - zap.String("end", endHex)) + if keyHex < "748000f" { + logutil.DDLLogger().Error("not found in GC", + zap.String("key", keyHex), + zap.String("start", startHex), + zap.String("end", endHex)) + } } - foundTblID := tablecodec.DecodeTableID(it.Key()) - // There are internal table ids starting from MaxInt48 -1 and allocating decreasing ids - // Allow 0xFF of them, See JobTableID, ReorgTableID, HistoryTableID, MDLTableID - if it.Key()[0] == 't' && foundTblID < 0xFFFFFFFFFF00 { + if it.Key()[0] == 't' { is := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() tbl, found := is.TableByID(context.Background(), foundTblID) tblmsg := " Table ID no longer maps to a table" @@ -345,13 +355,13 @@ func TestPartitionByNonPartitionedTable(t *testing.T) { } func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any, skipTests ...string) { + // Skip GC emulator, we trigger it manually to also clean up PlacementBundles + util.EmulatorGCDisable() store := testkit.CreateMockStore(t) + gcWorker, err := gcworker.NewMockGCWorker(store) + require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - tk.MustExec("set tidb_enable_global_index=true") - defer func() { - tk.MustExec("set tidb_enable_global_index=default") - }() // Fail means we simply inject an error, and set the error count very high to see what happens // we do expect to do best effort rollback here as well! 
// Cancel means we set job.State = JobStateCancelled, as in no need to do more @@ -398,9 +408,13 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b idxID = tOrg.Meta().Indices[0].ID } oldCreate := tk.MustQuery(`show create table t`).Rows() + // Run GC to clean changes in beforeDML + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + oldBundles, err := infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) name := "github.com/pingcap/tidb/pkg/ddl/reorgPart" + suffix testfailpoint.Enable(t, name, `return(true)`) - err := tk.ExecToErr(alterSQL) + err = tk.ExecToErr(alterSQL) require.Error(t, err, "failpoint reorgPart"+suffix) require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) testfailpoint.Disable(t, name) @@ -418,16 +432,30 @@ func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, b if idxID != 0 { require.Equal(t, idxID, tt.Meta().Indices[0].ID, suffix) } + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) tk.MustExec(`admin check table t /* ` + suffix + ` */`) for _, sql := range afterDML { tk.MustExec(sql + " /* " + suffix + " */") } tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(afterResult) + newBundles, err := infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + for i := range newBundles { + found := false + for j := range oldBundles { + if newBundles[i].ID == oldBundles[j].ID { + require.Equal(t, oldBundles[j].String(), newBundles[i].String(), suffix) + found = true + break + } + } + require.True(t, found, "%s: New bundle not cleaned up '%s':\n%s", suffix, newBundles[i].ID, newBundles[i].String()) + } + require.Equal(t, len(oldBundles), len(newBundles), suffix) tk.MustExec(`drop table t /* ` + suffix + ` */`) // TODO: Check TiFlash replicas // TODO: Check Label rules - // TODO: Check bundles // TODO: Check autoIDs } } @@ -813,9 +841,6 @@ func TestReorgPartitionRollback(t *testing.T) { ` partition p1 values less than (20),` + ` partition pMax values less than (MAXVALUE))`) tk.MustExec(`insert into t values (1,"1",1), (12,"12",21),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`) - // TODO: Check that there are no additional placement rules, - // bundles, or ranges with non-completed tableIDs - // (partitions used during reorg, but was dropped) testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(1)`) tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))") tk.MustExec(`admin check table t`) @@ -946,3 +971,110 @@ func TestPartitionIssue56634(t *testing.T) { tk.MustExec("create table t (a int)") tk.MustContainErrMsg("alter table t partition by range(a) (partition p1 values less than (20))", "[ddl:-1]DDL job rollback, error msg: Injected error in StateDeleteReorganization") // should NOT panic } + +func TestReorgPartitionFailuresPlacementPolicy(t *testing.T) { + create := `create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition p2 values less than (30),` + + ` partition pMax values less than (MAXVALUE))` + beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + `create or replace placement policy pp2 followers=2`, + `create or replace placement policy pp3 
followers=3`, + `alter table t placement policy ='pp1'`, + `alter table t partition p1 placement policy ='pp2'`, + `alter table t partition p2 placement policy ='pp3'`, + } + beforeResult := testkit.Rows() + alter := "alter table t reorganize partition p1,p2 into (partition p1 values less than (17), partition p1b values less than (24), partition p2 values less than (30))" + afterResult := testkit.Rows() + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, nil, afterResult, "Fail4") +} + +func TestRemovePartitionFailuresPlacementPolicy(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 values less than (50), + partition p1 values less than (100), + partition p2 values less than (200))` + alter := `alter table t remove partitioning` + beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + `create or replace placement policy pp2 followers=2`, + `create or replace placement policy pp3 followers=2`, + `alter table t placement policy ='pp3'`, + `alter table t partition p1 placement policy ='pp1'`, + `alter table t partition p2 placement policy ='pp2'`, + `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, + `update t set a = 11, b = "11", c = 11 where a = 1`, + `update t set b = "12", c = 12 where b = 2`, + `delete from t where a = 102`, + `delete from t where b = 103`, + } + beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + afterDML := []string{ + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + `update t set a = 1, b = 1, c = 1 where a = 11`, + `update t set b = 2, c = 2 where c = 12`, + `update t set a = 9, b = 9 where a = 104`, + `delete from t where a = 5`, + `delete from t where b = 102`, + } + afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func TestPartitionByFailuresPlacementPolicy(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 values less than (100), + partition p1 values less than (200))` + alter := "alter table t partition by range (b) (partition pNoneC values less than (150), partition p2 values less than (300)) update indexes (`primary` global)" + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, + `update t set a = 11, b = "11", c = 11 where a = 1`, + `update t set b = "12", c = 12 where b = 2`, + `delete from t where a = 102`, + `delete from t where b = 103`, + } + beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + afterDML := []string{ + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + `update t set a = 1, b = 1, c = 1 where a = 11`, + `update t set b = 2, c = 2 where c = 12`, + `update t set a = 9, b = 9 where a = 104`, + `delete from t where a = 5`, + `delete from t where b = 102`, + } + afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) +} + +func TestReorganizePartitionFailuresAddPlacementPolicy(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 
values less than (50), + partition p1 values less than (100), + partition p2 values less than (200))` + beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + } + beforeResult := testkit.Rows("104 104 104", "4 4 4", "5 5 5") + alter := `alter table t reorganize partition p2 into (partition p2 values less than (200), partition pMax values less than (maxvalue) placement policy pp1)` + afterResult := beforeResult + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, nil, afterResult, "Fail4") +} + +func TestPartitionByFailuresAddPlacementPolicyGlobalIndex(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 values less than (50), + partition p1 values less than (100), + partition p2 values less than (200))` + beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + } + beforeResult := testkit.Rows() + alter := `alter table t partition by range (a) (partition p2 values less than (200), partition pMax values less than (maxvalue) placement policy pp1)` + afterResult := beforeResult + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, nil, afterResult) +} diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index ec18dac803fe5..26457b5dca0b8 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -546,6 +546,7 @@ func GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) } // PutRuleBundles is used to post specific rule bundles to PD. +// an "empty" bundle means delete bundle if a bundle with such ID exists. func PutRuleBundles(ctx context.Context, bundles []*placement.Bundle) error { failpoint.Inject("putRuleBundlesError", func(isServiceError failpoint.Value) { var err error From 9200fbb21082e14f8bcc68ebdc87945192e97219 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 22 Nov 2024 01:09:19 +0100 Subject: [PATCH 07/23] Added more tests and one minor fix --- pkg/ddl/partition.go | 3 +- pkg/ddl/placement_policy_test.go | 212 +++++++++++++++++- pkg/ddl/tests/partition/BUILD.bazel | 3 + .../tests/partition/reorg_partition_test.go | 45 +++- 4 files changed, 255 insertions(+), 8 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index d0ee99fb58935..ebafc8964872b 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -3571,7 +3571,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver // REMOVE PARTITIONING // Storing the old table ID, used for updating statistics. oldTblID = tblInfo.ID - // TODO: Handle bundles? // TODO: Add concurrent test! // TODO: Will this result in big gaps? // TODO: How to carrie over AUTO_INCREMENT etc.? 
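The ordering these patches establish for placement bundles amounts to a two-phase write: before any rows are copied, bundles are put that cover both the old and the new partitions, and only after the reorganization goes public are bundles put for the final definitions, with the replaced partitions' bundles disappearing once their key ranges are garbage collected. A minimal stand-alone sketch of that ordering, assuming an illustrative bundle-ID format and helper (not the TiDB API):

package main

import "fmt"

// bundleIDs maps partition IDs to bundle IDs; the "partition_%d" format is
// purely illustrative.
func bundleIDs(partIDs []int64) []string {
	ids := make([]string, 0, len(partIDs))
	for _, id := range partIDs {
		ids = append(ids, fmt.Sprintf("partition_%d", id))
	}
	return ids
}

func main() {
	oldParts := []int64{101, 102}      // definitions being replaced
	newParts := []int64{201, 202, 203} // AddingDefinitions

	// Phase 1: before reorganizing, rules must exist for both the old and
	// the new partitions, so no region is ever left without a placement rule.
	phase1 := bundleIDs(append(append([]int64{}, oldParts...), newParts...))
	fmt.Println("put bundles:", phase1)

	// Phase 2: once the new partitions are public, only the final set is
	// written; bundles of the dropped partitions are cleaned up later, when
	// their delete ranges are garbage collected.
	phase2 := bundleIDs(newParts)
	fmt.Println("put bundles:", phase2)
}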
@@ -3585,7 +3584,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver return ver, errors.Trace(err) } tblInfo.ID = partInfo.NewTableID - if partInfo.DDLType != pmodel.PartitionTypeNone { + if oldTblID != physicalTableIDs[0] { // if partitioned before, then also add the old table ID, // otherwise it will be the already included first partition physicalTableIDs = append(physicalTableIDs, oldTblID) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 7ad112513919e..570286ff1a0f6 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2413,20 +2413,230 @@ func TestRecoverTableWithPlacementPolicy(t *testing.T) { checkExistTableBundlesInPD(t, dom, "test", "tp3") } -func TestPartitionByWithLabels(t *testing.T) { +func getChangedBundles(old, new []*placement.Bundle) (retOld, retNew []*placement.Bundle) { +OldLoop: + for i := range old { + for j := range new { + if old[i].ID == new[j].ID { + continue OldLoop + } + } + retOld = append(retOld, old[i]) + } +NewLoop: + for i := range new { + for j := range old { + if old[j].ID == new[i].ID { + continue NewLoop + } + } + retNew = append(retNew, new[i]) + } + return retOld, retNew +} + +func TestAlterPartitioningWithPlacementPolicy(t *testing.T) { + util.EmulatorGCDisable() store, do := testkit.CreateMockStoreAndDomain(t) + gcWorker, err := gcworker.NewMockGCWorker(store) + require.NoError(t, err) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create placement policy pp1 primary_region='r1' regions='r1,r2'") + tk.MustExec("create placement policy pp2 primary_region='r2' regions='r1,r2'") tk.MustExec(`CREATE TABLE t1 (id INT)`) + tk.MustExec(`INSERT INTO t1 values (1),(2),(100),(150),(200),(213)`) tk.MustExec(`ALTER TABLE t1 placement policy pp1`) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + origBundles, err := infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) tk.MustExec(`ALTER TABLE t1 PARTITION BY HASH (id) PARTITIONS 3`) + bundlesBeforeGC, err := infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err := infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles := getChangedBundles(origBundles, bundlesBeforeGC) + require.Len(t, newBundles, 1) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + require.Len(t, oldBundles, 1) tk.MustQuery("show create table t1").Check(testkit.Rows("" + "t1 CREATE TABLE `t1` (\n" + " `id` int(11) DEFAULT NULL\n" + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + "PARTITION BY HASH (`id`) PARTITIONS 3")) checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 ADD PARTITION (PARTITION p3 placement policy 'pp2')`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // One new partition level bundle + require.Len(t, newBundles, 1) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) 
+ require.Len(t, newBundles, 0) + // No old bundles removed + require.Len(t, oldBundles, 0) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY HASH (`id`)\n" + + "(PARTITION `p0`,\n" + + " PARTITION `p1`,\n" + + " PARTITION `p2`,\n" + + " PARTITION `p3` /*T![placement] PLACEMENT POLICY=`pp2` */)")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 REMOVE PARTITIONING`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // One table level bundle, due to new table id. + require.Len(t, newBundles, 1) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // One table level due to new table id and one partition level policy removed + require.Len(t, oldBundles, 2) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 PARTITION BY RANGE (id) (partition p1 values less than (100) placement policy pp2,partition p2 values less than (maxvalue))`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // One new bundle for the new table ID and one for the partition specific + require.Len(t, newBundles, 2) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // Only one old table level bundle + require.Len(t, oldBundles, 1) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (MAXVALUE))")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 REORGANIZE PARTITION p2 into (partition p2 values less than (200) placement policy pp1,partition pMax values less than (maxvalue) placement policy 'pp2')`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + // REORGANIZE keeps the table id, but the the internal rules may change + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // Two new partition level 
bundles + require.Len(t, newBundles, 2) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // No change in table ID and the reorganized partition did not have a partition level policy. + require.Len(t, oldBundles, 0) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (200) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE) /*T![placement] PLACEMENT POLICY=`pp2` */)")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 TRUNCATE PARTITION pMax`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // One new partition level bundle + require.Len(t, newBundles, 1) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // One old partition level bundle + require.Len(t, oldBundles, 1) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p1` VALUES LESS THAN (100) /*T![placement] PLACEMENT POLICY=`pp2` */,\n" + + " PARTITION `p2` VALUES LESS THAN (200) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE) /*T![placement] PLACEMENT POLICY=`pp2` */)")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 DROP PARTITION p1,pMax`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // No new partition level bundles + require.Len(t, newBundles, 0) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // Two dropped partition level bundles. 
+ require.Len(t, oldBundles, 2) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p2` VALUES LESS THAN (200) /*T![placement] PLACEMENT POLICY=`pp1` */)")) + checkExistTableBundlesInPD(t, do, "test", "t1") + + origBundles = bundlesAfterGC + tk.MustExec(`ALTER TABLE t1 ADD PARTITION (PARTITION pMax VALUES LESS THAN (MAXVALUE) placement policy 'pp2')`) + bundlesBeforeGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) + bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) + require.NoError(t, err) + oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) + // One new partition level bundles + require.Len(t, newBundles, 1) + require.Len(t, oldBundles, 0) + oldBundles, newBundles = getChangedBundles(bundlesBeforeGC, bundlesAfterGC) + require.Len(t, newBundles, 0) + // No change in table ID. + require.Len(t, oldBundles, 0) + tk.MustQuery("show create table t1").Check(testkit.Rows("" + + "t1 CREATE TABLE `t1` (\n" + + " `id` int(11) DEFAULT NULL\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`pp1` */\n" + + "PARTITION BY RANGE (`id`)\n" + + "(PARTITION `p2` VALUES LESS THAN (200) /*T![placement] PLACEMENT POLICY=`pp1` */,\n" + + " PARTITION `pMax` VALUES LESS THAN (MAXVALUE) /*T![placement] PLACEMENT POLICY=`pp2` */)")) + checkExistTableBundlesInPD(t, do, "test", "t1") } diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index 7f65dece9dc1e..6de9298b31d09 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -17,7 +17,9 @@ go_test( "//pkg/ddl", "//pkg/ddl/logutil", "//pkg/ddl/testutil", + "//pkg/ddl/util", "//pkg/domain", + "//pkg/domain/infosync", "//pkg/errno", "//pkg/kv", "//pkg/meta/model", @@ -29,6 +31,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/variable", "//pkg/sessiontxn", + "//pkg/store/gcworker", "//pkg/store/mockstore", "//pkg/table", "//pkg/table/tables", diff --git a/pkg/ddl/tests/partition/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go index 2bac49a0ed010..161d644aa992b 100644 --- a/pkg/ddl/tests/partition/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -988,7 +988,7 @@ func TestReorgPartitionFailuresPlacementPolicy(t *testing.T) { `alter table t partition p2 placement policy ='pp3'`, } beforeResult := testkit.Rows() - alter := "alter table t reorganize partition p1,p2 into (partition p1 values less than (17), partition p1b values less than (24), partition p2 values less than (30))" + alter := "alter table t reorganize partition p1,p2 into (partition p1 values less than (17), partition p1b values less than (24) placement policy 'pp1', partition p2 values less than (30))" afterResult := testkit.Rows() testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, nil, afterResult, "Fail4") } @@ -1029,8 +1029,38 @@ func TestPartitionByFailuresPlacementPolicy(t *testing.T) { create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( partition p0 values less than (100), partition p1 values less than (200))` - alter := "alter table t partition by range (b) 
(partition pNoneC values less than (150), partition p2 values less than (300)) update indexes (`primary` global)" beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + `create or replace placement policy pp2 followers=2`, + `create or replace placement policy pp3 followers=3`, + `alter table t placement policy ='pp1'`, + `alter table t partition p0 placement policy ='pp2'`, + `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, + `update t set a = 11, b = "11", c = 11 where a = 1`, + `update t set b = "12", c = 12 where b = 2`, + `delete from t where a = 102`, + `delete from t where b = 103`, + } + beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + alter := "alter table t partition by range (b) (partition pNoneC values less than (150) placement policy 'pp3', partition p2 values less than (300)) update indexes (`primary` global)" + afterDML := []string{ + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + `update t set a = 1, b = 1, c = 1 where a = 11`, + `update t set b = 2, c = 2 where c = 12`, + `update t set a = 9, b = 9 where a = 104`, + `delete from t where a = 5`, + `delete from t where b = 102`, + } + afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) +} + +func TestPartitionNonPartitionedFailuresPlacementPolicy(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255))` + beforeDML := []string{ + `create or replace placement policy pp1 followers=1`, + `create or replace placement policy pp2 followers=2`, + `alter table t placement policy ='pp1'`, `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, `update t set a = 11, b = "11", c = 11 where a = 1`, `update t set b = "12", c = 12 where b = 2`, @@ -1038,6 +1068,7 @@ func TestPartitionByFailuresPlacementPolicy(t *testing.T) { `delete from t where b = 103`, } beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + alter := "alter table t partition by range (b) (partition pNoneC values less than (150), partition p2 values less than (300) placement policy 'pp1') update indexes (`primary` global)" afterDML := []string{ `insert into t values (4,4,4),(5,5,5),(104,104,104)`, `update t set a = 1, b = 1, c = 1 where a = 11`, @@ -1066,15 +1097,19 @@ func TestReorganizePartitionFailuresAddPlacementPolicy(t *testing.T) { } func TestPartitionByFailuresAddPlacementPolicyGlobalIndex(t *testing.T) { - create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique key (c) global) partition by range(a) ( partition p0 values less than (50), partition p1 values less than (100), partition p2 values less than (200))` beforeDML := []string{ `create or replace placement policy pp1 followers=1`, + `create or replace placement policy pp2 followers=2`, + `alter table t placement policy pp1`, + `alter table t partition p2 placement policy pp2`, + `insert into t values (4,4,4),(50,50,50),(111,111,111),(155,155,155)`, } - beforeResult := testkit.Rows() - alter := `alter table t partition by range (a) (partition p2 values less than (200), partition pMax values less than (maxvalue) placement policy pp1)` + beforeResult := testkit.Rows("111 111 
111", "155 155 155", "4 4 4", "50 50 50") + alter := "alter table t partition by range (a) (partition p1 values less than (150), partition pMax values less than (maxvalue) placement policy pp1) update indexes (`primary` local, `c` global)" afterResult := beforeResult testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, nil, afterResult) } From d58c8d2b7fc5722596f3f50c0b2b8b0ccc9093aa Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 22 Nov 2024 02:23:51 +0100 Subject: [PATCH 08/23] Linting --- pkg/ddl/partition.go | 2 +- pkg/ddl/placement_policy_test.go | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index ebafc8964872b..c752cc69199a0 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -278,7 +278,7 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDe p := tblInfo.Partition if p != nil { // skip the original table as partition if partitioning a non-partitioned table - if p.DDLAction != model.ActionAlterTablePartitioning || p.Type != pmodel.PartitionTypeNone { + if p.Definitions[0].ID != tblInfo.ID { // prepend with existing partitions addDefs = append(p.Definitions, addDefs...) } diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 570286ff1a0f6..6a65844332e7a 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2413,24 +2413,24 @@ func TestRecoverTableWithPlacementPolicy(t *testing.T) { checkExistTableBundlesInPD(t, dom, "test", "tp3") } -func getChangedBundles(old, new []*placement.Bundle) (retOld, retNew []*placement.Bundle) { +func getChangedBundles(oldBundle, newBundle []*placement.Bundle) (retOld, retNew []*placement.Bundle) { OldLoop: - for i := range old { - for j := range new { - if old[i].ID == new[j].ID { + for i := range oldBundle { + for j := range newBundle { + if oldBundle[i].ID == newBundle[j].ID { continue OldLoop } } - retOld = append(retOld, old[i]) + retOld = append(retOld, oldBundle[i]) } NewLoop: - for i := range new { - for j := range old { - if old[j].ID == new[i].ID { + for i := range newBundle { + for j := range oldBundle { + if oldBundle[j].ID == newBundle[i].ID { continue NewLoop } } - retNew = append(retNew, new[i]) + retNew = append(retNew, newBundle[i]) } return retOld, retNew } From 54abd6d4f45e544c36f6173b7cc7c814c8c68481 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 22 Nov 2024 13:01:54 +0100 Subject: [PATCH 09/23] More advanced comparison for bundle rules leader overlap --- pkg/ddl/placement_policy_test.go | 2 +- pkg/domain/infosync/placement_manager.go | 26 +++++++++++++++++------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 6a65844332e7a..c7b4614221b14 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2549,7 +2549,7 @@ func TestAlterPartitioningWithPlacementPolicy(t *testing.T) { require.Nil(t, gcWorker.DeleteRanges(context.TODO(), math.MaxInt64)) bundlesAfterGC, err = infosync.GetAllRuleBundles(context.TODO()) require.NoError(t, err) - // REORGANIZE keeps the table id, but the the internal rules may change + // REORGANIZE keeps the table id, but the internal rules may change oldBundles, newBundles = getChangedBundles(origBundles, bundlesBeforeGC) // Two new partition level bundles require.Len(t, newBundles, 2) diff --git a/pkg/domain/infosync/placement_manager.go 
b/pkg/domain/infosync/placement_manager.go index 808664df01bb4..a0330dfcedb52 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -119,27 +119,39 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac } } - // Check that no bundles are overlapping + // Check that no bundles have leaders overlapping ranges type keyRange struct { - start string - end string + start string + end string + id string + string int + override bool } keys := make([]keyRange, 0, rules) for k := range m.bundles { for _, rule := range m.bundles[k].Rules { - if rule.Role == pd.Leader && !m.bundles[k].Override { - keys = append(keys, keyRange{start: k + ":" + rule.StartKeyHex, end: k + ":" + rule.EndKeyHex}) + if rule.Role == pd.Leader { + keys = append(keys, keyRange{ + id: rule.ID, + start: rule.GroupID + ":" + rule.StartKeyHex, + end: rule.GroupID + ":" + rule.EndKeyHex, + override: rule.Override, + }) } } } + // Sort on Start (includes group_id), id, end sort.Slice(keys, func(i, j int) bool { if keys[i].start == keys[j].start { - return keys[i].end < keys[j].end + if keys[i].id == keys[j].id { + return keys[i].end < keys[j].end + } + return keys[i].id < keys[j].id } return keys[i].start < keys[j].start }) for i := 1; i < len(keys); i++ { - if keys[i].start < keys[i-1].end { + if keys[i].start < keys[i-1].end && !keys[i].override { return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) } } From 2f97ac7d1934e8aa6ea7b35444d18ab18fec0afb Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 22 Nov 2024 13:34:02 +0100 Subject: [PATCH 10/23] Renamed a function and added more comments --- pkg/ddl/partition.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index c752cc69199a0..748afd0d5b47f 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -277,7 +277,11 @@ func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDe tblInfo = tblInfo.Clone() p := tblInfo.Partition if p != nil { - // skip the original table as partition if partitioning a non-partitioned table + // if partitioning a non-partitioned table, we will first change the metadata, + // so the table looks like a partitioned table, with the first/only partition having + // the same partition ID as the table, so we can access the table as a single partition. + // But in this case we should not add a bundle rule for the same range + // both as table and partition. if p.Definitions[0].ID != tblInfo.ID { // prepend with existing partitions addDefs = append(p.Definitions, addDefs...) @@ -346,8 +350,8 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode tblInfo.Partition.AddingDefinitions = append(tblInfo.Partition.AddingDefinitions, newDefs...) } -// rollbackAddingPartitionInfo remove the `addingDefinitions` in the tableInfo. -func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string) { +// removePartitionAddingDefinitionsFromTableInfo remove the `addingDefinitions` in the tableInfo. 
+func removePartitionAddingDefinitionsFromTableInfo(tblInfo *model.TableInfo) ([]int64, []string) { physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions)) partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions)) for _, one := range tblInfo.Partition.AddingDefinitions { @@ -2164,7 +2168,10 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) ( return ver, errors.Trace(err) } tblInfo.Partition.DroppingDefinitions = nil - physicalTableIDs, pNames := rollbackAddingPartitionInfo(tblInfo) + // Collect table/partition ids to clean up, through args.OldPhysicalTblIDs + // GC will later also drop matching Placement bundles. + // If we delete them now, it could lead to non-compliant placement or failure during flashback + physicalTableIDs, pNames := removePartitionAddingDefinitionsFromTableInfo(tblInfo) // TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name? err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames) if err != nil { From 7fcdaf3869048b4411b68cbbee137a5e0e2b51d0 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 25 Nov 2024 22:55:23 +0100 Subject: [PATCH 11/23] Updated check for bundle, to match PD --- pkg/ddl/BUILD.bazel | 1 + pkg/ddl/placement_policy_test.go | 211 +++++++++++++++++++++++ pkg/domain/infosync/placement_manager.go | 128 ++++++++++---- 3 files changed, 306 insertions(+), 34 deletions(-) diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 9a0c65f0b34b4..f2b64d949cd9e 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -358,6 +358,7 @@ go_test( "@com_github_tikv_client_go_v2//testutils", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_tests_v3//integration", "@org_golang_google_grpc//:grpc", diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index c7b4614221b14..8da8e3a9b61a8 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -40,6 +40,7 @@ import ( "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) type bundleCheck struct { @@ -2640,3 +2641,213 @@ func TestAlterPartitioningWithPlacementPolicy(t *testing.T) { " PARTITION `pMax` VALUES LESS THAN (MAXVALUE) /*T![placement] PLACEMENT POLICY=`pp2` */)")) checkExistTableBundlesInPD(t, do, "test", "t1") } + +func TestCheckBundle(t *testing.T) { + type tc struct { + bundle *placement.Bundle + success bool + } + testCases := []tc{ + { + bundle: &placement.Bundle{ + ID: "TiDB_DDL_1", + Index: 1, + Override: false, + Rules: []*pd.Rule{ + { + GroupID: "TiDB_DDL_1", + ID: "TiDB_DDL_1", + Override: false, + StartKeyHex: "F0", + EndKeyHex: "F2", + Role: pd.Leader, + }, + { + GroupID: "TiDB_DDL_2", + ID: "TiDB_DDL_1", + Override: false, + StartKeyHex: "01", + EndKeyHex: "02", + Role: pd.Leader, + }, + }, + }, + success: true, + }, + { + bundle: &placement.Bundle{ + ID: "TiDB_DDL_1", + Index: 1, + Override: false, + Rules: []*pd.Rule{ + { + GroupID: "TiDB_DDL_1", + ID: "TiDB_DDL_1", + Override: false, + StartKeyHex: "01", + EndKeyHex: "0F", + Role: pd.Leader, + }, + { + GroupID: "TiDB_DDL_1", + ID: "TiDB_DDL_1", + Override: true, + StartKeyHex: "02", + EndKeyHex: "03", + Role: pd.Leader, + }, + { + GroupID: "TiDB_DDL_1", + ID: "TiDB_DDL_1", + Override: false, + StartKeyHex: "04", 
+ EndKeyHex: "05", + Role: pd.Leader, + }, + }, + }, + // TODO: reconsider if we should make this test fail? + success: true, + }, + { + // What issue #55705 looked like, i.e. both partition and table had the same range. + bundle: &placement.Bundle{ + ID: "TiDB_DDL_112", + Index: 40, + Override: true, + Rules: []*pd.Rule{ + { + GroupID: "TiDB_DDL_112", + ID: "table_rule_112_0", + Index: 40, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "leader", + }, + { + GroupID: "TiDB_DDL_112", + ID: "table_rule_112_1", + Index: 40, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "table_rule_112_2", + Index: 40, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_112_0", + Index: 80, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "leader", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_112_1", + Index: 80, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_112_2", + Index: 80, + StartKeyHex: "7480000000000000ff7000000000000000f8", + EndKeyHex: "7480000000000000ff7100000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_115_0", + Index: 80, + StartKeyHex: "7480000000000000ff7300000000000000f8", + EndKeyHex: "7480000000000000ff7400000000000000f8", + Role: "leader", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_115_1", + Index: 80, + StartKeyHex: "7480000000000000ff7300000000000000f8", + EndKeyHex: "7480000000000000ff7400000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_115_2", + Index: 80, + StartKeyHex: "7480000000000000ff7300000000000000f8", + EndKeyHex: "7480000000000000ff7400000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_116_0", + Index: 80, + StartKeyHex: "7480000000000000ff7400000000000000f8", + EndKeyHex: "7480000000000000ff7500000000000000f8", + Role: "leader", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_116_1", + Index: 80, + StartKeyHex: "7480000000000000ff7400000000000000f8", + EndKeyHex: "7480000000000000ff7500000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_116_2", + Index: 80, + StartKeyHex: "7480000000000000ff7400000000000000f8", + EndKeyHex: "7480000000000000ff7500000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_117_0", + Index: 80, + StartKeyHex: "7480000000000000ff7500000000000000f8", + EndKeyHex: "7480000000000000ff7600000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_117_1", + Index: 80, + StartKeyHex: "7480000000000000ff7500000000000000f8", + EndKeyHex: "7480000000000000ff7600000000000000f8", + Role: "voter", + }, + { + GroupID: "TiDB_DDL_112", + ID: "partition_rule_117_2", + Index: 80, + StartKeyHex: "7480000000000000ff7500000000000000f8", + EndKeyHex: "7480000000000000ff7600000000000000f8", + Role: "voter", + }, + }, + }, + success: false, + }, + } + + for _, test := range testCases { + err := infosync.CheckBundle(test.bundle) + if test.success { + require.NoError(t, err) + } else { + 
require.Error(t, err) + } + } +} diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index a0330dfcedb52..7929bbdf28301 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -101,45 +101,65 @@ func (m *mockPlacementManager) GetAllRuleBundles(_ context.Context) ([]*placemen return bundles, nil } -func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*placement.Bundle) error { - m.Lock() - defer m.Unlock() +type keyRange struct { + groupID string + start string + end string + id string + string int + override bool + isLeader bool +} - if m.bundles == nil { - m.bundles = make(map[string]*placement.Bundle) +// CheckBundle check that the rules don't overlap without explicit Override +// Exported for testing reasons. +// Tries to match prepareRulesForApply + checkApplyRules from pd. +// And additionally checks for key overlaps. +func CheckBundle(bundle *placement.Bundle) error { + keys := make([]keyRange, 0, len(bundle.Rules)) + for _, rule := range bundle.Rules { + keys = append(keys, keyRange{ + groupID: rule.GroupID, + id: rule.ID, + start: rule.StartKeyHex, + end: rule.EndKeyHex, + override: rule.Override, + isLeader: rule.Role == pd.Leader, + }) } - - rules := 0 - for _, bundle := range bundles { - if bundle.IsEmpty() { - delete(m.bundles, bundle.ID) - } else { - m.bundles[bundle.ID] = bundle - rules += len(bundle.Rules) + if len(keys) == 0 { + return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, no rule left`) + } + // Skip overridden rules, but only within the bundle, not across groups + applyKeys := keys[:0] + j := 0 + for i := 1; i < len(keys); i++ { + if keys[i].groupID != keys[j].groupID { + // currently not checking if the group overrides all other groups! + applyKeys = append(applyKeys, keys[j:i]...) // save rules belong to previous groups + j = i } + if keys[i].override { + j = i // skip all previous rules in the same group + } + } + applyKeys = append(applyKeys, keys[j:]...) + if len(applyKeys) == 0 { + return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, no rule left`) } - // Check that no bundles have leaders overlapping ranges - type keyRange struct { - start string - end string - id string - string int - override bool - } - keys := make([]keyRange, 0, rules) - for k := range m.bundles { - for _, rule := range m.bundles[k].Rules { - if rule.Role == pd.Leader { - keys = append(keys, keyRange{ - id: rule.ID, - start: rule.GroupID + ":" + rule.StartKeyHex, - end: rule.GroupID + ":" + rule.EndKeyHex, - override: rule.Override, - }) - } + // Additionally check for range overlapping leaders. 
+ j = 0 + keys = keys[:0] + for i := 0; i < len(applyKeys); i++ { + if applyKeys[i].isLeader { + keys = append(keys, applyKeys[i]) } } + if len(keys) == 0 { + return nil + } + // Sort on Start (includes group_id), id, end sort.Slice(keys, func(i, j int) bool { if keys[i].start == keys[j].start { @@ -150,11 +170,51 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac } return keys[i].start < keys[j].start }) + + prevEnd := keys[0].end for i := 1; i < len(keys); i++ { - if keys[i].start < keys[i-1].end && !keys[i].override { - return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) + if keys[i].start < prevEnd { + if !keys[i].override { + + return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) + } + continue + } + if keys[i].end > prevEnd { + prevEnd = keys[i].end } } return nil } + +func checkBundles(bundles map[string]*placement.Bundle) error { + // Check that no bundles have leaders overlapping ranges + for k := range bundles { + if err := CheckBundle(bundles[k]); err != nil { + return err + } + } + return nil +} + +func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*placement.Bundle) error { + m.Lock() + defer m.Unlock() + + if m.bundles == nil { + m.bundles = make(map[string]*placement.Bundle) + } + + rules := 0 + for _, bundle := range bundles { + if bundle.IsEmpty() { + delete(m.bundles, bundle.ID) + } else { + m.bundles[bundle.ID] = bundle + rules += len(bundle.Rules) + } + } + + return checkBundles(m.bundles) +} From cb2bd2785720cd4dd30a00bad11c50108ef7751b Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 25 Nov 2024 23:02:52 +0100 Subject: [PATCH 12/23] Added enhancement issue reference --- pkg/ddl/placement_policy_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 8da8e3a9b61a8..e80b1f5ca01b8 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2707,6 +2707,8 @@ func TestCheckBundle(t *testing.T) { }, }, // TODO: reconsider if we should make this test fail? + // https://github.com/pingcap/tidb/issues/57693 + // Currently the second rule overrides the first rule in whole, not just the overlapping range. success: true, }, { From f48003863ed4ea40956ca5311a9747e86d6b1bab Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 25 Nov 2024 23:21:44 +0100 Subject: [PATCH 13/23] Linting --- pkg/domain/infosync/placement_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 7929bbdf28301..b4d54b17ef3d4 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -149,7 +149,6 @@ func CheckBundle(bundle *placement.Bundle) error { } // Additionally check for range overlapping leaders. 
- j = 0 keys = keys[:0] for i := 0; i < len(applyKeys); i++ { if applyKeys[i].isLeader { From 2a4642acbed862ad00e3fda260a56ef9dc2fa6dd Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Mon, 25 Nov 2024 23:29:10 +0100 Subject: [PATCH 14/23] Linting --- pkg/domain/infosync/placement_manager.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index b4d54b17ef3d4..a99e32dc4afa6 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -159,7 +159,8 @@ func CheckBundle(bundle *placement.Bundle) error { return nil } - // Sort on Start (includes group_id), id, end + // Sort on Start, id, end + // Could use pd's placement.sortRules() instead. sort.Slice(keys, func(i, j int) bool { if keys[i].start == keys[j].start { if keys[i].id == keys[j].id { @@ -174,7 +175,6 @@ func CheckBundle(bundle *placement.Bundle) error { for i := 1; i < len(keys); i++ { if keys[i].start < prevEnd { if !keys[i].override { - return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) } continue @@ -183,7 +183,6 @@ func CheckBundle(bundle *placement.Bundle) error { prevEnd = keys[i].end } } - return nil } From 5d78361b6879d513b9fd4aeb0ef2506a3281e43e Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 26 Nov 2024 21:58:35 +0100 Subject: [PATCH 15/23] bazel_prepare --- pkg/ddl/tests/partition/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index a68e93e73adb0..477577df5752b 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -22,6 +22,7 @@ go_test( "//pkg/domain", "//pkg/domain/infosync", "//pkg/errno", + "//pkg/expression", "//pkg/kv", "//pkg/meta/model", "//pkg/parser/ast", From acee32ed198f44759ac4d3ea48cf7a6a543ce89e Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 26 Nov 2024 22:39:42 +0100 Subject: [PATCH 16/23] Simplified CheckBundle in mockPlacementManager --- pkg/ddl/placement_policy_test.go | 2 +- pkg/domain/infosync/placement_manager.go | 20 ++++++-------------- 2 files changed, 7 insertions(+), 15 deletions(-) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index 5411098564f4c..ae539cedf4e84 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2664,7 +2664,7 @@ func TestCheckBundle(t *testing.T) { Role: pd.Leader, }, { - GroupID: "TiDB_DDL_2", + GroupID: "TiDB_DDL_1", ID: "TiDB_DDL_1", Override: false, StartKeyHex: "01", diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index a99e32dc4afa6..1f70a640d4022 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -118,6 +118,9 @@ type keyRange struct { func CheckBundle(bundle *placement.Bundle) error { keys := make([]keyRange, 0, len(bundle.Rules)) for _, rule := range bundle.Rules { + if rule.GroupID != bundle.ID { + return fmt.Errorf("rule group id %s is not same as bundle id %s", rule.GroupID, bundle.ID) + } keys = append(keys, keyRange{ groupID: rule.GroupID, id: rule.ID, @@ -134,13 +137,9 @@ func CheckBundle(bundle *placement.Bundle) error { applyKeys := keys[:0] j := 0 for i := 1; i < len(keys); i++ { - if keys[i].groupID != keys[j].groupID { - // currently not checking if the group overrides all 
other groups! - applyKeys = append(applyKeys, keys[j:i]...) // save rules belong to previous groups - j = i - } if keys[i].override { j = i // skip all previous rules in the same group + // TODO: Should we only override the matching key range? } } applyKeys = append(applyKeys, keys[j:]...) @@ -171,16 +170,9 @@ func CheckBundle(bundle *placement.Bundle) error { return keys[i].start < keys[j].start }) - prevEnd := keys[0].end for i := 1; i < len(keys); i++ { - if keys[i].start < prevEnd { - if !keys[i].override { - return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) - } - continue - } - if keys[i].end > prevEnd { - prevEnd = keys[i].end + if keys[i].start < keys[i-1].end { + return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, multiple leader replicas for range {%s, %s}`, keys[i-1].start, keys[i].end) } } return nil From ee5cf491f2e2b9f271b6fe8fa57fc38a218500cc Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 26 Nov 2024 22:42:25 +0100 Subject: [PATCH 17/23] Linting --- pkg/domain/infosync/placement_manager.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 1f70a640d4022..3697cda838eb5 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -196,13 +196,11 @@ func (m *mockPlacementManager) PutRuleBundles(_ context.Context, bundles []*plac m.bundles = make(map[string]*placement.Bundle) } - rules := 0 for _, bundle := range bundles { if bundle.IsEmpty() { delete(m.bundles, bundle.ID) } else { m.bundles[bundle.ID] = bundle - rules += len(bundle.Rules) } } From 05a6a149c281e0084fdae859580f8cca83cc42cf Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Tue, 26 Nov 2024 23:11:40 +0100 Subject: [PATCH 18/23] Eased the CheckBundle to pass more test. 
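
With this easing, the mock check boils down to collecting the leader key
ranges of a bundle and rejecting any overlap once they are sorted. A minimal
standalone sketch of that idea (names and sample keys are illustrative, not
the actual infosync code):

    package main

    import (
        "fmt"
        "sort"
    )

    // keyRange mirrors the start/end pair collected per leader rule.
    type keyRange struct {
        start, end string
    }

    // overlappingLeaders sorts the ranges and reports the first overlapping
    // pair, which is the condition the check turns into an error.
    func overlappingLeaders(keys []keyRange) (keyRange, keyRange, bool) {
        sort.Slice(keys, func(i, j int) bool {
            if keys[i].start == keys[j].start {
                return keys[i].end < keys[j].end
            }
            return keys[i].start < keys[j].start
        })
        for i := 1; i < len(keys); i++ {
            if keys[i].start < keys[i-1].end {
                return keys[i-1], keys[i], true
            }
        }
        return keyRange{}, keyRange{}, false
    }

    func main() {
        // Two leader rules over the same range, as in issue #55705.
        keys := []keyRange{
            {start: "7480000000000000ff70", end: "7480000000000000ff71"},
            {start: "7480000000000000ff70", end: "7480000000000000ff71"},
        }
        if a, b, bad := overlappingLeaders(keys); bad {
            fmt.Printf("multiple leader replicas for range {%s, %s}\n", a.start, b.end)
        }
    }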
--- pkg/domain/infosync/placement_manager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 3697cda838eb5..0c0a36979d42b 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -119,7 +119,8 @@ func CheckBundle(bundle *placement.Bundle) error { keys := make([]keyRange, 0, len(bundle.Rules)) for _, rule := range bundle.Rules { if rule.GroupID != bundle.ID { - return fmt.Errorf("rule group id %s is not same as bundle id %s", rule.GroupID, bundle.ID) + // TODO: also check cross IDs + continue } keys = append(keys, keyRange{ groupID: rule.GroupID, @@ -131,7 +132,7 @@ func CheckBundle(bundle *placement.Bundle) error { }) } if len(keys) == 0 { - return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, no rule left`) + return nil } // Skip overridden rules, but only within the bundle, not across groups applyKeys := keys[:0] From b25fe4aeb931e1e1be0b83f2d68180356dcbe7d8 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 28 Nov 2024 10:09:20 +0200 Subject: [PATCH 19/23] Simplified CheckBundle --- pkg/domain/infosync/placement_manager.go | 45 +++++------------------- 1 file changed, 9 insertions(+), 36 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 0c0a36979d42b..4e495784b40a2 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -122,51 +122,24 @@ func CheckBundle(bundle *placement.Bundle) error { // TODO: also check cross IDs continue } - keys = append(keys, keyRange{ - groupID: rule.GroupID, - id: rule.ID, - start: rule.StartKeyHex, - end: rule.EndKeyHex, - override: rule.Override, - isLeader: rule.Role == pd.Leader, - }) - } - if len(keys) == 0 { - return nil - } - // Skip overridden rules, but only within the bundle, not across groups - applyKeys := keys[:0] - j := 0 - for i := 1; i < len(keys); i++ { - if keys[i].override { - j = i // skip all previous rules in the same group - // TODO: Should we only override the matching key range? - } - } - applyKeys = append(applyKeys, keys[j:]...) - if len(applyKeys) == 0 { - return fmt.Errorf(`ERROR 8243 (HY000): "[PD:placement:ErrBuildRuleList]build rule list failed, no rule left`) - } - - // Additionally check for range overlapping leaders. - keys = keys[:0] - for i := 0; i < len(applyKeys); i++ { - if applyKeys[i].isLeader { - keys = append(keys, applyKeys[i]) + if rule.Role == pd.Leader { + if rule.Override { + keys = keys[:0] + } + keys = append(keys, keyRange{ + start: rule.StartKeyHex, + end: rule.EndKeyHex, + }) } } if len(keys) == 0 { return nil } - // Sort on Start, id, end // Could use pd's placement.sortRules() instead. 
sort.Slice(keys, func(i, j int) bool { if keys[i].start == keys[j].start { - if keys[i].id == keys[j].id { - return keys[i].end < keys[j].end - } - return keys[i].id < keys[j].id + return keys[i].end < keys[j].end } return keys[i].start < keys[j].start }) From 1c43fbe51475238f0398e16268cc6f29eb64435e Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 28 Nov 2024 10:30:21 +0200 Subject: [PATCH 20/23] Removed non-needed struct variables for check --- pkg/domain/infosync/placement_manager.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 4e495784b40a2..b8e061601f230 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -102,13 +102,8 @@ func (m *mockPlacementManager) GetAllRuleBundles(_ context.Context) ([]*placemen } type keyRange struct { - groupID string - start string - end string - id string - string int - override bool - isLeader bool + start string + end string } // CheckBundle check that the rules don't overlap without explicit Override From 17dfd76edca2172ac95d1bf55934710047bd73b3 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 28 Nov 2024 10:46:38 +0200 Subject: [PATCH 21/23] Removed old comment --- pkg/domain/infosync/placement_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index b8e061601f230..5af5299319eca 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -130,7 +130,6 @@ func CheckBundle(bundle *placement.Bundle) error { if len(keys) == 0 { return nil } - // Sort on Start, id, end // Could use pd's placement.sortRules() instead. sort.Slice(keys, func(i, j int) bool { if keys[i].start == keys[j].start { From fd74392e471e7542162bbe778c38327b3951293d Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 28 Nov 2024 11:13:06 +0200 Subject: [PATCH 22/23] Added more comments and simplified CheckBundle further --- pkg/domain/infosync/placement_manager.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/domain/infosync/placement_manager.go b/pkg/domain/infosync/placement_manager.go index 5af5299319eca..1a9d20ee1bf9b 100644 --- a/pkg/domain/infosync/placement_manager.go +++ b/pkg/domain/infosync/placement_manager.go @@ -108,17 +108,16 @@ type keyRange struct { // CheckBundle check that the rules don't overlap without explicit Override // Exported for testing reasons. -// Tries to match prepareRulesForApply + checkApplyRules from pd. +// Tries to be a simpler version of PDs +// prepareRulesForApply + checkApplyRules. // And additionally checks for key overlaps. func CheckBundle(bundle *placement.Bundle) error { keys := make([]keyRange, 0, len(bundle.Rules)) for _, rule := range bundle.Rules { - if rule.GroupID != bundle.ID { - // TODO: also check cross IDs - continue - } if rule.Role == pd.Leader { if rule.Override { + // PD would override the previous rules, + // not only the overlapping key ranges. keys = keys[:0] } keys = append(keys, keyRange{ From c6e1ad0ddf8ef1ebddca293fdf2c685cccfcaf39 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Thu, 28 Nov 2024 11:16:18 +0200 Subject: [PATCH 23/23] Removed confusing test. 
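
The dropped case combined an Override leader rule with overlapping ranges.
Under the simplified CheckBundle an Override rule discards every range
collected so far (matching PD, which overrides the previous rules in whole,
not just the overlapping key range), so the case always passed and mostly
obscured what the check is meant to catch. A small standalone sketch of that
behaviour, with illustrative names:

    package main

    import "fmt"

    type rule struct {
        startHex, endHex string
        leader, override bool
    }

    // leaderRanges mirrors how the simplified check accumulates leader ranges:
    // an Override rule drops everything collected so far.
    func leaderRanges(rules []rule) [][2]string {
        keys := make([][2]string, 0, len(rules))
        for _, r := range rules {
            if !r.leader {
                continue
            }
            if r.override {
                keys = keys[:0]
            }
            keys = append(keys, [2]string{r.startHex, r.endHex})
        }
        return keys
    }

    func main() {
        // The removed case: leader on 01..0F, Override leader on 02..03,
        // then leader on 04..05. The override wipes 01..0F entirely, so no
        // overlap remains and the bundle is accepted.
        rules := []rule{
            {startHex: "01", endHex: "0F", leader: true},
            {startHex: "02", endHex: "03", leader: true, override: true},
            {startHex: "04", endHex: "05", leader: true},
        }
        fmt.Println(leaderRanges(rules)) // [[02 03] [04 05]]
    }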
--- pkg/ddl/placement_policy_test.go | 37 -------------------------------- 1 file changed, 37 deletions(-) diff --git a/pkg/ddl/placement_policy_test.go b/pkg/ddl/placement_policy_test.go index ae539cedf4e84..28d8a9fc0bb25 100644 --- a/pkg/ddl/placement_policy_test.go +++ b/pkg/ddl/placement_policy_test.go @@ -2675,43 +2675,6 @@ func TestCheckBundle(t *testing.T) { }, success: true, }, - { - bundle: &placement.Bundle{ - ID: "TiDB_DDL_1", - Index: 1, - Override: false, - Rules: []*pd.Rule{ - { - GroupID: "TiDB_DDL_1", - ID: "TiDB_DDL_1", - Override: false, - StartKeyHex: "01", - EndKeyHex: "0F", - Role: pd.Leader, - }, - { - GroupID: "TiDB_DDL_1", - ID: "TiDB_DDL_1", - Override: true, - StartKeyHex: "02", - EndKeyHex: "03", - Role: pd.Leader, - }, - { - GroupID: "TiDB_DDL_1", - ID: "TiDB_DDL_1", - Override: false, - StartKeyHex: "04", - EndKeyHex: "05", - Role: pd.Leader, - }, - }, - }, - // TODO: reconsider if we should make this test fail? - // https://github.com/pingcap/tidb/issues/57693 - // Currently the second rule overrides the first rule in whole, not just the overlapping range. - success: true, - }, { // What issue #55705 looked like, i.e. both partition and table had the same range. bundle: &placement.Bundle{