Skip to content

Commit

Permalink
Handle this entirely in vstream manager
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Aug 23, 2024
1 parent 0cd4e18 commit 9dc4133
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 20 deletions.
24 changes: 19 additions & 5 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for _, event := range events {
for i, event := range events {
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
Expand Down Expand Up @@ -658,22 +658,36 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return err
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients by default, but only when
// StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
// Include our own commit event to complete the BEGIN->JOURNAL-COMMIT
// sequence in the stream.
sendevents := append(sendevents, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT})
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
// an autocommit insert into the _vt.resharding_journal table on the
// tablet. This batch of events we're currently processing may not
// contain these events.
for j := i + 1; j < len(events); j++ {
sendevents = append(sendevents, events[j])
if events[j].Type == binlogdatapb.VEventType_COMMIT {
break
}
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
eventss = nil
sendevents = nil
// We're going to be stopping the stream anyway, so we pause to give clients
// time to recv the journal event before the stream's context is cancelled
// (which causes the grpc SendMsg to fail).
// If the client doesn't (grpc) Recv the journal event before the stream
// ends then they'll have to resume from the last ShardGtid they received
// before the journal event.
time.Sleep(2 * time.Second)
}
je, err := vs.getJournalEvent(ctx, sgtid, journal)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

// Only the following patterns are possible:
// BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks.
// BEGIN->JOURNAL
// ->GTID->COMMIT. This is a special case where the journal is sent immediately as some consumers stop on reshard events.
// BEGIN->JOURNAL->GTID->COMMIT
// GTID->DDL
// GTID->OTHER
// HEARTBEAT is issued if there's inactivity, which is likely
Expand All @@ -227,12 +226,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
vevent.Shard = vs.vse.shard

switch vevent.Type {
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD:
case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD,
binlogdatapb.VEventType_JOURNAL:
// We never have to send GTID, BEGIN, FIELD events on their own.
// A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT.
// So, we don't have to send it right away.
bufferedEvents = append(bufferedEvents, vevent)
case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER,
binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION, binlogdatapb.VEventType_JOURNAL:
// COMMIT, DDL, JOURNAL, OTHER and HEARTBEAT must be immediately sent.
binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION:
// COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent.
// Although unlikely, it's possible to get a HEARTBEAT in the middle
// of a transaction. If so, we still send the partial transaction along
// with the heartbeat.
Expand Down Expand Up @@ -627,6 +629,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Type: binlogdatapb.VEventType_VERSION,
}
vevents = append(vevents, vevent)

} else {
vevents, err = vs.processRowEvent(vevents, plan, rows)
}
Expand Down
16 changes: 6 additions & 10 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1731,16 +1731,12 @@ func TestJournal(t *testing.T) {
"commit",
},
// External table events don't get sent.
output: [][]string{
{
`begin`,
`type:JOURNAL journal:{id:1 migration_type:SHARDS}`,
},
{
`gtid`,
`commit`,
},
},
output: [][]string{{
`begin`,
`type:JOURNAL journal:{id:1 migration_type:SHARDS}`,
`gtid`,
`commit`,
}},
}}
runCases(t, nil, testcases, "", nil)
}
Expand Down

0 comments on commit 9dc4133

Please sign in to comment.