Skip to content

Commit

Permalink
fix: Avoid creation of workflows with non-empty tables in target keys…
Browse files Browse the repository at this point in the history
…pace (#16874)

Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Oct 18, 2024
1 parent eadba7b commit b8ba593
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 19 deletions.
75 changes: 75 additions & 0 deletions go/vt/vtctl/workflow/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,11 @@ import (
"sync"
"time"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/ptr"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/concurrency"
Expand All @@ -45,6 +49,7 @@ import (
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -54,6 +59,7 @@ const (
// For automatically created sequence tables, use a standard format
// of tableName_seq.
autoSequenceTableFormat = "%s_seq"
getNonEmptyTableQuery = "select 1 from %s limit 1"
)

type materializer struct {
Expand Down Expand Up @@ -290,6 +296,15 @@ func (mz *materializer) deploySchema() error {
}
}

// Check if any table being moved is already non-empty in the target keyspace.
// Skip this check for multi-tenant migrations.
if !mz.IsMultiTenantMigration() {
err := mz.validateEmptyTables()
if err != nil {
return vterrors.Wrap(err, "failed to validate that all target tables are empty")
}
}

err := forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
allTables := []string{"/.*/"}

Expand Down Expand Up @@ -544,6 +559,66 @@ func (mz *materializer) buildMaterializer() error {
return nil
}

// validateEmptyTables checks if all tables are empty across all target shards.
// It queries each shard's primary tablet and if any non-empty table is found,
// returns an error containing a list of non-empty tables.
func (mz *materializer) validateEmptyTables() error {
var mu sync.Mutex
isNonEmptyTable := map[string]bool{}

err := forAllShards(mz.targetShards, func(shard *topo.ShardInfo) error {
primary := shard.PrimaryAlias
if primary == nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for shard %s/%s", shard.Keyspace(), shard.ShardName())
}

ti, err := mz.ts.GetTablet(mz.ctx, primary)
if err != nil {
return err
}

eg, groupCtx := errgroup.WithContext(mz.ctx)
eg.SetLimit(20)

for _, ts := range mz.ms.TableSettings {
eg.Go(func() error {
table, err := sqlescape.EnsureEscaped(ts.TargetTable)
if err != nil {
return err
}
query := fmt.Sprintf(getNonEmptyTableQuery, table)
res, err := mz.tmc.ExecuteFetchAsAllPrivs(groupCtx, ti.Tablet, &tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest{
Query: []byte(query),
MaxRows: 1,
})
// Ignore table not found error
if err != nil && !IsTableDidNotExistError(err) {
return err
}
if res != nil && len(res.Rows) > 0 {
mu.Lock()
isNonEmptyTable[ts.TargetTable] = true
mu.Unlock()
}
return nil
})
}
if err = eg.Wait(); err != nil {
return err
}
return nil
})
if err != nil {
return err
}

nonEmptyTables := maps.Keys(isNonEmptyTable)
if len(nonEmptyTables) > 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "non-empty tables found in target keyspace(%s): %s", mz.ms.TargetKeyspace, strings.Join(nonEmptyTables, ", "))
}
return nil
}

func (mz *materializer) startStreams(ctx context.Context) error {
return forAllShards(mz.targetShards, func(target *topo.ShardInfo) error {
targetPrimary, err := mz.ts.GetTablet(ctx, target.PrimaryAlias)
Expand Down
28 changes: 27 additions & 1 deletion go/vt/vtctl/workflow/materializer_env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -224,6 +225,7 @@ type testMaterializerTMClient struct {

mu sync.Mutex
vrQueries map[int][]*queryResult
fetchAsAllPrivsQueries map[int]map[string]*queryResult
createVReplicationWorkflowRequests map[uint32]*createVReplicationWorkflowRequestResponse

// Used to confirm the number of times WorkflowDelete was called.
Expand All @@ -243,6 +245,7 @@ func newTestMaterializerTMClient(keyspace string, sourceShards []string, tableSe
sourceShards: sourceShards,
tableSettings: tableSettings,
vrQueries: make(map[int][]*queryResult),
fetchAsAllPrivsQueries: make(map[int]map[string]*queryResult),
createVReplicationWorkflowRequests: make(map[uint32]*createVReplicationWorkflowRequestResponse),
getSchemaResponses: make(map[uint32]*tabletmanagerdatapb.SchemaDefinition),
}
Expand Down Expand Up @@ -370,6 +373,20 @@ func (tmc *testMaterializerTMClient) expectVRQuery(tabletID int, query string, r
})
}

func (tmc *testMaterializerTMClient) expectFetchAsAllPrivsQuery(tabletID int, query string, result *sqltypes.Result) {
tmc.mu.Lock()
defer tmc.mu.Unlock()

if tmc.fetchAsAllPrivsQueries[tabletID] == nil {
tmc.fetchAsAllPrivsQueries[tabletID] = make(map[string]*queryResult)
}

tmc.fetchAsAllPrivsQueries[tabletID][query] = &queryResult{
query: query,
result: sqltypes.ResultToProto3(result),
}
}

func (tmc *testMaterializerTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *createVReplicationWorkflowRequestResponse) {
tmc.mu.Lock()
defer tmc.mu.Unlock()
Expand Down Expand Up @@ -420,7 +437,16 @@ func (tmc *testMaterializerTMClient) ExecuteFetchAsDba(ctx context.Context, tabl
}

func (tmc *testMaterializerTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest) (*querypb.QueryResult, error) {
return nil, nil
tmc.mu.Lock()
defer tmc.mu.Unlock()

if resultsForTablet, ok := tmc.fetchAsAllPrivsQueries[int(tablet.Alias.Uid)]; ok {
if result, ok := resultsForTablet[string(req.Query)]; ok {
return result.result, result.err
}
}

return nil, fmt.Errorf("%w: no ExecuteFetchAsAllPrivs result set for tablet %d", assert.AnError, int(tablet.Alias.Uid))
}

// Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema.
Expand Down
Loading

0 comments on commit b8ba593

Please sign in to comment.