diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 95fdb383a46..62bb645f78e 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -775,13 +775,15 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "/.*", // Match all keyspaces just to be more realistic. + // Only stream the keyspace that we're resharding. Otherwise the client stream + // will continue to run with only the tablet stream from the global keyspace. + Keyspace: ks, }}} filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - // Only stream the customer table and its sequence backing table. - Match: "/customer.*", + // Stream all tables. + Match: "/.*", }}, } flags := &vtgatepb.VStreamFlags{ @@ -805,15 +807,13 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { switch shard { case "-80", "80-": oldShardRowEvents++ - case "0": - // We expect some for the sequence backing table, but don't care. default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } case binlogdatapb.VEventType_VGTID: newVGTID = ev.GetVgtid() - if len(newVGTID.GetShardGtids()) == 3 { - // We want a VGTID with a position for the global shard and the old shards. + // We want a VGTID with a ShardGtid for both of the old shards. + if len(newVGTID.GetShardGtids()) == 2 { canStop := true for _, sg := range newVGTID.GetShardGtids() { if sg.GetGtid() == "" { @@ -837,8 +837,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { } }() - // Confirm that we have shard GTIDs for the global shard and the old/original shards. - require.Len(t, newVGTID.GetShardGtids(), 3) + // Confirm that we have shard GTIDs for the old/original shards. + require.Len(t, newVGTID.GetShardGtids(), 2) t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids()) // Switch the traffic to the new shards. @@ -846,8 +846,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // Now start a new VStream from our previous VGTID which only has the old/original shards. expectedJournalEvents := 2 // One for each old shard: -80,80- + var streamStopped bool // We expect the stream to end with io.EOF from the reshard runResumeStream := func() { journalEvents = 0 + streamStopped = false t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids()) reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags) require.NoError(t, err) @@ -861,7 +863,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { case binlogdatapb.VEventType_ROW: shard := ev.RowEvent.Shard switch shard { - case "0", "-80", "80-": + case "-80", "80-": default: require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) } @@ -871,12 +873,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event") require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event") require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event") - if journalEvents == expectedJournalEvents { - return - } } } case io.EOF: + streamStopped = true return default: require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err)) @@ -897,6 +897,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { // using StopOnReshard. require.Equal(t, expectedJournalEvents, journalEvents, "did not get expected journal events on resume vstream #%d", i) + // Confirm that the stream stopped on the reshard. + require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected") } } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index f56e1f963f8..8913e98b1c4 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -693,7 +693,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // We're going to be ending the tablet stream, so we ensure a reasonable // minimum amount of time is alloted for clients to Recv the journal event // before the stream's context is cancelled (which would cause the grpc - // Send or Recv to fail). If the client doesn't (grpc) Recv the journal + // SendMsg or RecvMsg to fail). If the client doesn't Recv the journal // event before the stream ends then they'll have to resume from the last // ShardGtid they received before the journal event. endTimer := time.NewTimer(stopOnReshardDelay)