diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go
index d0d36cbbc28..4c72781df29 100644
--- a/go/test/endtoend/vreplication/vreplication_test.go
+++ b/go/test/endtoend/vreplication/vreplication_test.go
@@ -1456,7 +1456,7 @@ func reshardAction(t *testing.T, action, workflow, keyspaceName, sourceShards, t
 			action, workflow, output)
 	}
 	if err != nil {
-		t.Fatalf("Reshard %s command failed with %+v\n", action, err)
+		t.Fatalf("Reshard %s command failed with %+v\nOutput: %s", action, err, output)
 	}
 }
diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go
index dee8243d5e9..e13c3e24e80 100644
--- a/go/test/endtoend/vreplication/vstream_test.go
+++ b/go/test/endtoend/vreplication/vstream_test.go
@@ -223,7 +223,7 @@ func insertRow(keyspace, table string, id int) {
 	vtgateConn.ExecuteFetch("begin", 1000, false)
 	_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
 	if err != nil {
-		log.Infof("error inserting row %d: %v", id, err)
+		log.Errorf("error inserting row %d: %v", id, err)
 	}
 	vtgateConn.ExecuteFetch("commit", 1000, false)
 }
@@ -387,13 +387,15 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
 	defer vc.TearDown()
 
 	defaultCell := vc.Cells[vc.CellNames[0]]
-	vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
+	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "unsharded", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID+100, nil)
+	require.NoError(t, err)
 
 	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
 	defer vtgateConn.Close()
 	verifyClusterHealth(t, vc)
 
-	vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
+	_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, "sharded", "-80,80-", vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+200, nil)
+	require.NoError(t, err)
 
 	ctx := context.Background()
 	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
@@ -512,6 +514,196 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
 	return ne
 }
 
+// Validate that we can resume a VStream when the keyspace has been resharded
+// while not streaming. Ensure that we successfully transition from the old
+// shards -- which are in the VGTID from the previous stream -- and that we
+// miss no row events during the process.
+func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
+	ctx := context.Background()
+	ks := "testks"
+	wf := "multiVStreamsKeyspaceReshard"
+	baseTabletID := 100
+	tabletType := topodatapb.TabletType_PRIMARY.String()
+	oldShards := "-80,80-"
+	newShards := "-40,40-80,80-c0,c0-"
+	oldShardRowEvents, newShardRowEvents := 0, 0
+	vc = NewVitessCluster(t, nil)
+	defer vc.TearDown()
+	defaultCell := vc.Cells[vc.CellNames[0]]
+	ogdr := defaultReplicas
+	defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
+	defer func(dr int) { defaultReplicas = dr }(ogdr)
+
+	// For our sequences etc.
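+	// This unsharded keyspace holds the backing tables for the sequences
+	// used by the sharded keyspace.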
+	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
+	require.NoError(t, err)
+
+	// Set up the keyspace with our old/original shards.
+	keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
+	require.NoError(t, err)
+
+	// Add the new shards.
+	err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
+	require.NoError(t, err)
+
+	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+	defer vtgateConn.Close()
+	verifyClusterHealth(t, vc)
+
+	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
+	require.NoError(t, err)
+	defer vstreamConn.Close()
+
+	// Ensure that we're starting with a clean slate.
+	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
+	require.NoError(t, err)
+
+	// Coordinate the goroutines.
+	streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer streamCancel()
+	done := make(chan struct{})
+
+	// Start a goroutine that keeps inserting rows into the table being streamed
+	// until the stream context is cancelled.
+	go func() {
+		id := 1
+		for {
+			select {
+			case <-streamCtx.Done():
+				// Give the VStream a little catch-up time before telling it to stop
+				// via the done channel.
+				time.Sleep(10 * time.Second)
+				close(done)
+				return
+			default:
+				insertRow(ks, "customer", id)
+				time.Sleep(250 * time.Millisecond)
+				id++
+			}
+		}
+	}()
+
+	// Create the Reshard workflow and wait for it to finish the copy phase.
+	reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
+	waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())
+
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			Keyspace: "/.*", // Match all keyspaces just to be more realistic.
+		}}}
+
+	filter := &binlogdatapb.Filter{
+		Rules: []*binlogdatapb.Rule{{
+			// Only stream the customer table and its sequence backing table.
+			Match: "/customer.*",
+		}},
+	}
+	flags := &vtgatepb.VStreamFlags{}
+
+	// Stream events, but stop once we have a VGTID with positions for the global
+	// shard and the old/original shards.
+	var newVGTID *binlogdatapb.VGtid
+	func() {
+		var reader vtgateconn.VStreamReader
+		reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+
+			switch err {
+			case nil:
+				for _, ev := range evs {
+					switch ev.Type {
+					case binlogdatapb.VEventType_ROW:
+						shard := ev.GetRowEvent().GetShard()
+						switch shard {
+						case "-80", "80-":
+							oldShardRowEvents++
+						case "0":
+							// We expect some for the sequence backing table, but don't care.
+						default:
+							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
+						}
+					case binlogdatapb.VEventType_VGTID:
+						newVGTID = ev.GetVgtid()
+						if len(newVGTID.GetShardGtids()) == 3 {
+							// We want a VGTID with a position for the global shard and the old shards.
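+							// A shard's GTID position can still be empty here if we have
+							// not yet received any events for it.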
+							canStop := true
+							for _, sg := range newVGTID.GetShardGtids() {
+								if sg.GetGtid() == "" {
+									canStop = false
+								}
+							}
+							if canStop {
+								return
+							}
+						}
+					}
+				}
+			default:
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			select {
+			case <-streamCtx.Done():
+				return
+			default:
+			}
+		}
+	}()
+
+	// Confirm that we have shard GTIDs for the global shard and the old/original shards.
+	require.Len(t, newVGTID.GetShardGtids(), 3)
+
+	// Switch the traffic to the new shards.
+	reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)
+
+	// Now start a new VStream from our previous VGTID, which has positions for the
+	// global shard and the old/original shards but none of the new shards.
+	func() {
+		var reader vtgateconn.VStreamReader
+		reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+
+			switch err {
+			case nil:
+				for _, ev := range evs {
+					switch ev.Type {
+					case binlogdatapb.VEventType_ROW:
+						shard := ev.GetRowEvent().GetShard()
+						switch shard {
+						case "-80", "80-":
+							oldShardRowEvents++
+						case "-40", "40-80", "80-c0", "c0-":
+							newShardRowEvents++
+						case "0":
+							// Again, we expect some for the sequence backing table, but don't care.
+						default:
+							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
+						}
+					}
+				}
+			default:
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			select {
+			case <-done:
+				return
+			default:
+			}
+		}
+	}()
+
+	// We should have a mix of events across the old and new shards.
+	require.NotZero(t, oldShardRowEvents)
+	require.NotZero(t, newShardRowEvents)
+
+	// The number of row events streamed by the VStream API should match the number of rows inserted.
+	customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
+	customerCount, err := customerResult.Rows[0][0].ToInt64()
+	require.NoError(t, err)
+	require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
+}
+
 func TestVStreamFailover(t *testing.T) {
 	testVStreamWithFailover(t, true)
 }
diff --git a/go/vt/topo/faketopo/faketopo.go b/go/vt/topo/faketopo/faketopo.go
index 8601d28f5b6..69ccf08a969 100644
--- a/go/vt/topo/faketopo/faketopo.go
+++ b/go/vt/topo/faketopo/faketopo.go
@@ -21,12 +21,11 @@ import (
 	"strings"
 	"sync"
 
+	"vitess.io/vitess/go/vt/log"
+
+	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/memorytopo"
 
-	"vitess.io/vitess/go/vt/log"
-
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
-
-	"vitess.io/vitess/go/vt/topo"
 )
 
 // FakeFactory implements the Factory interface. This is supposed to be used only for testing
diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go
index 08553969a50..d520bec1dbf 100644
--- a/go/vt/vtgate/vstream_manager.go
+++ b/go/vt/vtgate/vstream_manager.go
@@ -25,8 +25,11 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/exp/maps"
+
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/discovery"
+	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/log"
 	"vitess.io/vitess/go/vt/servenv"
 	"vitess.io/vitess/go/vt/srvtopo"
@@ -497,24 +500,40 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 	var err error
 	cells := vs.getCells()
 
-	tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.Keyspace, sgtid.Shard, vs.tabletType.String(), vs.tabletPickerOptions, ignoreTablets...)
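+	// Start from the stream-level tablet picker options, which we may adjust
+	// for this particular shard stream below.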
+	tpo := vs.tabletPickerOptions
+	resharded, err := vs.keyspaceHasBeenResharded(ctx, sgtid.Keyspace)
 	if err != nil {
-		log.Errorf(err.Error())
-		return err
+		return vterrors.Wrapf(err, "failed to determine if keyspace %s has been resharded", sgtid.Keyspace)
+	}
+	if resharded {
+		// The (now non-serving) tablet in the old shard will contain all of the
+		// GTIDs that we need before transitioning to the new shards, along with
+		// the journal event that will then allow us to automatically transition
+		// to the new shards (provided the stop_on_reshard option is not set).
+		tpo.IncludeNonServingTablets = true
 	}
+
+	tabletPickerErr := func(err error) error {
+		tperr := vterrors.Wrapf(err, "failed to find a %s tablet for VStream in %s/%s within the %s cell(s)",
+			vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
+		log.Errorf("%v", tperr)
+		return tperr
+	}
+	tp, err := discovery.NewTabletPicker(ctx, vs.ts, cells, vs.vsm.cell, sgtid.GetKeyspace(), sgtid.GetShard(), vs.tabletType.String(), tpo, ignoreTablets...)
+	if err != nil {
+		return tabletPickerErr(err)
+	}
+
 	// Create a child context with a stricter timeout when picking a tablet.
 	// This will prevent hanging in the case no tablets are found.
 	tpCtx, tpCancel := context.WithTimeout(ctx, tabletPickerContextTimeout)
 	defer tpCancel()
-
 	tablet, err := tp.PickForStreaming(tpCtx)
 	if err != nil {
-		log.Errorf(err.Error())
-		return err
+		return tabletPickerErr(err)
 	}
-	log.Infof("Picked tablet %s for for %s/%s/%s/%s", tablet.Alias.String(), strings.Join(cells, ","),
-		sgtid.Keyspace, sgtid.Shard, vs.tabletType.String())
+	log.Infof("Picked a %s tablet for VStream in %s/%s within the %s cell(s)",
+		vs.tabletType.String(), sgtid.GetKeyspace(), sgtid.GetShard(), strings.Join(cells, ","))
+
 	target := &querypb.Target{
 		Keyspace:   sgtid.Keyspace,
 		Shard:      sgtid.Shard,
@@ -737,7 +756,7 @@ func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, e
 	if err := vs.getError(); err != nil {
 		return err
 	}
-	// convert all gtids to vgtids. This should be done here while holding the lock.
+	// Convert all gtids to vgtids. This should be done here while holding the lock.
 	for j, event := range events {
 		if event.Type == binlogdatapb.VEventType_GTID {
 			// Update the VGtid and send that instead.
@@ -921,3 +940,56 @@ func (vs *vstream) getJournalEvent(ctx context.Context, sgtid *binlogdatapb.Shar
 	close(je.done)
 	return je, nil
 }
+
+// keyspaceHasBeenResharded returns true if the keyspace's serving shard set has changed
+// since the last VStream as indicated by the shard definitions provided in the VGTID.
+func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string) (bool, error) {
+	shards, err := vs.ts.FindAllShardsInKeyspace(ctx, keyspace, nil)
+	if err != nil || len(shards) == 0 {
+		return false, err
+	}
+
+	// First check the typical case, where the VGTID shards match the serving shards.
+	// In that case it's NOT possible that an applicable reshard has happened because
+	// the VGTID contains shards that are all serving.
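+	// A reshard is only possible if at least one of the VGTID's shards is no
+	// longer the serving primary for its keyrange.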
+	reshardPossible := false
+	ksShardGTIDs := make([]*binlogdatapb.ShardGtid, 0, len(vs.vgtid.ShardGtids))
+	for _, s := range vs.vgtid.ShardGtids {
+		if s.GetKeyspace() == keyspace {
+			ksShardGTIDs = append(ksShardGTIDs, s)
+		}
+	}
+	for _, s := range ksShardGTIDs {
+		shard := shards[s.GetShard()]
+		if shard == nil {
+			return false, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "shard provided in VGTID, %s, not found in the %s keyspace", s.GetShard(), keyspace)
+		}
+		if !shard.GetIsPrimaryServing() {
+			reshardPossible = true
+			break
+		}
+	}
+	if !reshardPossible {
+		return false, nil
+	}
+
+	// Now that we know there MAY have been an applicable reshard, let's make a
+	// definitive determination by looking at the shard keyranges.
+	// All we care about are the shard info records now.
+	sis := maps.Values(shards)
+	for i := range sis {
+		for j := range sis {
+			if sis[i].ShardName() == sis[j].ShardName() && key.KeyRangeEqual(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
+				// It's the same shard so skip it.
+				continue
+			}
+			if key.KeyRangeIntersect(sis[i].GetKeyRange(), sis[j].GetKeyRange()) {
+				// We have different shards with overlapping keyranges so we know
+				// that a reshard has happened.
+				return true, nil
+			}
+		}
+	}
+
+	return false, nil
+}
diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go
index 4c1e9ec6764..6eec06a1bac 100644
--- a/go/vt/vtgate/vstream_manager_test.go
+++ b/go/vt/vtgate/vstream_manager_test.go
@@ -25,12 +25,12 @@ import (
 	"testing"
 	"time"
 
-	"google.golang.org/protobuf/proto"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/stats"
+	"vitess.io/vitess/go/test/utils"
 	"vitess.io/vitess/go/vt/discovery"
 	"vitess.io/vitess/go/vt/srvtopo"
 	"vitess.io/vitess/go/vt/topo"
@@ -41,8 +41,6 @@ import (
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
-
-	"vitess.io/vitess/go/test/utils"
 )
 
 var mu sync.Mutex
@@ -1279,6 +1277,288 @@ func TestVStreamIdleHeartbeat(t *testing.T) {
 	}
 }
 
+func TestKeyspaceHasBeenSharded(t *testing.T) {
+	ctx := utils.LeakCheckContext(t)
+
+	cell := "zone1"
+	ks := "testks"
+
+	type testcase struct {
+		name            string
+		oldshards       []string
+		newshards       []string
+		vgtid           *binlogdatapb.VGtid
+		trafficSwitched bool
+		want            bool
+		wantErr         string
+	}
+	testcases := []testcase{
+		{
+			name: "2 to 4, split both, traffic not switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "2 to 4, split both, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "2 to 8, split both, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				"80-a0",
+				"a0-c0",
+				"c0-e0",
+				"e0-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "2 to 4, split only first shard, traffic switched",
+			oldshards: []string{
+				"-80",
+				"80-",
+			},
+			newshards: []string{
+				"-20",
+				"20-40",
+				"40-60",
+				"60-80",
+				// 80- is not being resharded.
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "4 to 2, merge both shards, traffic switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				"-80",
+				"80-",
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+		{
+			name: "4 to 3, merge second half, traffic not switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				// -40 and 40-80 are not being resharded.
+				"80-", // Merge of 80-c0 and c0-
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: false,
+			want:            false,
+		},
+		{
+			name: "4 to 3, merge second half, traffic switched",
+			oldshards: []string{
+				"-40",
+				"40-80",
+				"80-c0",
+				"c0-",
+			},
+			newshards: []string{
+				// -40 and 40-80 are not being resharded.
+				"80-", // Merge of 80-c0 and c0-
+			},
+			vgtid: &binlogdatapb.VGtid{
+				ShardGtids: []*binlogdatapb.ShardGtid{
+					{
+						Keyspace: ks,
+						Shard:    "-40",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "40-80",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "80-c0",
+					},
+					{
+						Keyspace: ks,
+						Shard:    "c0-",
+					},
+				},
+			},
+			trafficSwitched: true,
+			want:            true,
+		},
+	}
+
+	addTablet := func(t *testing.T, ctx context.Context, host string, port int32, cell, ks, shard string, ts *topo.Server, hc *discovery.FakeHealthCheck, serving bool) {
+		tabletconn := hc.AddTestTablet(cell, host, port, ks, shard, topodatapb.TabletType_PRIMARY, serving, 0, nil)
+		err := ts.CreateTablet(ctx, tabletconn.Tablet())
+		require.NoError(t, err)
+		var alias *topodatapb.TabletAlias
+		if serving {
+			alias = tabletconn.Tablet().Alias
+		}
+		_, err = ts.UpdateShardFields(ctx, ks, shard, func(si *topo.ShardInfo) error {
+			si.PrimaryAlias = alias
+			si.IsPrimaryServing = serving
+			return nil
+		})
+		require.NoError(t, err)
+	}
+
+	for _, tc := range testcases {
+		t.Run(tc.name, func(t *testing.T) {
+			hc := discovery.NewFakeHealthCheck(nil)
+			_ = createSandbox(ks)
+			st := getSandboxTopo(ctx, cell, ks, append(tc.oldshards, tc.newshards...))
+			vsm := newTestVStreamManager(ctx, hc, st, cell)
+			vs := vstream{
+				vgtid:      tc.vgtid,
+				tabletType: topodatapb.TabletType_PRIMARY,
+				optCells:   cell,
+				vsm:        vsm,
+				ts:         st.topoServer,
+			}
+			for i, shard := range tc.oldshards {
+				addTablet(t, ctx, fmt.Sprintf("1.1.0.%d", i), int32(1000+i), cell, ks, shard, st.topoServer, hc, !tc.trafficSwitched)
+			}
+			for i, shard := range tc.newshards {
+				addTablet(t, ctx, fmt.Sprintf("1.1.1.%d", i), int32(2000+i), cell, ks, shard, st.topoServer, hc, tc.trafficSwitched)
+			}
+			got, err := vs.keyspaceHasBeenResharded(ctx, ks)
+			if tc.wantErr != "" {
+				require.EqualError(t, err, tc.wantErr)
+			} else {
+				require.NoError(t, err)
+			}
+			require.Equal(t, tc.want, got)
+		})
+	}
+}
+
 func newTestVStreamManager(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager {
 	gw := NewTabletGateway(ctx, hc, serv, cell)
 	srvResolver := srvtopo.NewResolver(serv, gw, cell)
diff --git a/test/config.json b/test/config.json
index 14c05cb8df6..7d429ce6978 100644
--- a/test/config.json
+++ b/test/config.json
@@ -1121,6 +1121,15 @@
         "RetryMax": 1,
         "Tags": []
     },
+    "multi_vstreams_keyspace_reshard": {
+        "File": "unused.go",
+        "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMultiVStreamsKeyspaceReshard", "-timeout", "15m"],
+        "Command": [],
+        "Manual": false,
+        "Shard": "vstream",
+        "RetryMax": 1,
+        "Tags": []
+    },
     "vstream_failover": {
         "File": "unused.go",
         "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "VStreamFailover"],