From 91f29f74b797bed1ed734ca4c5298e2d63f1a695 Mon Sep 17 00:00:00 2001
From: Matt Lord
Date: Tue, 12 Nov 2024 18:58:19 -0500
Subject: [PATCH] Make ZstdInMemoryDecompressorMaxSize configurable

Signed-off-by: Matt Lord
---
 go/mysql/binlog_event_compression.go  | 14 +++++++-------
 go/mysql/binlog_event_mysql56_test.go |  2 +-
 go/vt/vttablet/common/flags.go        |  3 +++
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/go/mysql/binlog_event_compression.go b/go/mysql/binlog_event_compression.go
index 1cb38d5cb16..cebc5295708 100644
--- a/go/mysql/binlog_event_compression.go
+++ b/go/mysql/binlog_event_compression.go
@@ -58,11 +58,6 @@ const (
 	// Length of the binlog event header for internal events within
 	// the transaction payload.
 	headerLen = binlogEventLenOffset + eventLenBytes
-
-	// At what size should we switch from the in-memory buffer
-	// decoding to streaming mode which is much slower, but does
-	// not require everything be done in memory.
-	zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
 )
 
 var (
@@ -75,6 +70,11 @@ var (
 	compressedTrxPayloadsInMem       = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
 	compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")
 
+	// At what size should we switch from the in-memory buffer
+	// decoding to streaming mode which is much slower, but does
+	// not require everything be done in memory all at once.
+	ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB
+
 	// A concurrent stateless decoder that caches decompressors. This is
 	// used for smaller payloads that we want to handle entirely using
 	// in-memory buffers via DecodeAll.
@@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {
 
 	// Switch to slower but less memory intensive stream mode for
 	// larger payloads.
-	if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
+	if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
 		in := bytes.NewReader(tp.payload)
 		streamDecoder, err := statefulDecoderPool.Get(in)
 		if err != nil {
@@ -366,7 +366,7 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
 			return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
 		}
 	} else {
-		d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
+		d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize))
 		if err != nil { // Should only happen e.g. due to ENOMEM
 			return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
 		}
diff --git a/go/mysql/binlog_event_mysql56_test.go b/go/mysql/binlog_event_mysql56_test.go
index f173e27e4af..cd755e829dc 100644
--- a/go/mysql/binlog_event_mysql56_test.go
+++ b/go/mysql/binlog_event_mysql56_test.go
@@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
 				totalSize += len(eventStr)
 				require.True(t, strings.HasPrefix(eventStr, want))
 			}
-			require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
+			require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
 		}
 	}
 }
diff --git a/go/vt/vttablet/common/flags.go b/go/vt/vttablet/common/flags.go
index f9775b8af3e..b198d7a4bbb 100644
--- a/go/vt/vttablet/common/flags.go
+++ b/go/vt/vttablet/common/flags.go
@@ -21,6 +21,7 @@ import (
 
 	"github.com/spf13/pflag"
 
+	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/vt/servenv"
 )
 
@@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
 	fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")
 
 	fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")
+
+	fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the size at which the faster all in-memory binlog compressed transaction handling will switch to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode.")
 }
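
A minimal sketch of the pattern this patch makes tunable, for reviewers who want to see the size-based switch in isolation: payloads at or below the (now configurable) threshold are decoded entirely in memory via DecodeAll, while larger ones go through a streaming zstd reader created with the same value as its max-memory option, mirroring the patched decoderPool. The helper name decompressPayload and the local zstdInMemoryMaxSize variable are illustrative only and not part of the patch; the pooling and error wrapping in the real code are omitted.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

// Illustrative stand-in for mysql.ZstdInMemoryDecompressorMaxSize (128MiB default).
var zstdInMemoryMaxSize = uint64(128 << (10 * 2))

// decompressPayload (hypothetical helper) picks the decode path based on the
// declared uncompressed size, the same decision the patched decompress() makes.
func decompressPayload(compressed []byte, uncompressedSize uint64) ([]byte, error) {
	if uncompressedSize > zstdInMemoryMaxSize {
		// Streaming mode: the real code hands this reader back and consumes the
		// events incrementally; for this self-contained demo we simply drain it.
		// The threshold is reused as the decoder max-memory option, as in the patch.
		dec, err := zstd.NewReader(bytes.NewReader(compressed),
			zstd.WithDecoderMaxMemory(zstdInMemoryMaxSize))
		if err != nil {
			return nil, err
		}
		defer dec.Close()
		return io.ReadAll(dec)
	}
	// In-memory mode: DecodeAll is the fast path for payloads under the threshold.
	dec, err := zstd.NewReader(nil)
	if err != nil {
		return nil, err
	}
	defer dec.Close()
	return dec.DecodeAll(compressed, make([]byte, 0, uncompressedSize))
}

func main() {
	enc, err := zstd.NewWriter(nil)
	if err != nil {
		panic(err)
	}
	payload := []byte("hello, binlog")
	compressed := enc.EncodeAll(payload, nil)
	enc.Close()

	out, err := decompressPayload(compressed, uint64(len(payload)))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // hello, binlog
}

Operationally, the new knob can then be set at tablet startup, e.g. --binlog-in-memory-decompressor-max-size=268435456 to raise the cutover to 256MiB; the default keeps the previous hard-coded 128MiB, so behavior is unchanged unless the flag is passed.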