diff --git a/go/vt/vtctl/workflow/framework_test.go b/go/vt/vtctl/workflow/framework_test.go new file mode 100644 index 00000000000..81837ca9e12 --- /dev/null +++ b/go/vt/vtctl/workflow/framework_test.go @@ -0,0 +1,446 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "os" + "regexp" + "slices" + "strings" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/topotools" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + _flag "vitess.io/vitess/go/internal/flag" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +const ( + defaultCellName = "cell" + startingSourceTabletUID = 100 + startingTargetTabletUID = 200 + tabletUIDStep = 10 +) + +type testKeyspace struct { + KeyspaceName string + ShardNames []string +} + +type queryResult struct { + query string + result *querypb.QueryResult +} + +func TestMain(m *testing.M) { + _flag.ParseFlagsForTest() + os.Exit(m.Run()) +} + +type testEnv struct { + ws *Server + ts *topo.Server + tmc *testTMClient + sourceKeyspace, targetKeyspace *testKeyspace + // Keyed first by keyspace name, then tablet UID. + tablets map[string]map[int]*topodatapb.Tablet + cell string +} + +func newTestEnv(t *testing.T, ctx context.Context, cell string, sourceKeyspace, targetKeyspace *testKeyspace) *testEnv { + t.Helper() + env := &testEnv{ + ts: memorytopo.NewServer(ctx, cell), + sourceKeyspace: sourceKeyspace, + targetKeyspace: targetKeyspace, + tablets: make(map[string]map[int]*topodatapb.Tablet), + cell: cell, + } + env.tmc = newTestTMClient(env) + env.ws = NewServer(env.ts, env.tmc) + + serving := true + tabletID := startingSourceTabletUID + for _, shardName := range sourceKeyspace.ShardNames { + _ = env.addTablet(t, ctx, tabletID, sourceKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY, serving) + tabletID += tabletUIDStep + } + + isReshard := func() bool { + return sourceKeyspace.KeyspaceName == targetKeyspace.KeyspaceName && + !slices.Equal(sourceKeyspace.ShardNames, targetKeyspace.ShardNames) + } + + if isReshard() { + serving = false + } + tabletID = startingTargetTabletUID + for _, shardName := range targetKeyspace.ShardNames { + _ = env.addTablet(t, ctx, tabletID, targetKeyspace.KeyspaceName, shardName, topodatapb.TabletType_PRIMARY, serving) + tabletID += tabletUIDStep + } + + if isReshard() { + initSrvKeyspace(t, env.ts, targetKeyspace.KeyspaceName, sourceKeyspace.ShardNames, targetKeyspace.ShardNames, []string{cell}) + } + + err := env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + + return env +} + +func initSrvKeyspace(t *testing.T, topo *topo.Server, keyspace string, sources, targets, cells []string) { + ctx := context.Background() + srvKeyspace := &topodatapb.SrvKeyspace{ + Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{}, + } + getPartition := func(t *testing.T, shards []string) *topodatapb.SrvKeyspace_KeyspacePartition { + partition := &topodatapb.SrvKeyspace_KeyspacePartition{ + ServedType: topodatapb.TabletType_PRIMARY, + ShardReferences: []*topodatapb.ShardReference{}, + } + for _, shard := range shards { + keyRange, err := key.ParseShardingSpec(shard) + require.NoError(t, err) + require.Equal(t, 1, len(keyRange)) + partition.ShardReferences = append(partition.ShardReferences, &topodatapb.ShardReference{ + Name: shard, + KeyRange: keyRange[0], + }) + } + return partition + } + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, sources)) + srvKeyspace.Partitions = append(srvKeyspace.Partitions, getPartition(t, targets)) + for _, cell := range cells { + err := topo.UpdateSrvKeyspace(ctx, cell, keyspace, srvKeyspace) + require.NoError(t, err) + } + err := topo.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")) + require.NoError(t, err) +} + +func (env *testEnv) close() { + for _, k := range maps.Values(env.tablets) { + for _, t := range maps.Values(k) { + env.deleteTablet(t) + } + } +} + +func (env *testEnv) addTablet(t *testing.T, ctx context.Context, id int, keyspace, shard string, tabletType topodatapb.TabletType, serving bool) *topodatapb.Tablet { + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: env.cell, + Uid: uint32(id), + }, + Keyspace: keyspace, + Shard: shard, + KeyRange: &topodatapb.KeyRange{}, + Type: tabletType, + PortMap: map[string]int32{ + "test": int32(id), + }, + } + if env.tablets[keyspace] == nil { + env.tablets[keyspace] = make(map[int]*topodatapb.Tablet) + } + env.tablets[keyspace][id] = tablet + err := env.ws.ts.InitTablet(ctx, tablet, false /* allowPrimaryOverride */, true /* createShardAndKeyspace */, false /* allowUpdate */) + require.NoError(t, err) + if tabletType == topodatapb.TabletType_PRIMARY { + _, err = env.ws.ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = tablet.Alias + si.IsPrimaryServing = serving + return nil + }) + require.NoError(t, err) + } + return tablet +} + +// addTableRoutingRules adds routing rules from the test env's source keyspace to +// its target keyspace for the given tablet types and tables. +func (env *testEnv) addTableRoutingRules(t *testing.T, ctx context.Context, tabletTypes []topodatapb.TabletType, tables []string) { + ks := env.targetKeyspace.KeyspaceName + rules := make(map[string][]string, len(tables)*(len(tabletTypes)*3)) + for _, tabletType := range tabletTypes { + for _, tableName := range tables { + toTarget := []string{ks + "." + tableName} + tt := strings.ToLower(tabletType.String()) + if tabletType == topodatapb.TabletType_PRIMARY { + rules[tableName] = toTarget + rules[ks+"."+tableName] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName] = toTarget + } else { + rules[tableName+"@"+tt] = toTarget + rules[ks+"."+tableName+"@"+tt] = toTarget + rules[env.sourceKeyspace.KeyspaceName+"."+tableName+"@"+tt] = toTarget + } + } + } + err := topotools.SaveRoutingRules(ctx, env.ts, rules) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) +} + +func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) { + _ = env.ts.DeleteTablet(context.Background(), tablet.Alias) + delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid)) +} + +type testTMClient struct { + tmclient.TabletManagerClient + schema map[string]*tabletmanagerdatapb.SchemaDefinition + + mu sync.Mutex + vrQueries map[int][]*queryResult + createVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest + readVReplicationWorkflowRequests map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest + + env *testEnv // For access to the env config from tmc methods. + reverse atomic.Bool // Are we reversing traffic? +} + +func newTestTMClient(env *testEnv) *testTMClient { + return &testTMClient{ + schema: make(map[string]*tabletmanagerdatapb.SchemaDefinition), + vrQueries: make(map[int][]*queryResult), + createVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.CreateVReplicationWorkflowRequest), + readVReplicationWorkflowRequests: make(map[uint32]*tabletmanagerdatapb.ReadVReplicationWorkflowRequest), + env: env, + } +} + +func (tmc *testTMClient) CreateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if expect := tmc.createVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, req) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected CreateVReplicationWorkflow request: got %+v, want %+v", req, expect) + } + } + res := sqltypes.MakeTestResult(sqltypes.MakeTestFields("rowsaffected", "int64"), "1") + return &tabletmanagerdatapb.CreateVReplicationWorkflowResponse{Result: sqltypes.ResultToProto3(res)}, nil +} + +func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ReadVReplicationWorkflowRequest) (*tabletmanagerdatapb.ReadVReplicationWorkflowResponse, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + if expect := tmc.readVReplicationWorkflowRequests[tablet.Alias.Uid]; expect != nil { + if !proto.Equal(expect, req) { + return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected ReadVReplicationWorkflow request: got %+v, want %+v", req, expect) + } + } + workflowType := binlogdatapb.VReplicationWorkflowType_MoveTables + if strings.Contains(req.Workflow, "lookup") { + workflowType = binlogdatapb.VReplicationWorkflowType_CreateLookupIndex + } + res := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse{ + Workflow: req.Workflow, + WorkflowType: workflowType, + Streams: make([]*tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream, 0, 2), + } + rules := make([]*binlogdatapb.Rule, len(tmc.schema)) + for i, table := range maps.Keys(tmc.schema) { + rules[i] = &binlogdatapb.Rule{ + Match: table, + Filter: fmt.Sprintf("select * from %s", table), + } + } + blsKs := tmc.env.sourceKeyspace + if tmc.reverse.Load() && tablet.Keyspace == tmc.env.sourceKeyspace.KeyspaceName { + blsKs = tmc.env.targetKeyspace + } + for i, shard := range blsKs.ShardNames { + stream := &tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream{ + Id: int32(i + 1), + Bls: &binlogdatapb.BinlogSource{ + Keyspace: blsKs.KeyspaceName, + Shard: shard, + Tables: maps.Keys(tmc.schema), + Filter: &binlogdatapb.Filter{ + Rules: rules, + }, + }, + } + res.Streams = append(res.Streams, stream) + } + + return res, nil +} + +func (tmc *testTMClient) DeleteVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.DeleteVReplicationWorkflowRequest) (response *tabletmanagerdatapb.DeleteVReplicationWorkflowResponse, err error) { + return &tabletmanagerdatapb.DeleteVReplicationWorkflowResponse{ + Result: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) GetSchema(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.GetSchemaRequest) (*tabletmanagerdatapb.SchemaDefinition, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + schemaDefn := &tabletmanagerdatapb.SchemaDefinition{} + for _, table := range req.Tables { + if table == "/.*/" { + // Special case of all tables in keyspace. + for key, tableDefn := range tmc.schema { + if strings.HasPrefix(key, tablet.Keyspace+".") { + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + } + break + } + + key := tablet.Keyspace + "." + table + tableDefn := tmc.schema[key] + if tableDefn == nil { + continue + } + schemaDefn.TableDefinitions = append(schemaDefn.TableDefinitions, tableDefn.TableDefinitions...) + } + return schemaDefn, nil +} + +func (tmc *testTMClient) expectVRQuery(tabletID int, query string, result *sqltypes.Result) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.vrQueries[tabletID] = append(tmc.vrQueries[tabletID], &queryResult{ + query: query, + result: sqltypes.ResultToProto3(result), + }) +} + +func (tmc *testTMClient) expectVRQueryResultOnKeyspaceTablets(keyspace string, queryResult *queryResult) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + for uid := range tmc.env.tablets[keyspace] { + tmc.vrQueries[uid] = append(tmc.vrQueries[uid], queryResult) + } +} + +func (tmc *testTMClient) expectCreateVReplicationWorkflowRequest(tabletID uint32, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + tmc.createVReplicationWorkflowRequests[tabletID] = req +} + +func (tmc *testTMClient) VReplicationExec(ctx context.Context, tablet *topodatapb.Tablet, query string) (*querypb.QueryResult, error) { + tmc.mu.Lock() + defer tmc.mu.Unlock() + + qrs := tmc.vrQueries[int(tablet.Alias.Uid)] + if len(qrs) == 0 { + return nil, fmt.Errorf("tablet %v does not expect any more queries: %s", tablet, query) + } + matched := false + if qrs[0].query[0] == '/' { + matched = regexp.MustCompile(qrs[0].query[1:]).MatchString(query) + } else { + matched = query == qrs[0].query + } + if !matched { + return nil, fmt.Errorf("tablet %v:\nunexpected query\n%s\nwant:\n%s", tablet, query, qrs[0].query) + } + tmc.vrQueries[int(tablet.Alias.Uid)] = qrs[1:] + return qrs[0].result, nil +} + +func (tmc *testTMClient) ExecuteFetchAsDba(ctx context.Context, tablet *topodatapb.Tablet, usePool bool, req *tabletmanagerdatapb.ExecuteFetchAsDbaRequest) (*querypb.QueryResult, error) { + // Reuse VReplicationExec. + return tmc.VReplicationExec(ctx, tablet, string(req.Query)) +} + +func (tmc *testTMClient) ExecuteFetchAsAllPrivs(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.ExecuteFetchAsAllPrivsRequest) (*querypb.QueryResult, error) { + return nil, nil +} + +// Note: ONLY breaks up change.SQL into individual statements and executes it. Does NOT fully implement ApplySchema. +func (tmc *testTMClient) ApplySchema(ctx context.Context, tablet *topodatapb.Tablet, change *tmutils.SchemaChange) (*tabletmanagerdatapb.SchemaChangeResult, error) { + stmts := strings.Split(change.SQL, ";") + + for _, stmt := range stmts { + _, err := tmc.ExecuteFetchAsDba(ctx, tablet, false, &tabletmanagerdatapb.ExecuteFetchAsDbaRequest{ + Query: []byte(stmt), + MaxRows: 0, + ReloadSchema: true, + }) + if err != nil { + return nil, err + } + } + + return nil, nil +} + +func (tmc *testTMClient) VDiff(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { + return &tabletmanagerdatapb.VDiffResponse{ + Id: 1, + VdiffUuid: req.VdiffUuid, + Output: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) UpdateVReplicationWorkflow(ctx context.Context, tablet *topodatapb.Tablet, req *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest) (*tabletmanagerdatapb.UpdateVReplicationWorkflowResponse, error) { + return &tabletmanagerdatapb.UpdateVReplicationWorkflowResponse{ + Result: &querypb.QueryResult{ + RowsAffected: 1, + }, + }, nil +} + +func (tmc *testTMClient) PrimaryPosition(ctx context.Context, tablet *topodatapb.Tablet) (string, error) { + return position, nil +} + +func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb.Tablet, pos string) error { + return nil +} + +func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error { + return nil +} diff --git a/go/vt/vtctl/workflow/materializer_env_test.go b/go/vt/vtctl/workflow/materializer_env_test.go index f1ddf6be645..48ad6e548ea 100644 --- a/go/vt/vtctl/workflow/materializer_env_test.go +++ b/go/vt/vtctl/workflow/materializer_env_test.go @@ -19,14 +19,12 @@ package workflow import ( "context" "fmt" - "os" "regexp" "strconv" "strings" "sync" "testing" - _flag "vitess.io/vitess/go/internal/flag" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" @@ -41,11 +39,6 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) -type queryResult struct { - query string - result *querypb.QueryResult -} - type testMaterializerEnv struct { ws *Server ms *vtctldatapb.MaterializeSettings @@ -62,11 +55,6 @@ type testMaterializerEnv struct { //---------------------------------------------- // testMaterializerEnv -func TestMain(m *testing.M) { - _flag.ParseFlagsForTest() - os.Exit(m.Run()) -} - func newTestMaterializerEnv(t *testing.T, ctx context.Context, ms *vtctldatapb.MaterializeSettings, sources, targets []string) *testMaterializerEnv { t.Helper() env := &testMaterializerEnv{ diff --git a/go/vt/vtctl/workflow/materializer_test.go b/go/vt/vtctl/workflow/materializer_test.go index 39aabcf45ad..df3d496a17b 100644 --- a/go/vt/vtctl/workflow/materializer_test.go +++ b/go/vt/vtctl/workflow/materializer_test.go @@ -39,6 +39,7 @@ import ( vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) +const position = "9d10e6ec-07a0-11ee-ae73-8e53f4cf3083:1-97" const getWorkflowQuery = "select id from _vt.vreplication where db_name='vt_targetks' and workflow='workflow'" const mzUpdateQuery = "update _vt.vreplication set state='Running' where db_name='vt_targetks' and workflow='workflow'" const mzSelectFrozenQuery = "select 1 from _vt.vreplication where db_name='vt_targetks' and message='FROZEN' and workflow_sub_type != 1" @@ -47,7 +48,6 @@ const mzGetWorkflowStatusQuery = "select id, workflow, source, pos, stop_pos, ma const mzGetCopyState = "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1" const mzGetLatestCopyState = "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)" const insertPrefix = `/insert into _vt.vreplication\(workflow, source, pos, max_tps, max_replication_lag, cell, tablet_types, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys\) values ` -const eol = "$" var ( defaultOnDDL = binlogdatapb.OnDDLAction_IGNORE.String() diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index 161b1c4567d..8cdc85392dd 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -97,6 +97,9 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string, if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if si.PrimaryAlias == nil { + return nil, fmt.Errorf("target shard %v has no primary tablet", shard) + } if si.IsPrimaryServing { return nil, fmt.Errorf("target shard %v is in serving state", shard) } diff --git a/go/vt/vtctl/workflow/resharder_test.go b/go/vt/vtctl/workflow/resharder_test.go new file mode 100644 index 00000000000..a7a73d23ba9 --- /dev/null +++ b/go/vt/vtctl/workflow/resharder_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workflow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" +) + +const eol = "$" + +func TestReshardCreate(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "targetks" + targetKeyspaceName := "targetks" + tabletTypes := []topodatapb.TabletType{ + topodatapb.TabletType_PRIMARY, + topodatapb.TabletType_REPLICA, + topodatapb.TabletType_RDONLY, + } + tabletTypesStr := topoproto.MakeStringTypeCSV(tabletTypes) + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + var binlogSource = &binlogdatapb.BinlogSource{ + Keyspace: sourceKeyspaceName, + Shard: "0", + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + }, + } + + testcases := []struct { + name string + sourceKeyspace, targetKeyspace *testKeyspace + preFunc func(env *testEnv) + want *vtctldatapb.WorkflowStatusResponse + wantErr string + }{ + { + name: "basic", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + want: &vtctldatapb.WorkflowStatusResponse{ + ShardStreams: map[string]*vtctldatapb.WorkflowStatusResponse_ShardStreams{ + "targetks/-80": { + Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{ + { + Id: 1, + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID}, + SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + }, + }, + }, + "targetks/80-": { + Streams: []*vtctldatapb.WorkflowStatusResponse_ShardStreamState{ + { + Id: 1, + Tablet: &topodatapb.TabletAlias{Cell: defaultCellName, Uid: startingTargetTabletUID + tabletUIDStep}, + SourceShard: "targetks/0", Position: position, Status: "Running", Info: "VStream Lag: 0s", + }, + }, + }, + }, + TrafficState: "Reads Not Switched. Writes Not Switched", + }, + }, + { + name: "no primary", + sourceKeyspace: &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + }, + targetKeyspace: &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"-80", "80-"}, + }, + preFunc: func(env *testEnv) { + _, err := env.ts.UpdateShardFields(ctx, targetKeyspaceName, "-80", func(si *topo.ShardInfo) error { + si.PrimaryAlias = nil + return nil + }) + require.NoError(t, err) + }, + wantErr: "buildResharder: target shard -80 has no primary tablet", + }, + } + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + require.NotNil(t, tc.sourceKeyspace) + require.NotNil(t, tc.targetKeyspace) + + env := newTestEnv(t, ctx, defaultCellName, tc.sourceKeyspace, tc.targetKeyspace) + defer env.close() + env.tmc.schema = schema + + req := &vtctldatapb.ReshardCreateRequest{ + Keyspace: targetKeyspaceName, + Workflow: workflowName, + TabletTypes: tabletTypes, + SourceShards: tc.sourceKeyspace.ShardNames, + TargetShards: tc.targetKeyspace.ShardNames, + Cells: []string{env.cell}, + } + + for i := range tc.sourceKeyspace.ShardNames { + tabletUID := startingSourceTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select workflow, source, cell, tablet_types from _vt.vreplication where db_name='vt_%s' and message != 'FROZEN'", targetKeyspaceName), + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'", + workflowName, targetKeyspaceName), + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + &sqltypes.Result{}, + ) + } + + for i, target := range tc.targetKeyspace.ShardNames { + tabletUID := startingTargetTabletUID + (tabletUIDStep * i) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s'", targetKeyspaceName), + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + insertPrefix+ + `\('`+workflowName+`', 'keyspace:\\"`+targetKeyspaceName+`\\" shard:\\"0\\" filter:{rules:{match:\\"/.*\\" filter:\\"`+target+`\\"}}', '', [0-9]*, [0-9]*, '`+ + env.cell+`', '`+tabletTypesStr+`', [0-9]*, 0, 'Stopped', 'vt_`+targetKeyspaceName+`', 4, 0, false\)`+eol, + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = 1", + &sqltypes.Result{}, + ) + env.tmc.expectVRQuery( + tabletUID, + fmt.Sprintf("select id, workflow, source, pos, stop_pos, max_replication_lag, state, db_name, time_updated, transaction_timestamp, message, tags, workflow_type, workflow_sub_type, time_heartbeat, defer_secondary_keys, component_throttled, time_throttled, rows_copied from _vt.vreplication where workflow = '%s' and db_name = 'vt_%s'", + workflowName, targetKeyspaceName), + sqltypes.MakeTestResult( + sqltypes.MakeTestFields( + "id|workflow|source|pos|stop_pos|max_replication_log|state|db_name|time_updated|transaction_timestamp|message|tags|workflow_type|workflow_sub_type|time_heartbeat|defer_secondary_keys|component_throttled|time_throttled|rows_copied", + "int64|varchar|blob|varchar|varchar|int64|varchar|varchar|int64|int64|varchar|varchar|int64|int64|int64|int64|varchar|int64|int64", + ), + fmt.Sprintf("1|%s|%s|MySQL56/%s|NULL|0|Running|vt_%s|1686577659|0|||1|0|0|0||0|10", workflowName, binlogSource, position, sourceKeyspaceName), + ), + ) + env.tmc.expectVRQuery( + tabletUID, + "select table_name, lastpk from _vt.copy_state where vrepl_id = 1 and id in (select max(id) from _vt.copy_state where vrepl_id = 1 group by vrepl_id, table_name)", + &sqltypes.Result{}, + ) + } + + if tc.preFunc != nil { + tc.preFunc(env) + } + + res, err := env.ws.ReshardCreate(ctx, req) + if tc.wantErr != "" { + require.EqualError(t, err, tc.wantErr) + return + } + require.NoError(t, err) + if tc.want != nil { + require.Equal(t, tc.want, res) + } + }) + } +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index cd7c3481eea..43d1f1a2b05 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1578,7 +1578,7 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea if err := s.ts.ValidateSrvKeyspace(ctx, keyspace, strings.Join(cells, ",")); err != nil { err2 := vterrors.Wrapf(err, "SrvKeyspace for keyspace %s is corrupt for cell(s) %s", keyspace, cells) - log.Errorf("%w", err2) + log.Errorf("%v", err2) return nil, err } tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes)