Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <[email protected]>
  • Loading branch information
Tristan1900 committed Nov 22, 2024
1 parent 49c3eba commit 8ef978b
Show file tree
Hide file tree
Showing 55 changed files with 1,805 additions and 871 deletions.
3 changes: 2 additions & 1 deletion br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ func BuildBackupSchemas(
if err != nil {
return errors.Trace(err)
}

log.Info("################ listing db", zap.Any("dbs", len(dbs)))
for _, dbInfo := range dbs {
// skip system databases
if !tableFilter.MatchSchema(dbInfo.Name.O) || util.IsMemDB(dbInfo.Name.L) || utils.IsTemplateSysDB(dbInfo.Name) {
Expand Down Expand Up @@ -929,6 +929,7 @@ func BuildBackupSchemas(
}

if !hasTable {
log.Info("################ doesn't have table", zap.Any("info", dbInfo))
fn(dbInfo, nil)
}
}
Expand Down
1 change: 1 addition & 0 deletions br/pkg/backup/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (ss *Schemas) BackupSchemas(
// because the field of `dbInfo` would be modified, which affects the later iteration.
// so copy the `dbInfo` for each to `newDBInfo`
newDBInfo := *dbInfo
log.Info("############### backup schema ###############", zap.Any("schema", dbInfo.Name.O))
schema := &schemaInfo{
tableInfo: tableInfo,
dbInfo: &newDBInfo,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func TestCheckpointMetaForRestore(t *testing.T) {
exists := checkpoint.ExistsCheckpointProgress(ctx, dom)
require.False(t, exists)
err = checkpoint.SaveCheckpointProgress(ctx, se, &checkpoint.CheckpointProgress{
Progress: checkpoint.InLogRestoreAndIdMapPersist,
Progress: checkpoint.InLogRestoreAndIdMapPersisted,
})
require.NoError(t, err)
progress, err := checkpoint.LoadCheckpointProgress(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, progress.Progress)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, progress.Progress)

taskInfo, err := checkpoint.TryToGetCheckpointTaskInfo(ctx, s.Mock.Domain, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
Expand All @@ -120,7 +120,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
require.Equal(t, uint64(333), taskInfo.Metadata.RewriteTS)
require.Equal(t, "1.0", taskInfo.Metadata.GcRatio)
require.Equal(t, true, taskInfo.HasSnapshotMetadata)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersist, taskInfo.Progress)
require.Equal(t, checkpoint.InLogRestoreAndIdMapPersisted, taskInfo.Progress)

exists = checkpoint.ExistsCheckpointIngestIndexRepairSQLs(ctx, dom)
require.False(t, exists)
Expand Down
30 changes: 17 additions & 13 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/util/sqlexec"
Expand Down Expand Up @@ -120,13 +121,17 @@ func StartCheckpointLogRestoreRunnerForTest(
return runner, nil
}

// Notice that the session is owned by the checkpoint runner, and it will be also closed by it.
func StartCheckpointRunnerForLogRestore(
ctx context.Context,
se glue.Session,
g glue.Glue,
store kv.Storage,
) (*CheckpointRunner[LogRestoreKeyType, LogRestoreValueType], error) {
session, err := g.CreateSession(store)
if err != nil {
return nil, errors.Trace(err)
}
runner := newCheckpointRunner[LogRestoreKeyType, LogRestoreValueType](
newTableCheckpointStorage(se, LogRestoreCheckpointDatabaseName),
newTableCheckpointStorage(session, LogRestoreCheckpointDatabaseName),
nil, valueMarshalerForLogRestore)

// for restore, no need to set lock
Expand Down Expand Up @@ -205,22 +210,22 @@ func ExistsLogRestoreCheckpointMetadata(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
}

// A progress type for snapshot + log restore.
// RestoreProgress is a progress type for snapshot + log restore.
//
// Before the id-maps is persist into external storage, the snapshot restore and
// id-maps constructure can be repeated. So if the progress is in `InSnapshotRestore`,
// Before the id-maps is persisted into external storage, the snapshot restore and
// id-maps building can be retried. So if the progress is in `InSnapshotRestore`,
// it can retry from snapshot restore.
//
// After the id-maps is persist into external storage, there are some meta-kvs has
// been restored into the cluster, such as `rename ddl`. Where would be a situation:
// After the id-maps is persisted into external storage, there are some meta-kvs has
// been restored into the cluster, such as `rename ddl`. A situation could be:
//
// the first execution:
//
// table A created in snapshot restore is renamed to table B in log restore
// table A (id 80) --------------> table B (id 80)
// ( snapshot restore ) ( log restore )
//
// the second execution if don't skip snasphot restore:
// the second execution if don't skip snapshot restore:
//
// table A is created again in snapshot restore, because there is no table named A
// table A (id 81) --------------> [not in id-maps, so ignored]
Expand All @@ -232,8 +237,8 @@ type RestoreProgress int

const (
InSnapshotRestore RestoreProgress = iota
// Only when the id-maps is persist, status turns into it.
InLogRestoreAndIdMapPersist
// Only when the id-maps is persisted, status turns into it.
InLogRestoreAndIdMapPersisted
)

type CheckpointProgress struct {
Expand Down Expand Up @@ -265,8 +270,7 @@ func ExistsCheckpointProgress(
TableExists(pmodel.NewCIStr(LogRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointProgressTableName))
}

// CheckpointTaskInfo is unique information within the same cluster id. It represents the last
// restore task executed for this cluster.
// CheckpointTaskInfoForLogRestore is tied to a specific cluster. It represents the last restore task executed this cluster.
type CheckpointTaskInfoForLogRestore struct {
Metadata *CheckpointMetadataForLogRestore
HasSnapshotMetadata bool
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/import_mode_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (switcher *ImportModeSwitcher) switchToImportMode(
}()
}

// RestorePreWork executes some prepare work before restore.
// RestorePreWork switches to import mode and removes pd schedulers if needed
// TODO make this function returns a restore post work.
func RestorePreWork(
ctx context.Context,
Expand Down
2 changes: 2 additions & 0 deletions br/pkg/restore/log_client/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "log_client",
srcs = [
"batch_file_processor.go",
"client.go",
"import.go",
"import_retry.go",
Expand Down Expand Up @@ -33,6 +34,7 @@ go_library(
"//br/pkg/stream",
"//br/pkg/summary",
"//br/pkg/utils",
"//br/pkg/utils/consts",
"//br/pkg/utils/iter",
"//br/pkg/version",
"//pkg/ddl/util",
Expand Down
134 changes: 134 additions & 0 deletions br/pkg/restore/log_client/batch_file_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package logclient

import (
"context"
"encoding/json"

"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/stream"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/br/pkg/utils/consts"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"go.uber.org/zap"
)

// BatchFileProcessor defines how to process a batch of files
type BatchFileProcessor interface {
// process a batch of files and with a filterTS and return what's not processed for next iteration
processBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error)
}

// RestoreProcessor implements BatchFileProcessor for restoring files
type RestoreProcessor struct {
client *LogClient
schemasReplace *stream.SchemasReplace
updateStats func(kvCount uint64, size uint64)
progressInc func()
}

func (rp *RestoreProcessor) processBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error) {
return rp.client.RestoreBatchMetaKVFiles(
ctx, files, rp.schemasReplace, entries,
filterTS, rp.updateStats, rp.progressInc, cf,
)
}

// DDLCollector implements BatchFileProcessor for collecting DDL information
// 1. It collects table renaming information. The table rename operation will not change the table id, and the process
// will drop the original table and create a new one with the same table id, so in DDL history there will be two events
// that corresponds to the same table id.
//
// add more logic in future if needed
type DDLCollector struct {
client *LogClient
tableRenameInfo *stream.LogBackupTableHistory
}

func (dc *DDLCollector) processBatch(
ctx context.Context,
files []*backuppb.DataFileInfo,
entries []*KvEntryWithTS,
filterTS uint64,
cf string,
) ([]*KvEntryWithTS, error) {
// doesn't need to parse writeCF as it contains value like "p\XXXX\XXX" which is meaningless.
// DefaultCF value should contain everything we want for DDL operation
if cf == consts.WriteCF {
return nil, nil
}

curSortedEntries, filteredEntries, err := dc.client.filterAndSortKvEntriesFromFiles(ctx, files, entries, filterTS)
if err != nil {
return nil, errors.Trace(err)
}

// process entries to collect table IDs
for _, entry := range curSortedEntries {
value := entry.E.Value

if utils.IsMetaDBKey(entry.E.Key) {
rawKey, err := stream.ParseTxnMetaKeyFrom(entry.E.Key)
if err != nil {
return nil, errors.Trace(err)
}

// collect db id -> name mapping during log backup, it will contain information about newly created db
if meta.IsDBkey(rawKey.Field) {
var dbInfo model.DBInfo
if err := json.Unmarshal(value, &dbInfo); err != nil {
return nil, errors.Trace(err)
}
dc.tableRenameInfo.RecordDBIdToName(dbInfo.ID, dbInfo.Name.O)
} else if !meta.IsDBkey(rawKey.Key) {
// also see RewriteMetaKvEntry
continue
}

// collect table history indexed by table id, same id may have different table names in history
if meta.IsTableKey(rawKey.Field) {
var tableInfo model.TableInfo
if err := json.Unmarshal(value, &tableInfo); err != nil {
return nil, errors.Trace(err)
}
// cannot use dbib in the parsed table info cuz it might not set so default to 0
dbID, err := meta.ParseDBKey(rawKey.Key)
if err != nil {
return nil, errors.Trace(err)
}

log.Info("######################################## adding table info", zap.Int64("tableid", tableInfo.ID), zap.String("table name", tableInfo.Name.O), zap.Int64("db id", dbID))
dc.tableRenameInfo.AddTableHistory(tableInfo.ID, tableInfo.Name.String(), dbID)
}
}
}
return filteredEntries, nil
}
Loading

0 comments on commit 8ef978b

Please sign in to comment.