VStreamer: For larger compressed transaction payloads, stream the internal contents #17239

7 changes: 7 additions & 0 deletions go/mysql/binlog_event_compression.go
@@ -98,6 +98,11 @@ type TransactionPayload struct {
 	payload []byte
 	reader io.Reader
 	iterator func() (BinlogEvent, error)
+	// StreamingContents tells the consumer that we are streaming the
+	// decompressed payload and they should also stream the events.
+	// This ensures that neither the producer nor the consumer is
+	// holding the entire payload's contents in memory.
+	StreamingContents bool
 }
 
 // IsTransactionPayload returns true if a compressed transaction
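As a rough illustration of what the new flag asks of a consumer, here is a minimal, self-contained sketch (not Vitess code): the event and payload types and the next/consume helpers are stand-ins invented for this example, and only the StreamingContents field mirrors the real struct above.

```go
package main

import (
	"fmt"
	"io"
)

// event and payload are simplified stand-ins for mysql.BinlogEvent and
// mysql.TransactionPayload; only the StreamingContents field mirrors the
// real struct above.
type event string

type payload struct {
	StreamingContents bool
	events            []event
	pos               int
}

// next is a hypothetical accessor that walks the payload's decompressed events.
func (p *payload) next() (event, error) {
	if p.pos >= len(p.events) {
		return "", io.EOF
	}
	e := p.events[p.pos]
	p.pos++
	return e, nil
}

// consume transmits each event immediately when the payload is being streamed,
// and falls back to a single batch otherwise.
func consume(p *payload, transmit func(event)) {
	var batch []event
	for {
		e, err := p.next()
		if err == io.EOF {
			break
		}
		if p.StreamingContents {
			// Large payload: hand each event downstream right away so the
			// full decompressed contents are never held in memory at once.
			transmit(e)
			continue
		}
		// Small payload: it was decompressed in memory anyway, so one batch
		// is cheaper than per-event transmission.
		batch = append(batch, e)
	}
	for _, e := range batch {
		transmit(e)
	}
}

func main() {
	p := &payload{StreamingContents: true, events: []event{"WRITE_ROWS", "XID"}}
	consume(p, func(e event) { fmt.Println("transmit:", e) })
}
```

Either way the same decompressed event sequence comes out; the flag only decides whether it is forwarded as one batch or one event at a time.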
@@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error {
 		}
 		compressedTrxPayloadsUsingStream.Add(1)
 		tp.reader = streamDecoder
+		// Signal the consumer to also stream the contents.
+		tp.StreamingContents = true
Contributor
Should we just always stream? (Meaning we would remove the conditional flag?)

I guess there's also a backwards-compatibility issue here?

Contributor Author

I thought about that too. It's more efficient to do it in a batch, though, so for smaller payloads we do that (one batch) at both layers: binlog and vstreamer.

 		return nil
 	}
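For context on the batch-versus-stream question raised above, here is a simplified sketch of the producer-side decision: decompress fully in memory below a size threshold, otherwise switch to a streaming decoder and set the flag. The threshold constant echoes the ZstdInMemoryDecompressorMaxSize limit mentioned later in this PR, but the types, field names, and decode helpers here are illustrative stand-ins, not the real implementation.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// zstdInMemoryDecompressorMaxSize plays the role of the real threshold; the
// value here is purely illustrative.
const zstdInMemoryDecompressorMaxSize = 128 << 20 // 128 MiB

// txPayload is a simplified stand-in for mysql.TransactionPayload.
type txPayload struct {
	payload           []byte // compressed bytes
	uncompressedSize  uint64
	reader            io.Reader
	StreamingContents bool
}

// decompress sketches the batch-vs-stream decision: small payloads are fully
// decoded into memory, large ones get a streaming decoder and the consumer is
// told to stream as well via StreamingContents.
func (tp *txPayload) decompress() error {
	if tp.uncompressedSize <= zstdInMemoryDecompressorMaxSize {
		decoded, err := decodeAll(tp.payload) // hypothetical in-memory decode
		if err != nil {
			return err
		}
		tp.reader = bytes.NewReader(decoded)
		return nil
	}
	streamDecoder, err := newStreamDecoder(tp.payload) // hypothetical streaming decode
	if err != nil {
		return err
	}
	tp.reader = streamDecoder
	// Signal the consumer to also stream the contents.
	tp.StreamingContents = true
	return nil
}

// Placeholder decoders so the sketch compiles; a real implementation would use
// a zstd library here.
func decodeAll(p []byte) ([]byte, error)           { return p, nil }
func newStreamDecoder(p []byte) (io.Reader, error) { return bytes.NewReader(p), nil }

func main() {
	tp := &txPayload{payload: []byte("..."), uncompressedSize: 1 << 40} // pretend ~1 TiB uncompressed
	if err := tp.decompress(); err != nil {
		panic(err)
	}
	fmt.Println("streaming:", tp.StreamingContents)
}
```

Keeping the in-memory path for small payloads is the "one batch" case the comment thread above refers to.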

2 changes: 2 additions & 0 deletions go/mysql/binlog_event_mysql56_test.go
@@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
 			}
 		}
 		if tc.inMemory {
+			require.False(t, tp.StreamingContents)
 			require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get())
 			require.Equal(t, tc.want, eventStrs)
 		} else {
+			require.True(t, tp.StreamingContents)
 			require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get())
 			require.Len(t, eventStrs, len(tc.want))
 			totalSize := 0
32 changes: 28 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
@@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
 				}
 				return fmt.Errorf("unexpected server EOF")
 			}
-			vevents, err := vs.parseEvent(ev)
+			vevents, err := vs.parseEvent(ev, bufferAndTransmit)
 			if err != nil {
 				vs.vse.errorCounts.Add("ParseEvent", 1)
 				return err
@@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
 }
 
 // parseEvent parses an event from the binlog and converts it to a list of VEvents.
-func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {
+// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent,
+// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need
+// to be streamed directly here in order to avoid holding the entire payload's contents,
+// which can be 10s or even 100s of GiBs, all in memory.
+func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) {
 	if !ev.IsValid() {
 		return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
 	}
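The new parameter assumes the caller hands in something like the bufferAndTransmit closure used by parseEvents above. Below is a minimal stand-alone sketch of such a callback; the vEvent type, the flush policy, and the newBufferAndTransmit helper are illustrative assumptions, not the vstreamer's actual implementation.

```go
package main

import "fmt"

// vEvent stands in for *binlogdatapb.VEvent.
type vEvent struct {
	kind string
}

// maxBuffered is an illustrative cap that keeps the buffer bounded even when a
// huge streamed payload is being pushed through the callback.
const maxBuffered = 1000

// newBufferAndTransmit returns a callback in the spirit of the closure that
// parseEvents passes down: events accumulate in a small buffer that is flushed
// to send() at a transaction boundary or once it grows past the cap.
func newBufferAndTransmit(send func([]*vEvent) error) func(*vEvent) error {
	var buf []*vEvent
	flush := func() error {
		if len(buf) == 0 {
			return nil
		}
		err := send(buf)
		buf = nil
		return err
	}
	return func(ev *vEvent) error {
		buf = append(buf, ev)
		if ev.kind == "COMMIT" || len(buf) >= maxBuffered {
			return flush()
		}
		return nil
	}
}

func main() {
	transmit := newBufferAndTransmit(func(batch []*vEvent) error {
		fmt.Println("sending", len(batch), "events")
		return nil
	})
	for _, k := range []string{"BEGIN", "ROW", "ROW", "COMMIT"} {
		if err := transmit(&vEvent{kind: k}); err != nil {
			panic(err)
		}
	}
}
```

Flushing at a transaction boundary or once the buffer passes a cap is what keeps a huge streamed payload from simply re-accumulating in the caller.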
@@ -672,11 +676,31 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
 				}
 				return nil, err
 			}
-			tpvevents, err := vs.parseEvent(tpevent)
+			tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event
 			if err != nil {
 				return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event")
 			}
-			vevents = append(vevents, tpvevents...)
+			if tp.StreamingContents {
+				// Transmit each internal event individually to avoid buffering
+				// the large transaction's entire payload of events in memory, as
+				// the uncompressed size can be 10s or even 100s of GiBs.
+				if bufferAndTransmit == nil {
+					return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided")
+				}
+				for _, tpvevent := range tpvevents {
+					tpvevent.Timestamp = int64(ev.Timestamp())
+					tpvevent.CurrentTime = time.Now().UnixNano()
+					if err := bufferAndTransmit(tpvevent); err != nil {
+						if err == io.EOF {
+							return nil, nil
+						}
+						vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1)
+						return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err)
+					}
+				}
+			} else { // Process the payload's internal events all at once
+				vevents = append(vevents, tpvevents...)
+			}
 		}
 		vs.vse.vstreamerCompressedTransactionsDecoded.Add(1)
 	}