From 92cf5933a258142f3818e43d729da571830f6b90 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Nov 2024 12:23:12 -0500 Subject: [PATCH 1/6] For larger payloads, stream the internal contents For larger payloads (> ZstdInMemoryDecompressorMaxSize) we were already streaming the internal events as we decompressed the payload, but in the vstreamer we were still reading the entire contents into memory before sending them to the consumer (vplayer). With this we stream the internal contents all the way from the binlog consumer to the vstream consumer so that we do not need to hold the entire contents, which can be 10s of GiBs, in memory all at once. Signed-off-by: Matt Lord --- go/mysql/binlog_event_compression.go | 7 ++++ .../tabletserver/vstreamer/vstreamer.go | 33 ++++++++++++++++--- 2 files changed, 36 insertions(+), 4 deletions(-) diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go index 1cb38d5cb16..0eb96da843b 100644 --- a/go/mysql/binlog_event_compression.go +++ b/go/mysql/binlog_event_compression.go @@ -98,6 +98,11 @@ type TransactionPayload struct { payload []byte reader io.Reader iterator func() (BinlogEvent, error) + // StreamingContents tells the consumer that we are streaming the + // decompressed payload and they should also stream the events. + // This ensures that neither the producer nor the consumer are + // holding the entire payload's contents in memory. + StreamingContents bool } // IsTransactionPayload returns true if a compressed transaction @@ -292,6 +297,8 @@ func (tp *TransactionPayload) decompress() error { } compressedTrxPayloadsUsingStream.Add(1) tp.reader = streamDecoder + // Signal the consumer to also stream the contents. + tp.StreamingContents = true return nil } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index ea7f75cdc38..0d70675d2ea 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -375,7 +375,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } return fmt.Errorf("unexpected server EOF") } - vevents, err := vs.parseEvent(ev) + vevents, err := vs.parseEvent(ev, bufferAndTransmit) if err != nil { vs.vse.errorCounts.Add("ParseEvent", 1) return err @@ -416,7 +416,11 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } // parseEvent parses an event from the binlog and converts it to a list of VEvents. -func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { +// The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent +// as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need +// to be streamed directly here in order to avoid holding the entire payload's contents, +// which can be 10s of GiBs, all in memory. +func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) { if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -672,12 +676,33 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } return nil, err } - tpvevents, err := vs.parseEvent(tpevent) + tpvevents, err := vs.parseEvent(tpevent, nil) // Parse the internal event if err != nil { return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } - vevents = append(vevents, tpvevents...) + if tp.StreamingContents { + //log.Errorf("DEBUG: processing transaction payload's internal events as a stream") + // Transmit each internal event individually to avoid buffering + // the large transaction's entire payload of events in memory, as + // the uncompressed size can be 10s or even 100s of GiBs in size. + for _, tpvevent := range tpvevents { + tpvevent.Timestamp = int64(ev.Timestamp()) + tpvevent.CurrentTime = time.Now().UnixNano() + //log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent) + if err := bufferAndTransmit(tpvevent); err != nil { + if err == io.EOF { + return nil, nil + } + vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1) + return nil, fmt.Errorf("error sending compressed transaction payload's internal event: %v", err) + } + } + } else { // Process the payload's internal events all at once + //log.Errorf("DEBUG: processing transaction payload's internal events all at once: %v", tpvevents) + vevents = append(vevents, tpvevents...) + } } + vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) } for _, vevent := range vevents { From c705498558d84d81c49c71500194cf76a8c3db39 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Nov 2024 12:33:10 -0500 Subject: [PATCH 2/6] Check for nil bufferAndTransmit function Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 0d70675d2ea..4f16bc8d5c4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -681,14 +681,17 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } if tp.StreamingContents { - //log.Errorf("DEBUG: processing transaction payload's internal events as a stream") + // log.Errorf("DEBUG: processing transaction payload's internal events as a stream") // Transmit each internal event individually to avoid buffering // the large transaction's entire payload of events in memory, as // the uncompressed size can be 10s or even 100s of GiBs in size. + if bufferAndTransmit == nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided") + } for _, tpvevent := range tpvevents { tpvevent.Timestamp = int64(ev.Timestamp()) tpvevent.CurrentTime = time.Now().UnixNano() - //log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent) + // log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent) if err := bufferAndTransmit(tpvevent); err != nil { if err == io.EOF { return nil, nil From 4b96ef85a4bb2747229878a572b54905ca3e9924 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Nov 2024 17:04:53 -0500 Subject: [PATCH 3/6] Remove debug logs Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 4f16bc8d5c4..de4fc047893 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -681,7 +681,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev return nil, vterrors.Wrap(err, "failed to parse transaction payload's internal event") } if tp.StreamingContents { - // log.Errorf("DEBUG: processing transaction payload's internal events as a stream") // Transmit each internal event individually to avoid buffering // the large transaction's entire payload of events in memory, as // the uncompressed size can be 10s or even 100s of GiBs in size. @@ -691,7 +690,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev for _, tpvevent := range tpvevents { tpvevent.Timestamp = int64(ev.Timestamp()) tpvevent.CurrentTime = time.Now().UnixNano() - // log.Errorf("DEBUG: streaming transaction payload's internal event: %v", tpevent) if err := bufferAndTransmit(tpvevent); err != nil { if err == io.EOF { return nil, nil @@ -701,7 +699,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev } } } else { // Process the payload's internal events all at once - //log.Errorf("DEBUG: processing transaction payload's internal events all at once: %v", tpvevents) vevents = append(vevents, tpvevents...) } } From 71e2f5b170d6b8657fbd18c301265e7e6b842fdb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 15 Nov 2024 17:08:30 -0500 Subject: [PATCH 4/6] Minor changes from self review Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index de4fc047893..59db723ff2b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -419,7 +419,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // The bufferAndTransmit function must be passed if the event is a TransactionPayloadEvent // as for larger payloads (> ZstdInMemoryDecompressorMaxSize) the internal events need // to be streamed directly here in order to avoid holding the entire payload's contents, -// which can be 10s of GiBs, all in memory. +// which can be 10s or even 100s of GiBs, all in memory. func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vevent *binlogdatapb.VEvent) error) ([]*binlogdatapb.VEvent, error) { if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) @@ -685,7 +685,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev // the large transaction's entire payload of events in memory, as // the uncompressed size can be 10s or even 100s of GiBs in size. if bufferAndTransmit == nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided") + return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[bug] cannot stream compressed transaction payload's internal events as no bufferAndTransmit function was provided") } for _, tpvevent := range tpvevents { tpvevent.Timestamp = int64(ev.Timestamp()) @@ -695,14 +695,13 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev return nil, nil } vs.vse.errorCounts.Add("TransactionPayloadBufferAndTransmit", 1) - return nil, fmt.Errorf("error sending compressed transaction payload's internal event: %v", err) + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "error sending compressed transaction payload's internal event: %v", err) } } } else { // Process the payload's internal events all at once vevents = append(vevents, tpvevents...) } } - vs.vse.vstreamerCompressedTransactionsDecoded.Add(1) } for _, vevent := range vevents { From 887ede8306fe9a9a82935d9edef024e0d830167f Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 1 Dec 2024 22:42:11 +0100 Subject: [PATCH 5/6] Rerun CI because of change in required test names Signed-off-by: Rohit Nayak From 642219ad9bcbace029acadc8f48927cfc3bf406b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 3 Dec 2024 08:30:57 -0500 Subject: [PATCH 6/6] Check new flag in unit test Signed-off-by: Matt Lord --- go/mysql/binlog_event_mysql56_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go index 861d98c6e4f..5844779de63 100644 --- a/go/mysql/binlog_event_mysql56_test.go +++ b/go/mysql/binlog_event_mysql56_test.go @@ -186,9 +186,11 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) { } } if tc.inMemory { + require.False(t, tp.StreamingContents) require.Equal(t, memDecodingCnt+1, compressedTrxPayloadsInMem.Get()) require.Equal(t, tc.want, eventStrs) } else { + require.True(t, tp.StreamingContents) require.Equal(t, streamDecodingCnt+1, compressedTrxPayloadsUsingStream.Get()) require.Len(t, eventStrs, len(tc.want)) totalSize := 0