Skip to content

Commit

Permalink
Improve e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 24, 2024
1 parent a43ba6a commit 9159b4f
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
28 changes: 15 additions & 13 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,13 +775,15 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {

vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "/.*", // Match all keyspaces just to be more realistic.
// Only stream the keyspace that we're resharding. Otherwise the client stream
// will continue to run with only the tablet stream from the global keyspace.
Keyspace: ks,
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
// Only stream the customer table and its sequence backing table.
Match: "/customer.*",
// Stream all tables.
Match: "/.*",
}},
}
flags := &vtgatepb.VStreamFlags{
Expand All @@ -805,15 +807,13 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
switch shard {
case "-80", "80-":
oldShardRowEvents++
case "0":
// We expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
if len(newVGTID.GetShardGtids()) == 3 {
// We want a VGTID with a position for the global shard and the old shards.
// We want a VGTID with a ShardGtid for both of the old shards.
if len(newVGTID.GetShardGtids()) == 2 {
canStop := true
for _, sg := range newVGTID.GetShardGtids() {
if sg.GetGtid() == "" {
Expand All @@ -837,17 +837,19 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
}
}()

// Confirm that we have shard GTIDs for the global shard and the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 3)
// Confirm that we have shard GTIDs for the old/original shards.
require.Len(t, newVGTID.GetShardGtids(), 2)
t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids())

// Switch the traffic to the new shards.
reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

// Now start a new VStream from our previous VGTID which only has the old/original shards.
expectedJournalEvents := 2 // One for each old shard: -80,80-
var streamStopped bool // We expect the stream to end with io.EOF from the reshard
runResumeStream := func() {
journalEvents = 0
streamStopped = false
t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids())
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
require.NoError(t, err)
Expand All @@ -861,7 +863,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
case binlogdatapb.VEventType_ROW:
shard := ev.RowEvent.Shard
switch shard {
case "0", "-80", "80-":
case "-80", "80-":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
Expand All @@ -871,12 +873,10 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
if journalEvents == expectedJournalEvents {
return
}
}
}
case io.EOF:
streamStopped = true
return
default:
require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
Expand All @@ -897,6 +897,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
// using StopOnReshard.
require.Equal(t, expectedJournalEvents, journalEvents,
"did not get expected journal events on resume vstream #%d", i)
// Confirm that the stream stopped on the reshard.
require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected")
}
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// We're going to be ending the tablet stream, so we ensure a reasonable
// minimum amount of time is alloted for clients to Recv the journal event
// before the stream's context is cancelled (which would cause the grpc
// Send or Recv to fail). If the client doesn't (grpc) Recv the journal
// SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
// event before the stream ends then they'll have to resume from the last
// ShardGtid they received before the journal event.
endTimer := time.NewTimer(stopOnReshardDelay)
Expand Down

0 comments on commit 9159b4f

Please sign in to comment.