diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 57ec8238d2b..c6518f0fdec 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -18,10 +18,13 @@ package vreplication import ( "fmt" + "strings" "testing" "github.com/tidwall/gjson" + "vitess.io/vitess/go/test/endtoend/cluster" + "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" @@ -165,11 +168,18 @@ func TestVtctlMigrate(t *testing.T) { // However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and // hence the VTDATAROOT env variable gets overwritten. // Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT -func TestVtctldMigrate(t *testing.T) { +func TestVtctldMigrateUnsharded(t *testing.T) { vc = NewVitessCluster(t, nil) + oldDefaultReplicas := defaultReplicas + oldDefaultRdonly := defaultRdonly defaultReplicas = 0 defaultRdonly = 0 + defer func() { + defaultReplicas = oldDefaultReplicas + defaultRdonly = oldDefaultRdonly + }() + defer vc.TearDown() defaultCell := vc.Cells[vc.CellNames[0]] @@ -299,3 +309,80 @@ func TestVtctldMigrate(t *testing.T) { require.Errorf(t, err, "there is no vitess cluster named ext1") }) } + +// TestVtctldMigrate adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name +// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external" +// cluster with keyspace rating. +func TestVtctldMigrateSharded(t *testing.T) { + oldDefaultReplicas := defaultReplicas + oldDefaultRdonly := defaultRdonly + defaultReplicas = 1 + defaultRdonly = 1 + defer func() { + defaultReplicas = oldDefaultReplicas + defaultRdonly = oldDefaultRdonly + }() + + setSidecarDBName("_vt") + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables + vc = setupCluster(t) + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + defer vc.TearDown() + setupCustomerKeyspace(t) + createMoveTablesWorkflow(t, "customer,Lead,datze,customer2") + tstWorkflowSwitchReadsAndWrites(t) + tstWorkflowComplete(t) + + var err error + // create external cluster + extCell := "extcell1" + extCells := []string{extCell} + extVc := NewVitessCluster(t, &clusterOptions{ + cells: extCells, + clusterConfig: externalClusterConfig, + }) + defer extVc.TearDown() + + setupExtKeyspace(t, extVc, "rating", extCell) + err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "-80") + require.NoError(t, err) + err = cluster.WaitForHealthyShard(extVc.VtctldClient, "rating", "80-") + require.NoError(t, err) + verifyClusterHealth(t, extVc) + extVtgateConn := getConnection(t, extVc.ClusterConfig.hostname, extVc.ClusterConfig.vtgateMySQLPort) + defer extVtgateConn.Close() + + currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Migrate + var output string + if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Mount", "register", "--name=external", "--topo-type=etcd2", + fmt.Sprintf("--topo-server=localhost:%d", vc.ClusterConfig.topoPort), "--topo-root=/vitess/global"); err != nil { + require.FailNow(t, "Mount command failed with %+v : %s\n", err, output) + } + ksWorkflow := "rating.e1" + if output, err = extVc.VtctldClient.ExecuteCommandWithOutput("Migrate", + "--target-keyspace", "rating", "--workflow", "e1", + "create", "--source-keyspace", "customer", "--mount-name", "external", "--all-tables", "--cells=zone1", + "--tablet-types=primary,replica"); err != nil { + require.FailNow(t, "Migrate command failed with %+v : %s\n", err, output) + } + waitForWorkflowState(t, extVc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + // this is because currently doVtctldclientVDiff is using the global vc :-( and we want to run a diff on the extVc cluster + vc = extVc + doVtctldclientVDiff(t, "rating", "e1", "zone1", nil) +} + +func setupExtKeyspace(t *testing.T, vc *VitessCluster, ksName, cellName string) { + numReplicas := 1 + shards := []string{"-80", "80-"} + if _, err := vc.AddKeyspace(t, []*Cell{vc.Cells[cellName]}, ksName, strings.Join(shards, ","), + customerVSchema, customerSchema, numReplicas, 0, 1200, nil); err != nil { + t.Fatal(err) + } + vtgate := vc.Cells[cellName].Vtgates[0] + for _, shard := range shards { + err := cluster.WaitForHealthyShard(vc.VtctldClient, ksName, shard) + require.NoError(t, err) + require.NoError(t, vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", ksName, shard), numReplicas, waitTimeout)) + } +} diff --git a/go/vt/vtctl/workflow/materializer.go b/go/vt/vtctl/workflow/materializer.go index ea8b75c41c8..3d0ca674e02 100644 --- a/go/vt/vtctl/workflow/materializer.go +++ b/go/vt/vtctl/workflow/materializer.go @@ -227,7 +227,20 @@ func (mz *materializer) generateBinlogSources(ctx context.Context, targetShard * for _, mappedCol := range mappedCols { subExprs = append(subExprs, mappedCol) } - vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + var vindexName string + if mz.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate { + // For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the + // SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source. + // Note: it is expected that the source and target keyspaces have the same vindex name and data type. + keyspace := mz.ms.TargetKeyspace + if mz.ms.ExternalCluster != "" { + keyspace = mz.ms.SourceKeyspace + } + vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name) + } else { + vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + } + subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName)) subExprs = append(subExprs, sqlparser.NewStrLiteral(key.KeyRangeString(targetShard.KeyRange))) inKeyRange := &sqlparser.FuncExpr{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8443b2098f5..f4fb4a354fb 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2168,7 +2168,6 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt if err != nil { return nil, err } - // The stream key is target keyspace/tablet alias, e.g. 0/test-0000000100. // We sort the keys for intuitive and consistent output. streamKeys := make([]string, 0, len(workflow.ShardStreams)) @@ -2224,9 +2223,13 @@ func (s *Server) WorkflowStatus(ctx context.Context, req *vtctldatapb.WorkflowSt return resp, nil } -// GetCopyProgress returns the progress of all tables being copied in the -// workflow. +// GetCopyProgress returns the progress of all tables being copied in the workflow. func (s *Server) GetCopyProgress(ctx context.Context, ts *trafficSwitcher, state *State) (*copyProgress, error) { + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_Migrate { + // The logic below expects the source primaries to be in the same cluster as the target. + // For now we don't report progress for Migrate workflows. + return nil, nil + } getTablesQuery := "select distinct table_name from _vt.copy_state cs, _vt.vreplication vr where vr.id = cs.vrepl_id and vr.id = %d" getRowCountQuery := "select table_name, table_rows, data_length from information_schema.tables where table_schema = %s and table_name in (%s)" tables := make(map[string]bool) diff --git a/go/vt/wrangler/materializer.go b/go/vt/wrangler/materializer.go index f777ddae2e8..7e24945cde7 100644 --- a/go/vt/wrangler/materializer.go +++ b/go/vt/wrangler/materializer.go @@ -1398,7 +1398,19 @@ func (mz *materializer) generateInserts(ctx context.Context, sourceShards []*top for _, mappedCol := range mappedCols { subExprs = append(subExprs, mappedCol) } - vindexName := fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + var vindexName string + if mz.getWorkflowType() == binlogdatapb.VReplicationWorkflowType_Migrate { + // For a Migrate, if the TargetKeyspace name is different from the SourceKeyspace name, we need to use the + // SourceKeyspace name to determine the vindex since the TargetKeyspace name is not known to the source. + // Note: it is expected that the source and target keyspaces have the same vindex name and data type. + keyspace := mz.ms.TargetKeyspace + if mz.ms.ExternalCluster != "" { + keyspace = mz.ms.SourceKeyspace + } + vindexName = fmt.Sprintf("%s.%s", keyspace, cv.Name) + } else { + vindexName = fmt.Sprintf("%s.%s", mz.ms.TargetKeyspace, cv.Name) + } subExprs = append(subExprs, sqlparser.NewStrLiteral(vindexName)) subExprs = append(subExprs, sqlparser.NewStrLiteral("{{.keyrange}}")) inKeyRange := &sqlparser.FuncExpr{ diff --git a/test/config.json b/test/config.json index d7abad8452b..185201cf3e0 100644 --- a/test/config.json +++ b/test/config.json @@ -1051,7 +1051,7 @@ }, "vreplication_materialize": { "File": "unused.go", - "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMaterialize"], + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "Materialize"], "Command": [], "Manual": false, "Shard": "vreplication_partial_movetables_and_materialize",