Skip to content

Commit

Permalink
VReplication: Add VTGate VStreamFlag to include journal events in the…
Browse files Browse the repository at this point in the history
… stream (#16737)

Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord authored Sep 17, 2024
1 parent 069651a commit 53a6ea4
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 87 deletions.
1 change: 1 addition & 0 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func main() {
// MinimizeSkew: false,
// HeartbeatInterval: 60, //seconds
// StopOnReshard: true,
// IncludeReshardJournalEvents: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,10 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
Match: "/customer.*",
}},
}
flags := &vtgatepb.VStreamFlags{}
flags := &vtgatepb.VStreamFlags{
IncludeReshardJournalEvents: true,
}
journalEvents := 0

// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
Expand Down Expand Up @@ -678,6 +681,9 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
}
case binlogdatapb.VEventType_JOURNAL:
require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS)
journalEvents++
}
}
default:
Expand All @@ -694,6 +700,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
// We should have a mix of events across the old and new shards.
require.Greater(t, oldShardRowEvents, 0)
require.Greater(t, newShardRowEvents, 0)
// We should have seen a reshard journal event.
require.Greater(t, journalEvents, 0)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
Expand Down
127 changes: 70 additions & 57 deletions go/vt/proto/vtgate/vtgate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 53a6ea4

Please sign in to comment.