Skip to content

Commit

Permalink
Make ZstdInMemoryDecompressorMaxSize configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Nov 12, 2024
1 parent f6067e0 commit 91f29f7
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
14 changes: 7 additions & 7 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ const (
// Length of the binlog event header for internal events within
// the transaction payload.
headerLen = binlogEventLenOffset + eventLenBytes

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

var (
Expand All @@ -75,6 +70,11 @@ var (
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory all at once.
ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB

// A concurrent stateless decoder that caches decompressors. This is
// used for smaller payloads that we want to handle entirely using
// in-memory buffers via DecodeAll.
Expand Down Expand Up @@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {

// Switch to slower but less memory intensive stream mode for
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
Expand Down Expand Up @@ -366,7 +366,7 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
}
} else {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize))
if err != nil { // Should only happen e.g. due to ENOMEM
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
}
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/servenv"
)

Expand Down Expand Up @@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the sizea at which the faster all in-memory binlog compressed transaction handling will switch to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode.")
}

0 comments on commit 91f29f7

Please sign in to comment.