ddl: Fixed partitioning a non-partitioned table with placement rules #57560

Open · wants to merge 10 commits into base: master
107 changes: 55 additions & 52 deletions pkg/ddl/partition.go
@@ -159,17 +159,12 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in
}
}

bundles, err := alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
_, err = alterTablePartitionBundles(jobCtx.metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
@@ -270,33 +265,48 @@ func alterTableLabelRule(schemaName string, meta *model.TableInfo, ids []int64)
return false, nil
}

func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
func alterTablePartitionBundles(t *meta.Mutator, tblInfo *model.TableInfo, addDefs []model.PartitionDefinition) (bool, error) {
// We want to achieve:
// - before we do any reorganization/write to new partitions/global indexes that the placement rules are in-place
// - not removing any placement rules for removed partitions
// So we will:
// 1) First write the new bundles including both new and old partitions,
Collaborator:
Why should the new bundles include the old partitions? The old bundles have already defined these rules, and they still take effect here.

Contributor Author:
Oh, so if the bundle is overwritten, will the older bundle rules still be in effect?
For example, if the first bundle had rules for table id 111 and partition ids 112 and 113, and the new bundle had table id 111 and partition ids 113 and 114, would partition id 112 still be placed according to the old bundle rules, or would it be placed without any rules?

Contributor:
"or would it be placed without any rules?"
This, I believe.

// it will filter the rules depending on the interval index override in the same group or the
// group-index override between different groups
// For example, given rules:
// ruleA: group_id: 4, id: 2, override: true
// ruleB: group_id: 4, id: 1, override: true
// ruleC: group_id: 3
// ruleD: group_id: 2
// RuleGroupA: id:4, override: false
// RuleGroupB: id:3, override: true
// RuleGroupC: id:2, override: false
// Finally only ruleA and ruleC will be selected.
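
As a sanity check on the example quoted above, here is a minimal self-contained sketch (toy types made up for illustration, not PD's actual implementation) that applies the two filtering steps as described: a group with override hides the lower-id groups, and inside a group a rule with override hides the earlier rules. It does indeed end up with only ruleA and ruleC:

package main

import (
	"fmt"
	"sort"
)

type rule struct {
	name     string
	groupID  int
	id       int
	override bool
}

type ruleGroup struct {
	id       int
	override bool
}

// filterRules is a toy re-implementation of the behaviour described above.
func filterRules(rules []rule, groups []ruleGroup) []rule {
	// Step 1: a group with override hides every group with a smaller id.
	minGroup := 0
	for _, g := range groups {
		if g.override && g.id > minGroup {
			minGroup = g.id
		}
	}
	byGroup := map[int][]rule{}
	for _, r := range rules {
		if r.groupID >= minGroup {
			byGroup[r.groupID] = append(byGroup[r.groupID], r)
		}
	}
	// Step 2: inside a group, a rule with override hides the rules before it.
	var out []rule
	for _, rs := range byGroup {
		sort.Slice(rs, func(i, j int) bool { return rs[i].id < rs[j].id })
		start := 0
		for i, r := range rs {
			if r.override {
				start = i
			}
		}
		out = append(out, rs[start:]...)
	}
	return out
}

func main() {
	rules := []rule{
		{"ruleA", 4, 2, true},
		{"ruleB", 4, 1, true},
		{"ruleC", 3, 0, false},
		{"ruleD", 2, 0, false},
	}
	groups := []ruleGroup{{4, false}, {3, true}, {2, false}}
	for _, r := range filterRules(rules, groups) {
		fmt.Println(r.name) // prints ruleA and ruleC (map iteration order is not deterministic)
	}
}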

Contributor Author:

A better example:

CREATE TABLE t (a int) partition by range (a) (partition p0 values less than (1000000), partition p1M values less than (2000000)) placement policy pp1;
ALTER TABLE t REORGANIZE PARTITION p1M INTO (PARTITION p1M values less than (2000000), partition p2M values less than (3000000));

During the DDL we still want the original p1M to follow the table-level rule, so we keep it in the bundle.
When the final bundles are created, it will not be included.
Since the table keeps its table id (for REORGANIZE PARTITION), no other bundle covers the inherited table's placement rules for the original partition p1M, which means we need to cover it for the duration of the DDL, since it can still be accessed (and double written).

Another alternative would be to create partition level bundles for the old replaced partitions, and let them be removed by GC later, but I think that would qualify for a separate PR.
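
A minimal sketch of the intermediate definition list this describes, using made-up ids and a simplified stand-in for model.PartitionDefinition rather than the real types: during the DDL the bundle is built from the old definitions plus the adding ones, except when the only existing "definition" is the original non-partitioned table itself (the case this PR fixes):

package main

import "fmt"

// partDef is a simplified stand-in for model.PartitionDefinition; the ids are
// made up for illustration.
type partDef struct {
	id   int64
	name string
}

// buildDefs mirrors, in simplified form, the definition list that the new
// alterTablePartitionBundles builds for the intermediate bundle: keep the old
// definitions during the DDL, except when the single "definition" is really
// the original non-partitioned table itself.
func buildDefs(tableID int64, existing, adding []partDef) []partDef {
	if len(existing) > 0 && existing[0].id != tableID {
		return append(append([]partDef{}, existing...), adding...)
	}
	return adding
}

func main() {
	// REORGANIZE PARTITION p1M INTO (p1M, p2M): the table keeps id 100, old
	// p0=101 and p1M=102 stay covered while new p1M=103 and p2M=104 are added.
	fmt.Println(buildDefs(100,
		[]partDef{{101, "p0"}, {102, "p1M"}},
		[]partDef{{103, "p1M"}, {104, "p2M"}}))
	// Partitioning a non-partitioned table: the only existing "partition" is the
	// table itself (same id), so it is skipped and only the new partitions are used.
	fmt.Println(buildDefs(100,
		[]partDef{{100, "t"}},
		[]partDef{{105, "p0"}, {106, "p1"}}))
}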

// EXCEPT if the old partition is in fact a table, then skip that partition
// 2) Then overwrite the bundles with the final partitioning scheme (second call in onReorg/

// tblInfo do not include added partitions, so we should add them first
tblInfo = tblInfo.Clone()
p := *tblInfo.Partition
p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...)
p.Definitions = append(tblInfo.Partition.Definitions, addingDefinitions...)
tblInfo.Partition = &p
p := tblInfo.Partition
if p != nil {
// skip the original table as partition if partitioning a non-partitioned table
if p.Definitions[0].ID != tblInfo.ID {
// prepend with existing partitions
addDefs = append(p.Definitions, addDefs...)
}
p.Definitions = addDefs
}

// bundle for table should be recomputed because it includes some default configs for partitions
tblBundle, err := placement.NewTableBundle(t, tblInfo)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

var bundles []*placement.Bundle
if tblBundle != nil {
bundles = append(bundles, tblBundle)
}

partitionBundles, err := placement.NewPartitionListBundles(t, addingDefinitions)
partitionBundles, err := placement.NewPartitionListBundles(t, addDefs)
if err != nil {
return nil, errors.Trace(err)
return false, errors.Trace(err)
}

bundles = append(bundles, partitionBundles...)
return bundles, nil

if len(bundles) > 0 {
return true, infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles)
}
return false, nil
}
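
Relating this function back to the review discussion above about partitions that are dropped from a bundle: the following toy model (made-up types, not PD's or TiDB's API) assumes that putting a bundle for the same group id replaces its rules wholesale, so an id that is no longer listed ends up without any rules. That assumption is why the intermediate bundle keeps the old definitions until the final call rewrites it:

package main

import "fmt"

// ruleStore is a toy model of rule storage: each rule group (keyed here by the
// table id) maps to the list of object ids its rules currently cover.
type ruleStore map[int64][]int64

// putBundle mimics overwriting a bundle: the previous rules for the same group
// id are replaced wholesale, not merged.
func (s ruleStore) putBundle(groupID int64, coveredIDs []int64) {
	s[groupID] = coveredIDs
}

func (s ruleStore) covered(id int64) bool {
	for _, ids := range s {
		for _, v := range ids {
			if v == id {
				return true
			}
		}
	}
	return false
}

func main() {
	pd := ruleStore{}
	pd.putBundle(111, []int64{111, 112, 113}) // old bundle: table 111, partitions 112, 113
	pd.putBundle(111, []int64{111, 113, 114}) // new bundle: table 111, partitions 113, 114
	fmt.Println(pd.covered(112))              // false: partition 112 is left without any rules
}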

// When drop/truncate a partition, we should still keep the dropped partition's placement settings to avoid unnecessary region schedules.
@@ -337,19 +347,15 @@ func updateAddingPartitionInfo(partitionInfo *model.PartitionInfo, tblInfo *mode
}

// rollbackAddingPartitionInfo removes the `addingDefinitions` from the tableInfo.
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string, []*placement.Bundle) {
func rollbackAddingPartitionInfo(tblInfo *model.TableInfo) ([]int64, []string) {
physicalTableIDs := make([]int64, 0, len(tblInfo.Partition.AddingDefinitions))
partNames := make([]string, 0, len(tblInfo.Partition.AddingDefinitions))
rollbackBundles := make([]*placement.Bundle, 0, len(tblInfo.Partition.AddingDefinitions))
for _, one := range tblInfo.Partition.AddingDefinitions {
physicalTableIDs = append(physicalTableIDs, one.ID)
partNames = append(partNames, one.Name.L)
if one.PlacementPolicyRef != nil {
rollbackBundles = append(rollbackBundles, placement.NewBundle(one.ID))
}
}
tblInfo.Partition.AddingDefinitions = nil
return physicalTableIDs, partNames, rollbackBundles
return physicalTableIDs, partNames
}

// checkAddPartitionValue check add Partition Values,
@@ -2147,20 +2153,18 @@ func dropLabelRules(ctx context.Context, schemaName, tableName string, partNames
// It will drop newly created partitions that have not yet been used, including cleaning
// up label rules and bundles as well as changed indexes due to global flag.
func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) {
args := jobCtx.jobArgs.(*model.TablePartitionArgs)
args, err := model.GetTablePartitionArgs(job)
if err != nil {
return ver, errors.Trace(err)
}
partInfo := args.PartInfo
metaMut := jobCtx.metaMut
tblInfo, err := GetTableInfoAndCancelFaultJob(metaMut, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
tblInfo.Partition.DroppingDefinitions = nil
physicalTableIDs, pNames, rollbackBundles := rollbackAddingPartitionInfo(tblInfo)
err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), rollbackBundles)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
physicalTableIDs, pNames := rollbackAddingPartitionInfo(tblInfo)
// TODO: Will this drop LabelRules for existing partitions, if the new partitions have the same name?
err = dropLabelRules(w.ctx, job.SchemaName, tblInfo.Name.L, pNames)
if err != nil {
@@ -2175,7 +2179,9 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
if partInfo.Type != pmodel.PartitionTypeNone {
// ALTER TABLE ... PARTITION BY
// Also remove anything with the new table id
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
if partInfo.NewTableID != 0 {
physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID)
}
// Reset if it was normal table before
if tblInfo.Partition.Type == pmodel.PartitionTypeNone ||
tblInfo.Partition.DDLType == pmodel.PartitionTypeNone {
@@ -2198,6 +2204,11 @@ func (w *worker) rollbackLikeDropPartition(jobCtx *jobContext, job *model.Job) (
tblInfo.Partition.ClearReorgIntermediateInfo()
}

_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3158,11 +3169,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
jobCtx.jobArgs = args
// Handle the rolling back job
if job.IsRollingback() {
ver, err := w.rollbackLikeDropPartition(jobCtx, job)
if err != nil {
return ver, errors.Trace(err)
}
return ver, nil
return w.rollbackLikeDropPartition(jobCtx, job)
}

tblInfo, partNames, partInfo, _, addingDefinitions, err := getReorgPartitionInfo(jobCtx.metaMut, job, args)
@@ -3180,6 +3187,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// The partInfo may have been checked against an older schema version for example.
// If the check is done here, it does not need to be repeated, since no other
// DDL on the same table can be run concurrently.
tblInfo.Partition.DDLAction = job.Type
num := len(partInfo.Definitions) - len(partNames) + len(tblInfo.Partition.Definitions)
err = checkAddPartitionTooManyPartitions(uint64(num))
if err != nil {
@@ -3307,31 +3315,21 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// In the next step, StateDeleteOnly, wait to verify the TiFlash replicas are OK
}

bundles, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
changed, err := alterTablePartitionBundles(metaMut, tblInfo, tblInfo.Partition.AddingDefinitions)
if err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}

if len(bundles) > 0 {
if err = infosync.PutRuleBundlesWithDefaultRetry(context.TODO(), bundles); err != nil {
if !changesMade {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
return rollbackReorganizePartitionWithErr(jobCtx, job, err)
}
changesMade = true
}
changesMade = changesMade || changed

ids := getIDs([]*model.TableInfo{tblInfo})
for _, p := range tblInfo.Partition.AddingDefinitions {
ids = append(ids, p.ID)
}
changed, err := alterTableLabelRule(job.SchemaName, tblInfo, ids)
changed, err = alterTableLabelRule(job.SchemaName, tblInfo, ids)
changesMade = changesMade || changed
if err != nil {
if !changesMade {
@@ -3357,7 +3355,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -3574,7 +3571,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// REMOVE PARTITIONING
// Storing the old table ID, used for updating statistics.
oldTblID = tblInfo.ID
// TODO: Handle bundles?
// TODO: Add concurrent test!
// TODO: Will this result in big gaps?
// TODO: How to carry over AUTO_INCREMENT etc.?
@@ -3588,7 +3584,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
tblInfo.ID = partInfo.NewTableID
if partInfo.DDLType != pmodel.PartitionTypeNone {
if oldTblID != physicalTableIDs[0] {
// if partitioned before, then also add the old table ID,
// otherwise it will be the already included first partition
physicalTableIDs = append(physicalTableIDs, oldTblID)
@@ -3614,6 +3610,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
return ver, errors.Trace(err)
}
}

// We need to update the Placement rule bundles with the final partitions.
_, err = alterTablePartitionBundles(metaMut, tblInfo, nil)
if err != nil {
return ver, err
}

failpoint.Inject("reorgPartFail5", func(val failpoint.Value) {
if val.(bool) {
job.ErrorCount += variable.GetDDLErrorCountLimit() / 2