br: add table filter for log restore #57394

Open · wants to merge 4 commits into base: master

Changes from all commits
1 change: 0 additions & 1 deletion br/pkg/backup/client.go
@@ -827,7 +827,6 @@ func BuildBackupSchemas(
     if err != nil {
         return errors.Trace(err)
     }
-
     for _, dbInfo := range dbs {
         // skip system databases
         if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
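For context, the `tableFilter` consulted above comes from `pkg/util/table-filter`, the same filter mechanism this PR extends to log restore. A minimal sketch of building and consulting such a filter (the rule strings are made-up examples, not from this PR):

```go
package main

import (
    "fmt"

    filter "github.com/pingcap/tidb/pkg/util/table-filter"
)

func main() {
    // Hypothetical rules: keep everything in db1 except db1.tmp.
    f, err := filter.Parse([]string{"db1.*", "!db1.tmp"})
    if err != nil {
        panic(err)
    }
    fmt.Println(f.MatchSchema("db1"))       // true: some table under db1 can match
    fmt.Println(f.MatchTable("db1", "t"))   // true
    fmt.Println(f.MatchTable("db1", "tmp")) // false: excluded by the "!" rule
}
```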
6 changes: 3 additions & 3 deletions br/pkg/checkpoint/checkpoint_test.go
@@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
     exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
     require.False(t, exists)
     err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
-        Progress: checkpoint.InLogRestoreAndIdMapPersist,
+        Progress: checkpoint.InLogRestoreAndIdMapPersisted,
     })
     require.NoError(t, err)
     progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
     require.NoError(t, err)
-    require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
+    require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

     taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
     require.NoError(t, err)
@@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
     require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
     require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
     require.Equal(t, true, taskInfo.HasSnapshotMetadata)
-    require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
+    require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)

     exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
     require.False(t, exists)
29 changes: 16 additions & 13 deletions br/pkg/checkpoint/log_restore.go
@@ -120,13 +120,16 @@ func StartCheckpointLogRestoreRunnerForTest(
     return runner, nil
 }

+// Notice that the session is owned by the checkpoint runner, and it will also be closed by it.
 func StartCheckpointRunnerForLogRestore(
     ctx context.Context,
-    se glue.Session,
+    createSessionFn func() (glue.Session, error),
 ) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
+    session, err := createSessionFn()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
     runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
-        newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
+        newTableCheckpointStorage(session, LogRestoreCheckpointDatabaseName),
         nil, valueMarshalerForLogRestore)

     // for restore, no need to set lock
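With this signature change the runner creates and owns its session, so callers hand in a factory rather than a live session. A minimal caller sketch, assuming a typical BR call site where `g glue.Glue` and `store kv.Storage` are in scope (the surrounding function is hypothetical):

```go
// Sketch only; assumes g glue.Glue and store kv.Storage from the caller.
func startLogRestoreCheckpoint(ctx context.Context, g glue.Glue, store kv.Storage) error {
    runner, err := checkpoint.StartCheckpointRunnerForLogRestore(ctx, func() (glue.Session, error) {
        // the runner owns the returned session and closes it itself
        return g.CreateSession(store)
    })
    if err != nil {
        return errors.Trace(err)
    }
    _ = runner // flushing/stopping elided in this sketch
    return nil
}
```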
@@ -205,22 +208,22 @@ func ExistsLogRestoreCheckpointMetadata(
         TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
 }

-// A progress type for snapshot + log restore.
+// RestoreProgress is a progress type for snapshot + log restore.
 //
-// Before the id-maps is persist into external storage, the snapshot restore and
-// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
+// Before the id-maps is persisted into external storage, the snapshot restore and
+// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
 // it can retry from snapshot restore.
 //
-// After the id-maps is persist into external storage, there are some meta-kvs has
-// been restored into the cluster, such as `rename ddl`. Where would be a situation:
+// After the id-maps is persisted into external storage, some meta-kvs have already
+// been restored into the cluster, such as `rename ddl`. A situation could be:
 //
 // the first execution:
 //
 //    table A created in snapshot restore is renamed to table B in log restore
 //    table A (id 80) --------------> table B (id 80)
 //    ( snapshot restore )            ( log restore )
 //
-// the second execution if don't skip snasphot restore:
+// the second execution if snapshot restore is not skipped:
 //
 //    table A is created again in snapshot restore, because there is no table named A
 //    table A (id 81) --------------> [not in id-maps, so ignored]
@@ -232,8 +235,8 @@ type RestoreProgress int

 const (
     InSnapshotRestore RestoreProgress = iota
-    // Only when the id-maps is persist, status turns into it.
-    InLogRestoreAndIdMapPersist
+    // The status turns to this only when the id-maps is persisted.
+    InLogRestoreAndIdMapPersisted
 )

 type CheckpointProgress struct {
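These two states are what a resumed task consults to decide whether snapshot restore may be repeated. A sketch of that branching under the semantics described above (`canRepeatSnapshotRestore` and `execCtx` are assumptions for illustration; `LoadCheckpointProgress` is the loader exercised in the test changes above):

```go
// Sketch: deciding whether snapshot restore can be repeated on resume.
func canRepeatSnapshotRestore(ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor) (bool, error) {
    progress, err := checkpoint.LoadCheckpointProgress(ctx, execCtx)
    if err != nil {
        return false, errors.Trace(err)
    }
    switch progress.Progress {
    case checkpoint.InSnapshotRestore:
        // id-maps not persisted yet: snapshot restore and id-map building can be redone
        return true, nil
    case checkpoint.InLogRestoreAndIdMapPersisted:
        // meta-kvs such as the rename DDL above may already be replayed
        return false, nil
    }
    return false, nil
}
```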
@@ -265,8 +268,8 @@ func ExistsCheckpointProgress(
         TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
 }

-// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
-// restore task executed for this cluster.
+// CheckpointTaskInfoForLogRestore is tied to a specific cluster.
+// It represents the last restore task executed in this cluster.
 type CheckpointTaskInfoForLogRestore struct {
     Metadata            *CheckpointMetadataForLogRestore
     HasSnapshotMetadata bool
9 changes: 6 additions & 3 deletions br/pkg/checkpoint/restore.go
@@ -56,13 +56,16 @@ func StartCheckpointRestoreRunnerForTest(
     return runner, nil
 }

+// Notice that the session is owned by the checkpoint runner, and it will also be closed by it.
 func StartCheckpointRunnerForRestore(
     ctx context.Context,
-    se glue.Session,
+    createSessionFn func() (glue.Session, error),
 ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
+    session, err := createSessionFn()
+    if err != nil {
+        return nil, errors.Trace(err)
+    }
     runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
-        newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
+        newTableCheckpointStorage(session, SnapshotRestoreCheckpointDatabaseName),
         nil, valueMarshalerForRestore)

     // for restore, no need to set lock
2 changes: 1 addition & 1 deletion br/pkg/restore/import_mode_switcher.go
@@ -151,7 +151,7 @@ func (switcher *ImportModeSwitcher) SwitchToImportMode(
     }()
 }

-// RestorePreWork executes some prepare work before restore.
+// RestorePreWork switches to import mode and removes PD schedulers if needed.
 // TODO make this function returns a restore post work.
 func RestorePreWork(
     ctx context.Context,
4 changes: 3 additions & 1 deletion br/pkg/restore/log_client/BUILD.bazel
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "log_client",
     srcs = [
+        "batch_file_processor.go",
         "client.go",
         "compacted_file_strategy.go",
         "import.go",
@@ -36,6 +37,7 @@ go_library(
         "//br/pkg/stream",
         "//br/pkg/summary",
         "//br/pkg/utils",
+        "//br/pkg/utils/consts",
         "//br/pkg/utils/iter",
         "//br/pkg/version",
         "//pkg/ddl/util",
@@ -103,6 +105,7 @@ go_test(
         "//br/pkg/storage",
         "//br/pkg/stream",
         "//br/pkg/utils",
+        "//br/pkg/utils/consts",
         "//br/pkg/utils/iter",
         "//br/pkg/utiltest",
         "//pkg/domain",
@@ -117,7 +120,6 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/codec",
         "//pkg/util/sqlexec",
-        "//pkg/util/table-filter",
         "@com_github_docker_go_units//:go-units",
         "@com_github_pingcap_errors//:errors",
         "@com_github_pingcap_failpoint//:failpoint",
131 changes: 131 additions & 0 deletions br/pkg/restore/log_client/batch_file_processor.go
@@ -0,0 +1,131 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logclient

import (
    "context"
    "encoding/json"

    "github.com/pingcap/errors"
    backuppb "github.com/pingcap/kvproto/pkg/brpb"
    "github.com/pingcap/tidb/br/pkg/stream"
    "github.com/pingcap/tidb/br/pkg/utils"
    "github.com/pingcap/tidb/br/pkg/utils/consts"
    "github.com/pingcap/tidb/pkg/meta"
    "github.com/pingcap/tidb/pkg/meta/model"
)

// BatchFileProcessor defines how to process a batch of files.
type BatchFileProcessor interface {
    // ProcessBatch processes a batch of files against the given filterTS and
    // returns the entries that were not processed, to be carried into the
    // next iteration.
    ProcessBatch(
        ctx context.Context,
        files []*backuppb.DataFileInfo,
        entries []*KvEntryWithTS,
        filterTS uint64,
        cf string,
    ) ([]*KvEntryWithTS, error)
}
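The interface's contract is that entries not yet eligible at `filterTS` are returned and re-offered on the next call. A hypothetical driver loop illustrating that hand-off (`metaBatch`, `driveBatches`, and the `consts.DefaultCF` argument are assumptions for illustration, not code from this PR):

```go
// metaBatch is a hypothetical grouping of files that share one filter timestamp.
type metaBatch struct {
    files    []*backuppb.DataFileInfo
    filterTS uint64
}

func driveBatches(ctx context.Context, p BatchFileProcessor, batches []metaBatch) error {
    var pending []*KvEntryWithTS
    for _, b := range batches {
        // entries held back by the previous call roll into this one
        next, err := p.ProcessBatch(ctx, b.files, pending, b.filterTS, consts.DefaultCF)
        if err != nil {
            return errors.Trace(err)
        }
        pending = next
    }
    return nil
}
```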

// RestoreProcessor implements BatchFileProcessor for restoring files.
type RestoreProcessor struct {
    client         *LogClient
    schemasReplace *stream.SchemasReplace
    updateStats    func(kvCount uint64, size uint64)
    progressInc    func()
}

func (rp *RestoreProcessor) ProcessBatch(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    entries []*KvEntryWithTS,
    filterTS uint64,
    cf string,
) ([]*KvEntryWithTS, error) {
    return rp.client.RestoreBatchMetaKVFiles(
        ctx, files, rp.schemasReplace, entries,
        filterTS, rp.updateStats, rp.progressInc, cf,
    )
}

// DDLCollector implements BatchFileProcessor for collecting DDL information.
//  1. It collects table renaming information. A table rename does not change the
//     table id: the process drops the original table and creates a new one with
//     the same table id, so the DDL history will contain two events that
//     correspond to the same table id.
//
// add more logic in the future if needed
type DDLCollector struct {
    client          *LogClient
    tableRenameInfo *stream.LogBackupTableHistory
}

func (dc *DDLCollector) ProcessBatch(
    ctx context.Context,
    files []*backuppb.DataFileInfo,
    entries []*KvEntryWithTS,
    filterTS uint64,
    cf string,
) ([]*KvEntryWithTS, error) {
    // no need to parse the write CF, since it contains values like "p\XXXX\XXX"
    // that are meaningless here; the default CF value should contain everything
    // we need for DDL operations.
    if cf == consts.WriteCF {
        return nil, nil
    }

    curSortedEntries, filteredEntries, err := dc.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
    if err != nil {
        return nil, errors.Trace(err)
    }

    // process entries to collect table IDs
    for _, entry := range curSortedEntries {
        value := entry.E.Value

        if utils.IsMetaDBKey(entry.E.Key) {
            rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
            if err != nil {
                return nil, errors.Trace(err)
            }

            // collect the db id -> name mapping during log backup; it also
            // covers newly created dbs
            if meta.IsDBkey(rawKey.Field) {
                var dbInfo model.DBInfo
                if err := json.Unmarshal(value, &dbInfo); err != nil {
                    return nil, errors.Trace(err)
                }
                dc.tableRenameInfo.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
            } else if !meta.IsDBkey(rawKey.Key) {
                // also see RewriteMetaKvEntry
                continue
            }

            // collect table history indexed by table id; the same id may map
            // to different table names over time
            if meta.IsTableKey(rawKey.Field) {
                var tableInfo model.TableInfo
                if err := json.Unmarshal(value, &tableInfo); err != nil {
                    return nil, errors.Trace(err)
                }
                // cannot use the db id in the parsed table info because it
                // might not be set (it defaults to 0), so parse it from the
                // meta key instead
                dbID, err := meta.ParseDBKey(rawKey.Key)
                if err != nil {
                    return nil, errors.Trace(err)
                }

                dc.tableRenameInfo.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
            }
        }
    }
    return filteredEntries, nil
}
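To make the rename scenario from the `DDLCollector` comment concrete: a table keeps its id across a rename, so the history ends up holding two names for one id. A sketch with made-up values (`NewTableHistory` is a hypothetical constructor for `stream.LogBackupTableHistory`; only `RecordDBIdToName` and `AddTableHistory` appear in this PR):

```go
// Sketch with made-up values; NewTableHistory is a hypothetical constructor.
h := stream.NewTableHistory()
h.RecordDBIdToName(2, "testdb") // db id 2 observed as "testdb" during log backup
h.AddTableHistory(80, "A", 2)   // table id 80 first recorded as A in db 2
h.AddTableHistory(80, "B", 2)   // same id recorded as B after RENAME TABLE A TO B
```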