diff --git a/examples/local/vstream_client.go b/examples/local/vstream_client.go
index 98d2129f898..e1d672eac5e 100644
--- a/examples/local/vstream_client.go
+++ b/examples/local/vstream_client.go
@@ -23,13 +23,13 @@ import (
 	"log"
 	"time"
 
-	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
-
-	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
-	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
 	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
 	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
+
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 )
 
 /*
@@ -73,15 +73,18 @@ func main() {
 	}
 	defer conn.Close()
 	flags := &vtgatepb.VStreamFlags{
-		//MinimizeSkew: false,
-		//HeartbeatInterval: 60, //seconds
+		// MinimizeSkew: false,
+		// HeartbeatInterval: 60, // seconds
+		// StopOnReshard: true,
 	}
 	reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+	if err != nil {
+		log.Fatal(err)
+	}
 	for {
 		e, err := reader.Recv()
 		switch err {
 		case nil:
-			_ = e
 			fmt.Printf("%v\n", e)
 		case io.EOF:
 			fmt.Printf("stream ended\n")
diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go
index e13c3e24e80..62bb645f78e 100644
--- a/go/test/endtoend/vreplication/vstream_test.go
+++ b/go/test/endtoend/vreplication/vstream_test.go
@@ -28,13 +28,13 @@ import (
 	"github.com/stretchr/testify/require"
 
 	"vitess.io/vitess/go/vt/log"
-	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
-	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
-	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 	_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
 	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
 	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
+
+	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
 )
 
 // Validates that we have a working VStream API
@@ -603,8 +603,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
 	// Stream events but stop once we have a VGTID with positions for the old/original shards.
 	var newVGTID *binlogdatapb.VGtid
 	func() {
-		var reader vtgateconn.VStreamReader
-		reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
 		require.NoError(t, err)
 		for {
 			evs, err := reader.Recv()
@@ -658,8 +657,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
 
 	// Now start a new VStream from our previous VGTID which only has the old/original shards.
 	func() {
-		var reader vtgateconn.VStreamReader
-		reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
 		require.NoError(t, err)
 		for {
 			evs, err := reader.Recv()
@@ -694,8 +692,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
 	}()
 
 	// We should have a mix of events across the old and new shards.
-	require.NotZero(t, oldShardRowEvents)
-	require.NotZero(t, newShardRowEvents)
+	require.Greater(t, oldShardRowEvents, 0)
+	require.Greater(t, newShardRowEvents, 0)
 
 	// The number of row events streamed by the VStream API should match the number of rows inserted.
 	customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
@@ -704,6 +702,206 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
 	require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
 }
 
+// TestMultiVStreamsKeyspaceStopOnReshard confirms that, when StopOnReshard is set,
+// a VStream that is resumed after a reshard receives the reshard's journal events
+// and then ends.
+func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
+	ctx := context.Background()
+	ks := "testks"
+	wf := "multiVStreamsKeyspaceReshard"
+	baseTabletID := 100
+	tabletType := topodatapb.TabletType_PRIMARY.String()
+	oldShards := "-80,80-"
+	newShards := "-40,40-80,80-c0,c0-"
+	oldShardRowEvents, journalEvents := 0, 0
+	vc = NewVitessCluster(t, nil)
+	defer vc.TearDown()
+	defaultCell := vc.Cells[vc.CellNames[0]]
+	ogdr := defaultReplicas
+	defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets.
+	defer func(dr int) { defaultReplicas = dr }(ogdr)
+
+	// For our sequences etc.
+	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
+	require.NoError(t, err)
+
+	// Set up the keyspace with our old/original shards.
+	keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
+	require.NoError(t, err)
+
+	// Add the new shards.
+	err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
+	require.NoError(t, err)
+
+	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+	defer vtgateConn.Close()
+	verifyClusterHealth(t, vc)
+
+	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
+	require.NoError(t, err)
+	defer vstreamConn.Close()
+
+	// Ensure that we're starting with a clean slate.
+	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
+	require.NoError(t, err)
+
+	// Coordinate the goroutines.
+	streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
+	defer streamCancel()
+	done := make(chan struct{})
+
+	// Start a goroutine that keeps inserting rows into the table being streamed
+	// until the stream context is cancelled.
+	go func() {
+		id := 1
+		for {
+			select {
+			case <-streamCtx.Done():
+				// Give the VStream a little catch-up time before telling it to stop
+				// via the done channel.
+				time.Sleep(10 * time.Second)
+				close(done)
+				return
+			default:
+				insertRow(ks, "customer", id)
+				time.Sleep(250 * time.Millisecond)
+				id++
+			}
+		}
+	}()
+
+	// Create the Reshard workflow and wait for it to finish the copy phase.
+	reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
+	waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())
+
+	vgtid := &binlogdatapb.VGtid{
+		ShardGtids: []*binlogdatapb.ShardGtid{{
+			// Only stream the keyspace that we're resharding. Otherwise the client stream
+			// will continue to run with only the tablet stream from the global keyspace.
+			Keyspace: ks,
+		}}}
+
+	filter := &binlogdatapb.Filter{
+		Rules: []*binlogdatapb.Rule{{
+			// Stream all tables.
+			Match: "/.*",
+		}},
+	}
+	flags := &vtgatepb.VStreamFlags{
+		StopOnReshard: true,
+	}
+
+	// Stream events but stop once we have a VGTID with positions for the old/original shards.
+	var newVGTID *binlogdatapb.VGtid
+	func() {
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+
+			switch err {
+			case nil:
+				for _, ev := range evs {
+					switch ev.Type {
+					case binlogdatapb.VEventType_ROW:
+						shard := ev.GetRowEvent().GetShard()
+						switch shard {
+						case "-80", "80-":
+							oldShardRowEvents++
+						default:
+							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
+						}
+					case binlogdatapb.VEventType_VGTID:
+						newVGTID = ev.GetVgtid()
+						// We want a VGTID with a ShardGtid for both of the old shards.
+						if len(newVGTID.GetShardGtids()) == 2 {
+							canStop := true
+							for _, sg := range newVGTID.GetShardGtids() {
+								if sg.GetGtid() == "" {
+									canStop = false
+								}
+							}
+							if canStop {
+								return
+							}
+						}
+					}
+				}
+			default:
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			select {
+			case <-streamCtx.Done():
+				return
+			default:
+			}
+		}
+	}()
+
+	// Confirm that we have shard GTIDs for the old/original shards.
+	require.Len(t, newVGTID.GetShardGtids(), 2)
+	t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids())
+
+	// Switch the traffic to the new shards.
+	reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)
+
+	// Now start a new VStream from our previous VGTID, which only has the old/original shards.
+	expectedJournalEvents := 2 // One for each old shard: -80,80-
+	var streamStopped bool     // We expect the stream to end with io.EOF from the reshard
+	runResumeStream := func() {
+		journalEvents = 0
+		streamStopped = false
+		t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids())
+		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
+		require.NoError(t, err)
+		for {
+			evs, err := reader.Recv()
+
+			switch err {
+			case nil:
+				for i, ev := range evs {
+					switch ev.Type {
+					case binlogdatapb.VEventType_ROW:
+						shard := ev.GetRowEvent().GetShard()
+						switch shard {
+						case "-80", "80-":
+						default:
+							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
+						}
+					case binlogdatapb.VEventType_JOURNAL:
+						t.Logf("Journal event: %+v", ev)
+						journalEvents++
+						require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
+						require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
+						require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
+					}
+				}
+			case io.EOF:
+				streamStopped = true
+				return
+			default:
+				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
+			}
+			select {
+			case <-done:
+				return
+			default:
+			}
+		}
+	}
+
+	// Multiple VStream clients should be able to resume from where they left off and
+	// get the reshard journal event.
+	for i := 1; i <= expectedJournalEvents; i++ {
+		runResumeStream()
+		// We should have seen the journal event for each shard in the stream due to
+		// using StopOnReshard.
+		require.Equal(t, expectedJournalEvents, journalEvents,
+			"did not get expected journal events on resume vstream #%d", i)
+		// Confirm that the stream stopped on the reshard.
+		require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected")
+	}
+}
+
 func TestVStreamFailover(t *testing.T) {
 	testVStreamWithFailover(t, true)
 }
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 34e1e4e4329..3ac6861dce5 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -690,7 +690,6 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [
 		})
 	}
 
-	ts.Logger().Infof("Creating journal %v", journal)
 	ts.Logger().Infof("Creating journal: %v", journal)
 	statement := fmt.Sprintf("insert into _vt.resharding_journal "+
 		"(id, db_name, val) "+
diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go
index e0d195853cf..935a437c869 100644
--- a/go/vt/vtgate/vstream_manager.go
+++ b/go/vt/vtgate/vstream_manager.go
@@ -61,6 +61,10 @@ const maxSkewTimeoutSeconds = 10 * 60
 // for a vstream
 const tabletPickerContextTimeout = 90 * time.Second
 
+// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before
+// ending the stream from the tablet.
+const stopOnReshardDelay = 500 * time.Millisecond
+
 // vstream contains the metadata for one VStream request.
 type vstream struct {
 	// mu protects parts of vgtid, the semantics of a send, and journaler.
@@ -608,7 +612,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 			}
 
 			sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
-			for _, event := range events {
+			for i, event := range events {
 				switch event.Type {
 				case binlogdatapb.VEventType_FIELD:
 					// Update table names and send.
@@ -658,12 +662,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 				if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
 					return err
 				}
-
 			case binlogdatapb.VEventType_JOURNAL:
 				journal := event.Journal
-				// Journal events are not sent to clients by default, but only when StopOnReshard is set
+				// Journal events are not sent to clients by default, but only when
+				// StopOnReshard is set.
 				if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
 					sendevents = append(sendevents, event)
+					// Read any subsequent events until we get the VGTID->COMMIT events that
+					// always follow the JOURNAL event, which is generated as a result of
+					// an autocommit insert into the _vt.resharding_journal table on the
+					// tablet.
+					for j := i + 1; j < len(events); j++ {
+						sendevents = append(sendevents, events[j])
+						if events[j].Type == binlogdatapb.VEventType_COMMIT {
+							break
+						}
+					}
 					eventss = append(eventss, sendevents)
 					if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
 						return err
@@ -676,12 +690,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
 				return err
 			}
 			if je != nil {
-				// Wait till all other participants converge and return EOF.
+				// We're going to be ending the tablet stream, so we ensure a reasonable
+				// minimum amount of time is allotted for clients to Recv the journal event
+				// before the stream's context is cancelled (which would cause the gRPC
+				// SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
+				// event before the stream ends, then they'll have to resume from the last
+				// ShardGtid they received before the journal event.
+				endTimer := time.NewTimer(stopOnReshardDelay)
+				defer endTimer.Stop()
+				// Wait until all other participants converge and then return EOF after
+				// the minimum delay has passed.
 				journalDone = je.done
 				select {
 				case <-ctx.Done():
 					return ctx.Err()
 				case <-journalDone:
+					<-endTimer.C
 					return io.EOF
 				}
 			}
@@ -954,6 +978,9 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
 		return false, err
 	}
 
+	vs.mu.Lock()
+	defer vs.mu.Unlock()
+
 	// First check the typical case, where the VGTID shards match the serving shards.
 	// In that case it's NOT possible that an applicable reshard has happened because
 	// the VGTID contains shards that are all serving.
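
For context, here is a minimal, hypothetical client sketch (not part of the patch) showing how a consumer might use the behavior above: stream with StopOnReshard set, persist the last VGTID received, treat io.EOF as the stop-on-reshard signal, and resume from the saved position. It mirrors the API usage in examples/local/vstream_client.go and the test; the vtgate address ("localhost:15991") and keyspace ("testks") are placeholders, and real code would persist the VGTID durably and bound the resume loop.

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
	"vitess.io/vitess/go/vt/vtgate/vtgateconn"

	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
	vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

func main() {
	ctx := context.Background()
	// Placeholder address for the vtgate gRPC port.
	conn, err := vtgateconn.Dial(ctx, "localhost:15991")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Start from the current position of every shard in the (placeholder) keyspace.
	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			Keyspace: "testks",
			Gtid:     "current",
		}},
	}
	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{Match: "/.*"}},
	}
	flags := &vtgatepb.VStreamFlags{StopOnReshard: true}

	// Each pass streams until the server ends the stream; with StopOnReshard
	// set that happens (via io.EOF) shortly after the reshard journal event
	// is sent. The next pass resumes from the last VGTID that was received.
	for {
		reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
		if err != nil {
			log.Fatal(err)
		}
		for {
			evs, err := reader.Recv()
			if err == io.EOF {
				fmt.Println("stream ended; resuming from last received VGTID")
				break
			}
			if err != nil {
				log.Fatal(err)
			}
			for _, ev := range evs {
				switch ev.Type {
				case binlogdatapb.VEventType_VGTID:
					// The resume position; real code would persist this durably.
					vgtid = ev.GetVgtid()
				case binlogdatapb.VEventType_JOURNAL:
					// Only delivered when StopOnReshard is set. The VGTID and
					// COMMIT events that follow in the same batch carry the
					// position to resume from.
					fmt.Printf("reshard journal event: %v\n", ev.GetJournal())
				default:
					fmt.Printf("%v\n", ev)
				}
			}
		}
	}
}

With the patch's stopOnReshardDelay, a client like this gets at least a 500ms window to Recv the journal event before the tablet stream's context goes away; if it misses that window, it simply resumes from the last ShardGtid it received before the journal event.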