From 69ed3c90aca2788260343c04269362b5fbd5a3d1 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Fri, 29 Mar 2024 09:58:01 -0700 Subject: [PATCH] [release-18.0] VReplication: Move the Reshard v2 workflow to vtctldclient (#15579) (#15583) Signed-off-by: Matt Lord Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Matt Lord --- .../vreplication/common/switchtraffic.go | 1 + go/streamlog/streamlog_flaky_test.go | 8 +- go/test/endtoend/cluster/vttablet_process.go | 1 + go/test/endtoend/vreplication/helper_test.go | 13 +- .../vreplication/movetables_buffering_test.go | 3 +- .../partial_movetables_seq_test.go | 9 +- .../vreplication/partial_movetables_test.go | 8 +- .../resharding_workflows_v2_test.go | 131 ++++++++++-------- go/vt/vtctl/workflow/server.go | 30 ++-- go/vt/vtctl/workflow/traffic_switcher.go | 44 +++--- .../tabletmanager/rpc_vreplication.go | 2 +- test/config.json | 17 ++- 12 files changed, 151 insertions(+), 116 deletions(-) diff --git a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go index 019367fe82b..4004afc0ac0 100644 --- a/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go +++ b/go/cmd/vtctldclient/command/vreplication/common/switchtraffic.go @@ -91,6 +91,7 @@ func commandSwitchTraffic(cmd *cobra.Command, args []string) error { req := &vtctldatapb.WorkflowSwitchTrafficRequest{ Keyspace: BaseOptions.TargetKeyspace, Workflow: BaseOptions.Workflow, + Cells: SwitchTrafficOptions.Cells, TabletTypes: SwitchTrafficOptions.TabletTypes, MaxReplicationLagAllowed: protoutil.DurationToProto(SwitchTrafficOptions.MaxReplicationLagAllowed), Timeout: protoutil.DurationToProto(SwitchTrafficOptions.Timeout), diff --git a/go/streamlog/streamlog_flaky_test.go b/go/streamlog/streamlog_flaky_test.go index 9c0b0366a1d..17d3c4148e2 100644 --- a/go/streamlog/streamlog_flaky_test.go +++ b/go/streamlog/streamlog_flaky_test.go @@ -213,7 +213,7 @@ func TestFile(t *testing.T) { logger.Send(&logMessage{"test 2"}) // Allow time for propagation - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want := "test 1\ntest 2\n" contents, _ := os.ReadFile(logPath) @@ -227,7 +227,7 @@ func TestFile(t *testing.T) { os.Rename(logPath, rotatedPath) logger.Send(&logMessage{"test 3"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath) @@ -241,10 +241,10 @@ func TestFile(t *testing.T) { if err := syscall.Kill(syscall.Getpid(), syscall.SIGUSR2); err != nil { t.Logf("failed to send streamlog rotate signal: %v", err) } - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) logger.Send(&logMessage{"test 4"}) - time.Sleep(10 * time.Millisecond) + time.Sleep(100 * time.Millisecond) want = "test 1\ntest 2\ntest 3\n" contents, _ = os.ReadFile(rotatedPath) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index bc90a2d4ae2..70bfce452d4 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -670,6 +670,7 @@ func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspac Binary: "vttablet", FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)), Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)), + Cell: cell, TabletPath: fmt.Sprintf("%s-%010d", cell, tabletUID), ServiceMap: "grpc-queryservice,grpc-tabletmanager,grpc-updatestream,grpc-throttler", LogDir: tmpDirectory, diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 839ffdfa306..b109cc0e996 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -45,9 +45,11 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) const ( @@ -318,6 +320,9 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa } } } + if wantState == binlogdatapb.VReplicationWorkflowState_Running.String() && attributeValue.Get("Pos").String() == "" { + done = false + } } else { done = false } @@ -351,7 +356,7 @@ func waitForWorkflowState(t *testing.T, vc *VitessCluster, ksWorkflow string, wa // as a CSV have secondary keys. This is useful when testing the // --defer-secondary-keys flag to confirm that the secondary keys // were re-added by the time the workflow hits the running phase. -// For a Reshard workflow, where no tables are specififed, pass +// For a Reshard workflow, where no tables are specified, pass // an empty string for the tables and all tables in the target // keyspace will be checked. func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletProcess, ksName string, tables string) { @@ -371,6 +376,12 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } } for _, tablet := range tablets { + // Be sure that the schema is up to date. + err := vc.VtctldClient.ExecuteCommand("ReloadSchema", topoproto.TabletAliasString(&topodatapb.TabletAlias{ + Cell: tablet.Cell, + Uid: uint32(tablet.TabletUID), + })) + require.NoError(t, err) for _, table := range tableArr { if schema.IsInternalOperationTableName(table) { continue diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 4e4b7cada97..e81e507f9f4 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" ) func TestMoveTablesBuffering(t *testing.T) { @@ -17,7 +16,7 @@ func TestMoveTablesBuffering(t *testing.T) { defer vtgateConn.Close() defer vc.TearDown(t) - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables setupMinimalCustomerKeyspace(t) tables := "loadtest" err := tstWorkflowExec(t, defaultCellName, workflowName, sourceKs, targetKs, diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index 6a1ed92cb9c..5d4a2c2d696 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) @@ -236,12 +235,12 @@ func (wf *workflow) create() { cell := wf.tc.defaultCellName switch typ { case "movetables": - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables sourceShards := strings.Join(wf.options.sourceShards, ",") err = tstWorkflowExec(t, cell, wf.name, wf.fromKeyspace, wf.toKeyspace, strings.Join(wf.options.tables, ","), workflowActionCreate, "", sourceShards, "", false) case "reshard": - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard sourceShards := strings.Join(wf.options.sourceShards, ",") targetShards := strings.Join(wf.options.targetShards, ",") if targetShards == "" { @@ -395,7 +394,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { // Switch all traffic for the shard wf80Dash.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) currentCustomerCount = getCustomerCount(t, "") @@ -455,7 +454,7 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { wfDash80.create() wfDash80.switchTraffic() - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 5583232fbdc..9719474afef 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -27,7 +27,6 @@ import ( "github.com/tidwall/gjson" "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/wrangler" ) // testCancel() starts and cancels a partial MoveTables for one of the shards which will be actually moved later on. @@ -103,6 +102,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { defer vtgateConn.Close() defer vc.TearDown(t) setupMinimalCustomerKeyspace(t) + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables // Move customer table from unsharded product keyspace to // sharded customer keyspace. @@ -132,7 +132,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { testCancel(t) - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables wfName := "partial80Dash" sourceKs := "customer" targetKs := "customer2" @@ -215,7 +215,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Switch all traffic for the shard require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false)) - expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", + expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", targetKs, wfName, shard, shard) require.Equal(t, expectedSwitchOutput, lastOutput) @@ -293,7 +293,7 @@ func TestPartialMoveTablesBasic(t *testing.T) { // Switch all traffic for the shard require.NoError(t, tstWorkflowExec(t, "", wfName, "", targetKs, "", workflowActionSwitchTraffic, "", "", "", false)) - expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", + expectedSwitchOutput = fmt.Sprintf("SwitchTraffic was successful for workflow %s.%s\n\nStart State: Reads partially switched, for shards: 80-. Writes partially switched, for shards: 80-\nCurrent State: All Reads Switched. All Writes Switched\n\n", targetKs, wfName) require.Equal(t, expectedSwitchOutput, lastOutput) diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 338310fdf14..93f43e9d1b6 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -17,6 +17,7 @@ limitations under the License. package vreplication import ( + "encoding/json" "fmt" "net" "strconv" @@ -58,7 +59,7 @@ var ( sourceTab, sourceReplicaTab, sourceRdonlyTab *cluster.VttabletProcess lastOutput string - currentWorkflowType wrangler.VReplicationWorkflowType + currentWorkflowType binlogdatapb.VReplicationWorkflowType ) func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) error { @@ -93,54 +94,55 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, atomicCopy bool) error { var args []string - if currentWorkflowType == wrangler.MoveTablesWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { args = append(args, "MoveTables") } else { args = append(args, "Reshard") } - args = append(args, "--") + args = append(args, "--workflow", workflow, "--target-keyspace", targetKs, action) - if BypassLagCheck { - args = append(args, "--max_replication_lag_allowed=2542087h") - } - if atomicCopy { - args = append(args, "--atomic-copy") - } switch action { case workflowActionCreate: - if currentWorkflowType == wrangler.MoveTablesWorkflow { - args = append(args, "--source", sourceKs) + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { + args = append(args, "--source-keyspace", sourceKs) if tables != "" { args = append(args, "--tables", tables) } else { - args = append(args, "--all") + args = append(args, "--all-tables") } if sourceShards != "" { - args = append(args, "--source_shards", sourceShards) + args = append(args, "--source-shards", sourceShards) } } else { - args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) + args = append(args, "--source-shards", sourceShards, "--target-shards", targetShards) } // Test new experimental --defer-secondary-keys flag switch currentWorkflowType { - case wrangler.MoveTablesWorkflow, wrangler.MigrateWorkflow, wrangler.ReshardWorkflow: - if !atomicCopy { - args = append(args, "--defer-secondary-keys") - } - args = append(args, "--initialize-target-sequences") // Only used for MoveTables + case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: + args = append(args, "--defer-secondary-keys") + } + } + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && action == workflowActionSwitchTraffic { + args = append(args, "--initialize-target-sequences") + } + if action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic { + if BypassLagCheck { + args = append(args, "--max-replication-lag-allowed=2542087h") } + args = append(args, "--timeout=90s") } - if cells != "" { + if (action == workflowActionCreate || action == workflowActionSwitchTraffic || action == workflowActionReverseTraffic) && cells != "" { args = append(args, "--cells", cells) } - if tabletTypes != "" { - args = append(args, "--tablet_types", tabletTypes) + if action != workflowActionComplete && tabletTypes != "" { + args = append(args, "--tablet-types", tabletTypes) } - args = append(args, "--timeout", time.Minute.String()) - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - args = append(args, action, ksWorkflow) - output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) + args = append(args, "--action_timeout=10m") // At this point something is up so fail the test + if debugMode { + t.Logf("Executing workflow command: vtctldclient %v", strings.Join(args, " ")) + } + output, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) lastOutput = output if err != nil { return fmt.Errorf("%s: %s", err, output) @@ -263,8 +265,8 @@ func checkStates(t *testing.T, startState, endState string) { require.Contains(t, lastOutput, fmt.Sprintf("Current State: %s", endState)) } -func getCurrentState(t *testing.T) string { - if err := tstWorkflowAction(t, "GetState", "", ""); err != nil { +func getCurrentStatus(t *testing.T) string { + if err := tstWorkflowAction(t, "status", "", ""); err != nil { return err.Error() } return strings.TrimSpace(strings.Trim(lastOutput, "\n")) @@ -315,7 +317,7 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { // at this point the unsharded product and sharded customer keyspaces are created by previous tests // use MoveTables to move customer2 from product to customer using - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables err := tstWorkflowExec(t, defaultCellName, "wf2", sourceKs, targetKs, "customer2", workflowActionCreate, "", "", "", false) require.NoError(t, err) @@ -331,13 +333,13 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { require.NoError(t, err) // sanity check - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.NotContains(t, output, "customer2\"", "customer2 still found in keyspace product") waitForRowCount(t, vtgateConn, "customer", "customer2", 3) // check that customer2 has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "customer") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 sequence missing in keyspace customer") @@ -365,12 +367,12 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { require.NoError(t, err) // sanity check - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "customer2\"", "customer2 not found in keyspace product ") // check that customer2 still has the sequence tag - output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetVSchema", "product") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("GetVSchema", "product") require.NoError(t, err) assert.Contains(t, output, "\"sequence\": \"customer_seq2\"", "customer2 still found in keyspace product") @@ -406,7 +408,7 @@ func testReplicatingWithPKEnumCols(t *testing.T) { } func testReshardV2Workflow(t *testing.T) { - currentWorkflowType = wrangler.ReshardWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard // create internal tables on the original customer shards that should be // ignored and not show up on the new shards @@ -415,9 +417,6 @@ func testReshardV2Workflow(t *testing.T) { createAdditionalCustomerShards(t, "-40,40-80,80-c0,c0-") createReshardWorkflow(t, "-80,80-", "-40,40-80,80-c0,c0-") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -433,16 +432,14 @@ func testReshardV2Workflow(t *testing.T) { } func testMoveTablesV2Workflow(t *testing.T) { - currentWorkflowType = wrangler.MoveTablesWorkflow + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables // test basic forward and reverse flows setupCustomerKeyspace(t) // The purge table should get skipped/ignored // If it's not then we'll get an error as the table doesn't exist in the vschema createMoveTablesWorkflow(t, "customer,loadtest,vdiff_order,reftable,_vt_PURGE_4f9194b43b2011eb8a0104ed332e05c2_20221210194431") - if !strings.Contains(lastOutput, "Workflow started successfully") { - t.Fail() - } + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) validateReadsRouteToSource(t, "replica") validateWritesRouteToSource(t) @@ -457,26 +454,47 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) - listAllArgs := []string{"workflow", "customer", "listall"} - output, _ := vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + listOutputContainsWorkflow := func(output string, workflow string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + for _, w := range workflows { + if w == workflow { + return true + } + } + return false + } + listOutputIsEmpty := func(output string) bool { + workflows := []string{} + err := json.Unmarshal([]byte(output), &workflows) + require.NoError(t, err) + return len(workflows) == 0 + } + + listAllArgs := []string{"workflow", "--keyspace", "customer", "list"} + output, err := vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputIsEmpty(output)) testVSchemaForSequenceAfterMoveTables(t) createMoveTablesWorkflow(t, "Lead,Lead-1") - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "Following workflow(s) found in keyspace customer: wf1") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputContainsWorkflow(output, "wf1")) - err := tstWorkflowCancel(t) + err = tstWorkflowCancel(t) require.NoError(t, err) - output, _ = vc.VtctlClient.ExecuteCommandWithOutput(listAllArgs...) - require.Contains(t, output, "No workflows found in keyspace customer") + output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) + require.NoError(t, err) + require.True(t, listOutputIsEmpty(output)) } func testPartialSwitches(t *testing.T) { // nothing switched - require.Equal(t, getCurrentState(t), wrangler.WorkflowStateNotSwitched) + require.Contains(t, getCurrentStatus(t), wrangler.WorkflowStateNotSwitched) tstWorkflowSwitchReads(t, "replica,rdonly", "zone1") nextState := "Reads partially switched. Replica switched in cells: zone1. Rdonly switched in cells: zone1. Writes Not Switched" checkStates(t, wrangler.WorkflowStateNotSwitched, nextState) @@ -498,7 +516,7 @@ func testPartialSwitches(t *testing.T) { checkStates(t, nextState, nextState) // idempotency keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -535,7 +553,7 @@ func testRestOfWorkflow(t *testing.T) { // this function is called for both MoveTables and Reshard, so the reverse workflows exist in different keyspaces keyspace := "product" - if currentWorkflowType == wrangler.ReshardWorkflow { + if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_Reshard { keyspace = "customer" } waitForLowLag(t, keyspace, "wf1_reverse") @@ -712,8 +730,11 @@ func switchReadsNew(t *testing.T, workflowType, cells, ksWorkflow string, revers if reverse { command = "ReverseTraffic" } - output, err := vc.VtctlClient.ExecuteCommandWithOutput(workflowType, "--", "--cells="+cells, - "--tablet_types=rdonly,replica", command, ksWorkflow) + parts := strings.Split(ksWorkflow, ".") + require.Len(t, parts, 2) + ks, wf := parts[0], parts[1] + output, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", wf, "--target-keyspace", ks, command, + "--cells", cells, "--tablet-types=rdonly,replica") require.NoError(t, err, fmt.Sprintf("SwitchReads Error: %s: %s", err, output)) if output != "" { fmt.Printf("SwitchReads output: %s\n", output) @@ -836,7 +857,7 @@ func createAdditionalCustomerShards(t *testing.T, shards string) { } func tstApplySchemaOnlineDDL(t *testing.T, sql string, keyspace string) { - err := vc.VtctlClient.ExecuteCommand("ApplySchema", "--", "--ddl_strategy=online", + err := vc.VtctldClient.ExecuteCommand("ApplySchema", "--ddl-strategy=online", "--sql", sql, keyspace) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f4e52c14a81..cd7c3481eea 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2966,13 +2966,16 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } } + cellsStr := strings.Join(req.Cells, ",") + // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (*[]string, error) { - ts.Logger().Error(err) - return nil, err + werr := vterrors.Wrapf(err, message) + ts.Logger().Error(werr) + return nil, werr } - log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, ts.optCells, state.String()) + log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String()) if !switchReplica && !switchRdonly { return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr)) } @@ -2984,11 +2987,6 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched")) } } - var cells = req.Cells - // If no cells were provided in the command then use the value from the workflow. - if len(cells) == 0 && ts.optCells != "" { - cells = strings.Split(strings.TrimSpace(ts.optCells), ",") - } // If there are no rdonly tablets in the cells ask to switch rdonly tablets as well so that routing rules // are updated for rdonly as well. Otherwise vitess will not know that the workflow has completed and will @@ -2996,7 +2994,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // rdonly tablets. if switchReplica && !switchRdonly { var err error - rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, cells) + rdonlyTabletsExist, err := topotools.DoCellsHaveRdonlyTablets(ctx, s.ts, req.Cells) if err != nil { return nil, err } @@ -3034,20 +3032,20 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { if ts.isPartialMigration { ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.") - } else if err := sw.switchTableReads(ctx, cells, roTabletTypes, direction); err != nil { + } else if err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the tables", err) } return sw.logs(), nil } - ts.Logger().Infof("About to switchShardReads: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := sw.switchShardReads(ctx, cells, roTabletTypes, direction); err != nil { + ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) } - ts.Logger().Infof("switchShardReads Completed: %+v, %+s, %+v", cells, roTypesToSwitchStr, direction) - if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, strings.Join(cells, ",")); err != nil { + ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) + if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.targetKeyspace, strings.Join(cells, ",")) + ts.targetKeyspace, cellsStr) return handleError("failed to validate SrvKeyspace record", err2) } return sw.logs(), nil @@ -3066,7 +3064,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Consistently handle errors by logging and returning them. handleError := func(message string, err error) (int64, *[]string, error) { - werr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, fmt.Sprintf("%s: %v", message, err)) + werr := vterrors.Wrapf(err, message) ts.Logger().Error(werr) return 0, nil, werr } diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index ce18e55fc51..a102c3162f3 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -543,15 +543,12 @@ func (ts *trafficSwitcher) dropSourceShards(ctx context.Context) error { } func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - var fromShards, toShards []*topo.ShardInfo - if direction == DirectionForward { - fromShards, toShards = ts.SourceShards(), ts.TargetShards() - } else { - fromShards, toShards = ts.TargetShards(), ts.SourceShards() - } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + cellsStr := strings.Join(cells, ",") + log.Infof("switchShardReads: cells: %s, tablet types: %+v, direction %d", cellsStr, servedTypes, direction) + fromShards, toShards := ts.SourceShards(), ts.TargetShards() + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "Before switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -567,9 +564,9 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, return err } } - if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), strings.Join(cells, ",")); err != nil { + if err := ts.TopoServer().ValidateSrvKeyspace(ctx, ts.TargetKeyspaceName(), cellsStr); err != nil { err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s", - ts.TargetKeyspaceName(), strings.Join(cells, ",")) + ts.TargetKeyspaceName(), cellsStr) log.Errorf("%w", err2) return err2 } @@ -577,7 +574,7 @@ func (ts *trafficSwitcher) switchShardReads(ctx context.Context, cells []string, } func (ts *trafficSwitcher) switchTableReads(ctx context.Context, cells []string, servedTypes []topodatapb.TabletType, direction TrafficSwitchDirection) error { - log.Infof("switchTableReads: servedTypes: %+v, direction %t", servedTypes, direction) + log.Infof("switchTableReads: cells: %s, tablet types: %+v, direction %d", strings.Join(cells, ","), servedTypes, direction) rules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) if err != nil { return err @@ -935,7 +932,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati }); err != nil { return err } - // all targets have caught up, record their positions for setting up reverse workflows + // All targets have caught up, record their positions for setting up reverse workflows. return ts.ForAllTargets(func(target *MigrationTarget) error { var err error target.Position, err = ts.TabletManagerClient().PrimaryPosition(ctx, target.GetPrimary().Tablet) @@ -1001,7 +998,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { - ts.Logger().Errorf("Cancel migration failed:", err) + ts.Logger().Errorf("Cancel migration failed: %v", err) } sm.CancelMigration(ctx) @@ -1360,7 +1357,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa for _, table := range ts.Tables() { vs, ok := vschema.Tables[table] - if !ok || vs == nil || vs.AutoIncrement == nil || vs.AutoIncrement.Sequence == "" { + if !ok || vs.GetAutoIncrement().GetSequence() == "" { continue } sm := &sequenceMetadata{ @@ -1402,7 +1399,7 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa // the primary tablet serving the sequence to refresh/reset its cache to // be sure that it does not provide a value that is less than the current max. func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { - initSequenceTable := func(ictx context.Context, sequenceTableName string, sequenceMetadata *sequenceMetadata) error { + initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { // Now we need to run this query on the target shards in order // to get the max value and set the next id for the sequence to // a higher value. @@ -1449,6 +1446,10 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen return ictx.Err() default: } + if len(shardResults) == 0 { // This should never happen + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", + ts.targetKeyspace, sequenceMetadata.usingTableName) + } // Sort the values to find the max value across all shards. sort.Slice(shardResults, func(i, j int) bool { return shardResults[i] < shardResults[j] @@ -1483,12 +1484,7 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen ) // Now execute this on the primary tablet of the unsharded keyspace // housing the backing table. - primaryTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for %s.%s using alias %s: %v", - sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) - } - qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primaryTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ Query: []byte(query.Query), MaxRows: 1, }) @@ -1523,10 +1519,10 @@ func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequen } initGroup, gctx := errgroup.WithContext(ctx) - for sequenceTableName, sequenceMetadata := range sequencesByBackingTable { - sequenceTableName, sequenceMetadata := sequenceTableName, sequenceMetadata // https://golang.org/doc/faq#closures_and_goroutines + for _, sequenceMetadata := range sequencesByBackingTable { + sequenceMetadata := sequenceMetadata // https://golang.org/doc/faq#closures_and_goroutines initGroup.Go(func() error { - return initSequenceTable(gctx, sequenceTableName, sequenceMetadata) + return initSequenceTable(gctx, sequenceMetadata) }) } return initGroup.Wait() diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index b18caa1063f..d2d10979cad 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -63,7 +63,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } // Use the local cell if none are specified. if len(req.Cells) == 0 || strings.TrimSpace(req.Cells[0]) == "" { - req.Cells = append(req.Cells, tm.Tablet().Alias.Cell) + req.Cells = []string{tm.Tablet().Alias.Cell} } wfState := binlogdatapb.VReplicationWorkflowState_Stopped.String() tabletTypesStr := topoproto.MakeStringTypeCSV(req.TabletTypes) diff --git a/test/config.json b/test/config.json index 6d2c59038d2..088f3ac6de0 100644 --- a/test/config.json +++ b/test/config.json @@ -1229,12 +1229,21 @@ "RetryMax": 1, "Tags": [] }, + "vreplication_vtctldclient_cli": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldclientCLI", "-timeout", "20m"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", + "RetryMax": 1, + "Tags": [] + }, "vreplication_vtctl_migrate": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctlMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1243,7 +1252,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVtctldMigrate", "-timeout", "30m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1252,7 +1261,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVDiff2", "-timeout", "20m"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] }, @@ -1261,7 +1270,7 @@ "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesTZ"], "Command": [], "Manual": false, - "Shard": "vreplication_migrate_vdiff2_convert_tz", + "Shard": "vreplication_cli_migrate_vdiff2_convert_tz", "RetryMax": 1, "Tags": [] },