From 9dc41330ba660ee97f1fac701e2862d9e8c15f45 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 23 Aug 2024 17:09:28 -0400 Subject: [PATCH] Handle this entirely in vstream manager Signed-off-by: Matt Lord --- go/vt/vtgate/vstream_manager.go | 24 +++++++++++++++---- .../tabletserver/vstreamer/vstreamer.go | 13 ++++++---- .../tabletserver/vstreamer/vstreamer_test.go | 16 +++++-------- 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 946ed3790c1..75b939115eb 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -608,7 +608,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) - for _, event := range events { + for i, event := range events { switch event.Type { case binlogdatapb.VEventType_FIELD: // Update table names and send. @@ -658,22 +658,36 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil { return err } - case binlogdatapb.VEventType_JOURNAL: journal := event.Journal // Journal events are not sent to clients by default, but only when // StopOnReshard is set. if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS { sendevents = append(sendevents, event) - // Include our own commit event to complete the BEGIN->JOURNAL-COMMIT - // sequence in the stream. - sendevents := append(sendevents, &binlogdatapb.VEvent{Type: binlogdatapb.VEventType_COMMIT}) + // Read any subsequent events until we get the VGTID->COMMIT events that + // always follow the JOURNAL event which is generated as a result of + // an autocommit insert into the _vt.resharding_journal table on the + // tablet. This batch of events we're currently processing may not + // contain these events. + for j := i + 1; j < len(events); j++ { + sendevents = append(sendevents, events[j]) + if events[j].Type == binlogdatapb.VEventType_COMMIT { + break + } + } eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { return err } eventss = nil sendevents = nil + // We're going to be stopping the stream anyway, so we pause to give clients + // time to recv the journal event before the stream's context is cancelled + // (which causes the grpc SendMsg to fail). + // If the client doesn't (grpc) Recv the journal event before the stream + // ends then they'll have to resume from the last ShardGtid they received + // before the journal event. + time.Sleep(2 * time.Second) } je, err := vs.getJournalEvent(ctx, sgtid, journal) if err != nil { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9f94d3d1fed..824f79e20f1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -210,8 +210,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // Only the following patterns are possible: // BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks. - // BEGIN->JOURNAL - // ->GTID->COMMIT. This is a special case where the journal is sent immediately as some consumers stop on reshard events. + // BEGIN->JOURNAL->GTID->COMMIT // GTID->DDL // GTID->OTHER // HEARTBEAT is issued if there's inactivity, which is likely @@ -227,12 +226,15 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog vevent.Shard = vs.vse.shard switch vevent.Type { - case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD: + case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, + binlogdatapb.VEventType_JOURNAL: // We never have to send GTID, BEGIN, FIELD events on their own. + // A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT. + // So, we don't have to send it right away. bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, - binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION, binlogdatapb.VEventType_JOURNAL: - // COMMIT, DDL, JOURNAL, OTHER and HEARTBEAT must be immediately sent. + binlogdatapb.VEventType_HEARTBEAT, binlogdatapb.VEventType_VERSION: + // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. // Although unlikely, it's possible to get a HEARTBEAT in the middle // of a transaction. If so, we still send the partial transaction along // with the heartbeat. @@ -627,6 +629,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Type: binlogdatapb.VEventType_VERSION, } vevents = append(vevents, vevent) + } else { vevents, err = vs.processRowEvent(vevents, plan, rows) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d53c3a8af49..df565b8f18b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1731,16 +1731,12 @@ func TestJournal(t *testing.T) { "commit", }, // External table events don't get sent. - output: [][]string{ - { - `begin`, - `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, - }, - { - `gtid`, - `commit`, - }, - }, + output: [][]string{{ + `begin`, + `type:JOURNAL journal:{id:1 migration_type:SHARDS}`, + `gtid`, + `commit`, + }}, }} runCases(t, nil, testcases, "", nil) }