Skip to content

Commit

Permalink
Add VStreamFlag to include journal events in the stream
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Sep 10, 2024
1 parent a513601 commit 0e38eb8
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 76 deletions.
1 change: 1 addition & 0 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func main() {
// MinimizeSkew: false,
// HeartbeatInterval: 60, //seconds
// StopOnReshard: true,
// IncludeReshardJournalEvents: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
Expand Down
127 changes: 70 additions & 57 deletions go/vt/proto/vtgate/vtgate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 24 additions & 19 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ type vstream struct {
// default behavior is to automatically migrate the resharded streams from the old to the new shards
stopOnReshard bool

// This flag is set by the client, default is false.
// If true then the reshard journal events are sent in the stream irrespective of the stopOnReshard flag.
includeReshardJournalEvents bool

// mutex used to synchronize access to skew detection parameters
skewMu sync.Mutex
// channel is created whenever there is a skew detected. closing it implies the current skew has been fixed
Expand Down Expand Up @@ -169,22 +173,23 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
return fmt.Errorf("unable to get topo server")
}
vs := &vstream{
vgtid: vgtid,
tabletType: tabletType,
optCells: flags.Cells,
filter: filter,
send: send,
resolver: vsm.resolver,
journaler: make(map[int64]*journalEvent),
minimizeSkew: flags.GetMinimizeSkew(),
stopOnReshard: flags.GetStopOnReshard(),
skewTimeoutSeconds: maxSkewTimeoutSeconds,
timestamps: make(map[string]int64),
vsm: vsm,
eventCh: make(chan []*binlogdatapb.VEvent),
heartbeatInterval: flags.GetHeartbeatInterval(),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
vgtid: vgtid,
tabletType: tabletType,
optCells: flags.Cells,
filter: filter,
send: send,
resolver: vsm.resolver,
journaler: make(map[int64]*journalEvent),
minimizeSkew: flags.GetMinimizeSkew(),
stopOnReshard: flags.GetStopOnReshard(),
includeReshardJournalEvents: flags.GetIncludeReshardJournalEvents(),
skewTimeoutSeconds: maxSkewTimeoutSeconds,
timestamps: make(map[string]int64),
vsm: vsm,
eventCh: make(chan []*binlogdatapb.VEvent),
heartbeatInterval: flags.GetHeartbeatInterval(),
ts: ts,
copyCompletedShard: make(map[string]struct{}),
tabletPickerOptions: discovery.TabletPickerOptions{
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
Expand Down Expand Up @@ -677,8 +682,9 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients by default, but only when
// StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
// IncludeReshardJournalEvents or StopOnReshard is set.
if (vs.includeReshardJournalEvents || vs.stopOnReshard) &&
journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
Expand Down Expand Up @@ -726,7 +732,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}
lag := event.CurrentTime/1e9 - event.Timestamp
vs.vsm.vstreamsLag.Set(labels, lag)

}
if len(sendevents) != 0 {
eventss = append(eventss, sendevents)
Expand Down
2 changes: 2 additions & 0 deletions proto/vtgate.proto
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ message VStreamFlags {
string tablet_order = 6;
// When set, all new row events from the `heartbeat` table, for all shards, in the sidecardb will be streamed.
bool stream_keyspace_heartbeats = 7;
// Include reshard journal events in the stream.
bool include_reshard_journal_events = 8;
}

// VStreamRequest is the payload for VStream.
Expand Down

0 comments on commit 0e38eb8

Please sign in to comment.