From 81c1d8be6e490a43c130bf1b9bf029ea11fe294c Mon Sep 17 00:00:00 2001 From: Urvi Date: Wed, 8 May 2024 16:19:36 -0700 Subject: [PATCH] exp/services/ledgerexporter: Change default compression algorithm from gzip to zstd --- exp/services/ledgerexporter/config.toml | 1 - .../ledgerexporter/internal/app_test.go | 8 +- .../ledgerexporter/internal/config_test.go | 1 - .../internal/exportmanager_test.go | 40 ++++----- .../ledgerexporter/internal/uploader.go | 11 +-- .../ledgerexporter/internal/uploader_test.go | 12 +-- go.mod | 2 +- .../ledgerbackend/buffered_storage_backend.go | 14 +--- .../buffered_storage_backend_test.go | 19 ++--- support/compressxdr/compress_xdr.go | 73 ++++++++++++----- support/compressxdr/compress_xdr_test.go | 20 ++--- support/compressxdr/compressor.go | 40 +++++++++ support/compressxdr/gzip_compress_xdr.go | 55 ------------- support/compressxdr/mocks.go | 1 - support/datastore/ledgerbatch_config.go | 4 +- support/datastore/ledgerbatch_config_test.go | 2 +- support/datastore/resumablemanager_test.go | 81 ++++++++----------- 17 files changed, 180 insertions(+), 204 deletions(-) create mode 100644 support/compressxdr/compressor.go delete mode 100644 support/compressxdr/gzip_compress_xdr.go diff --git a/exp/services/ledgerexporter/config.toml b/exp/services/ledgerexporter/config.toml index bc522ff6a8..a56087427c 100644 --- a/exp/services/ledgerexporter/config.toml +++ b/exp/services/ledgerexporter/config.toml @@ -9,7 +9,6 @@ destination_bucket_path = "exporter-test/ledgers" [exporter_config] ledgers_per_file = 1 files_per_partition = 64000 - file_suffix = ".xdr.gz" [stellar_core_config] stellar_core_binary_path = "/usr/local/bin/stellar-core" diff --git a/exp/services/ledgerexporter/internal/app_test.go b/exp/services/ledgerexporter/internal/app_test.go index e2db0e28d9..2063fd21a2 100644 --- a/exp/services/ledgerexporter/internal/app_test.go +++ b/exp/services/ledgerexporter/internal/app_test.go @@ -41,7 +41,7 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) { StartLedger: 3, EndLedger: 9, Resume: true, - LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, } mockResumableManager := &datastore.MockResumableManager{} // simulate the datastore has inconsistent data, @@ -61,7 +61,7 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) { StartLedger: 10, EndLedger: 99, Resume: true, - LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, } mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50 @@ -80,7 +80,7 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) { StartLedger: 10, EndLedger: 99, Resume: true, - LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, } mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had no data in the requested range @@ -102,7 +102,7 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) { StartLedger: 2, EndLedger: 99, Resume: true, - LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"}, + LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50}, } mockResumableManager := &datastore.MockResumableManager{} // simulates a data store that had no data in the requested range diff --git a/exp/services/ledgerexporter/internal/config_test.go b/exp/services/ledgerexporter/internal/config_test.go index 036c121455..86f6cfb5b3 100644 --- a/exp/services/ledgerexporter/internal/config_test.go +++ b/exp/services/ledgerexporter/internal/config_test.go @@ -22,7 +22,6 @@ func TestNewConfigResumeEnabled(t *testing.T) { require.Equal(t, config.DataStoreConfig.Type, "ABC") require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1)) require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3)) - require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz") require.True(t, config.Resume) url, ok := config.DataStoreConfig.Params["destination_bucket_path"] require.True(t, ok) diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go index 01e3595373..74d54b3022 100644 --- a/exp/services/ledgerexporter/internal/exportmanager_test.go +++ b/exp/services/ledgerexporter/internal/exportmanager_test.go @@ -15,6 +15,7 @@ import ( "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/support/collections/set" + "github.com/stellar/go/support/compressxdr" "github.com/stellar/go/support/datastore" ) @@ -39,7 +40,7 @@ func (s *ExportManagerSuite) TearDownTest() { } func (s *ExportManagerSuite) TestInvalidExportConfig() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) _, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -47,7 +48,7 @@ func (s *ExportManagerSuite) TestInvalidExportConfig() { } func (s *ExportManagerSuite) TestRun() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -97,7 +98,7 @@ func (s *ExportManagerSuite) TestRun() { } func (s *ExportManagerSuite) TestRunContextCancel() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -126,7 +127,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() { } func (s *ExportManagerSuite) TestRunWithCanceledContext() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -143,32 +144,31 @@ func (s *ExportManagerSuite) TestGetObjectKeyFromSequenceNumber() { filesPerPartition uint32 ledgerSeq uint32 ledgersPerFile uint32 - fileSuffix string expectedKey string }{ - {0, 5, 1, ".xdr.gz", "5.xdr.gz"}, - {0, 5, 10, ".xdr.gz", "0-9.xdr.gz"}, - {2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"}, - {2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"}, - {2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"}, - {2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"}, - {4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"}, - {4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"}, - {1, 300, 200, ".xdr.gz", "200-399.xdr.gz"}, - {1, 1, 1, ".xdr.gz", "1.xdr.gz"}, + {0, 5, 1, "5"}, + {0, 5, 10, "0-9"}, + {2, 10, 100, "0-199/0-99"}, + {2, 150, 50, "100-199/150-199"}, + {2, 300, 200, "0-399/200-399"}, + {2, 1, 1, "0-1/1"}, + {4, 10, 100, "0-399/0-99"}, + {4, 250, 50, "200-399/250-299"}, + {1, 300, 200, "200-399"}, + {1, 1, 1, "1"}, } for _, tc := range testCases { s.T().Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) { - config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix} + config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile} key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq) - require.Equal(t, tc.expectedKey, key) + require.Equal(t, tc.expectedKey+".xdr."+compressxdr.DefaultCompressor.GetName(), key) }) } } func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -206,7 +206,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() { } func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) @@ -224,7 +224,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() { } func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() { - config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1, FileSuffix: ".xdr.gz"} + config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1} registry := prometheus.NewRegistry() queue := NewUploadQueue(1, registry) exporter, err := NewExportManager(config, &s.mockBackend, queue, registry) diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go index ca07965b85..38bd84bbab 100644 --- a/exp/services/ledgerexporter/internal/uploader.go +++ b/exp/services/ledgerexporter/internal/uploader.go @@ -83,15 +83,10 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaA startTime := time.Now() numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10) - // TODO: Add compression config and optimize best compression algorithm - // JIRA https://stellarorg.atlassian.net/browse/HUBBLE-368 - xdrEncoder, err := compressxdr.NewXDREncoder(compressxdr.GZIP, &metaArchive.Data) - if err != nil { - return err - } + xdrEncoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &metaArchive.Data) writerTo := &writerToRecorder{ - WriterTo: xdrEncoder, + WriterTo: &xdrEncoder, } ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), writerTo) if err != nil { @@ -109,7 +104,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaA "already_exists": alreadyExists, }).Observe(float64(writerTo.totalUncompressed)) u.objectSizeMetrics.With(prometheus.Labels{ - "compression": compressxdr.GZIP, + "compression": xdrEncoder.Compressor.GetName(), "ledgers": numLedgers, "already_exists": alreadyExists, }).Observe(float64(writerTo.totalCompressed)) diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go index ee39245486..9f5998e148 100644 --- a/exp/services/ledgerexporter/internal/uploader_test.go +++ b/exp/services/ledgerexporter/internal/uploader_test.go @@ -62,11 +62,10 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { expectedCompressedLength := capturedBuf.Len() var decodedArchive datastore.LedgerMetaArchive - xdrDecoder, err := compressxdr.NewXDRDecoder(compressxdr.GZIP, &decodedArchive.Data) - require.NoError(s.T(), err) + xdrDecoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decodedArchive.Data) decoder := xdrDecoder - _, err = decoder.ReadFrom(&capturedBuf) + _, err := decoder.ReadFrom(&capturedBuf) require.NoError(s.T(), err) // require that the decoded data matches the original test data @@ -98,7 +97,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", - "compression": compressxdr.GZIP, + "compression": decoder.Compressor.GetName(), "already_exists": strconv.FormatBool(alreadyExists), }) require.NoError(s.T(), err) @@ -114,7 +113,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) { ) metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", - "compression": compressxdr.GZIP, + "compression": decoder.Compressor.GetName(), "already_exists": strconv.FormatBool(!alreadyExists), }) require.NoError(s.T(), err) @@ -172,6 +171,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) { dataUploader := NewUploader(&s.mockDataStore, queue, registry) err := dataUploader.Upload(context.Background(), archive) require.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error()) + decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil) for _, alreadyExists := range []string{"true", "false"} { metric, err := dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ @@ -185,7 +185,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) { getMetricValue(metric).GetSummary().GetSampleCount(), ) - for _, compression := range []string{compressxdr.GZIP, "none"} { + for _, compression := range []string{decoder.Compressor.GetName(), "none"} { metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{ "ledgers": "100", "compression": compression, diff --git a/go.mod b/go.mod index fb02c94507..e703f2bdfd 100644 --- a/go.mod +++ b/go.mod @@ -122,7 +122,7 @@ require ( github.com/imkira/go-interpol v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect - github.com/klauspost/compress v1.17.0 // indirect + github.com/klauspost/compress v1.17.0 github.com/kr/text v0.2.0 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go index b28e68b63c..f7e3880d7b 100644 --- a/ingest/ledgerbackend/buffered_storage_backend.go +++ b/ingest/ledgerbackend/buffered_storage_backend.go @@ -20,7 +20,6 @@ var _ LedgerBackend = (*BufferedStorageBackend)(nil) type BufferedStorageBackendConfig struct { LedgerBatchConfig datastore.LedgerBatchConfig - CompressionType string DataStore datastore.DataStore BufferSize uint32 NumWorkers uint32 @@ -65,19 +64,8 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken return nil, errors.New("ledgersPerFile must be > 0") } - if config.LedgerBatchConfig.FileSuffix == "" { - return nil, errors.New("no file suffix provided in LedgerBatchConfig") - } - - if config.CompressionType == "" { - return nil, errors.New("no compression type provided in config") - } - ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) - decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil) - if err != nil { - return nil, err - } + decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil) bsBackend := &BufferedStorageBackend{ config: config, diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go index f132bc9419..58ffc2cf06 100644 --- a/ingest/ledgerbackend/buffered_storage_backend_test.go +++ b/ingest/ledgerbackend/buffered_storage_backend_test.go @@ -28,14 +28,12 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig ledgerBatchConfig := datastore.LedgerBatchConfig{ LedgersPerFile: 1, FilesPerPartition: 64000, - FileSuffix: ".xdr.gz", } dataStore := new(datastore.MockDataStore) return BufferedStorageBackendConfig{ LedgerBatchConfig: ledgerBatchConfig, - CompressionType: compressxdr.GZIP, DataStore: dataStore, BufferSize: 100, NumWorkers: 5, @@ -47,7 +45,7 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig func createBufferedStorageBackendForTesting() BufferedStorageBackend { config := createBufferedStorageBackendConfigForTesting() ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) - decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil) + decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil) return BufferedStorageBackend{ config: config, @@ -66,10 +64,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) if count > 1 { endFileSeq := i + count - 1 readCloser = createLCMBatchReader(i, endFileSeq, count) - objectName = fmt.Sprintf("0-%d/%d-%d.xdr.gz", partition, i, endFileSeq) + objectName = fmt.Sprintf("0-%d/%d-%d.xdr.zstd", partition, i, endFileSeq) } else { readCloser = createLCMBatchReader(i, i, count) - objectName = fmt.Sprintf("0-%d/%d.xdr.gz", partition, i) + objectName = fmt.Sprintf("0-%d/%d.xdr.zstd", partition, i) } mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil) } @@ -104,7 +102,7 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq, count uint32) xdr.LedgerCl func createLCMBatchReader(start, end, count uint32) io.ReadCloser { testData := createTestLedgerCloseMetaBatch(start, end, count) - encoder, _ := compressxdr.NewXDREncoder(compressxdr.GZIP, testData) + encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, testData) var buf bytes.Buffer encoder.WriteTo(&buf) capturedBuf := buf.Bytes() @@ -120,7 +118,6 @@ func TestNewBufferedStorageBackend(t *testing.T) { assert.NoError(t, err) assert.Equal(t, bsb.dataStore, config.DataStore) - assert.Equal(t, ".xdr.gz", bsb.config.LedgerBatchConfig.FileSuffix) assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile) assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition) assert.Equal(t, uint32(100), bsb.config.BufferSize) @@ -440,7 +437,7 @@ func TestLedgerBufferClose(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3) afterPrepareRange := make(chan struct{}) mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) { <-afterPrepareRange @@ -472,7 +469,7 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3) mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once() t.Cleanup(func() { mockDataStore.AssertExpectations(t) @@ -498,7 +495,7 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3) iteration := &atomic.Int32{} cancelAfter := int32(bsb.config.RetryLimit) + 2 mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) { @@ -530,7 +527,7 @@ func TestLedgerBufferRetryLimit(t *testing.T) { mockDataStore := new(datastore.MockDataStore) partition := ledgerPerFileCount*partitionSize - 1 - objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3) + objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3) mockDataStore.On("GetFile", mock.Anything, objectName). Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")). Times(int(bsb.config.RetryLimit) + 1) diff --git a/support/compressxdr/compress_xdr.go b/support/compressxdr/compress_xdr.go index c43a1bca77..59c9009ea4 100644 --- a/support/compressxdr/compress_xdr.go +++ b/support/compressxdr/compress_xdr.go @@ -3,36 +3,67 @@ package compressxdr import ( "io" - "github.com/stellar/go/support/errors" + xdr3 "github.com/stellar/go-xdr/xdr3" ) -const ( - GZIP = "gzip" -) +func NewXDREncoder(compressor Compressor, xdrPayload interface{}) XDREncoder { + return XDREncoder{Compressor: compressor, XdrPayload: xdrPayload} +} + +func NewXDRDecoder(compressor Compressor, xdrPayload interface{}) XDRDecoder { + return XDRDecoder{Compressor: compressor, XdrPayload: xdrPayload} + +} + +// XDREncoder combines compression with XDR encoding +type XDREncoder struct { + Compressor Compressor + XdrPayload interface{} +} -type XDREncoder interface { - WriteTo(w io.Writer) (int64, error) +// WriteTo writes the XDR compressed encoded data +func (e *XDREncoder) WriteTo(w io.Writer) (int64, error) { + zw, err := e.Compressor.NewWriter(w) + if err != nil { + return 0, err + } + defer zw.Close() + + n, err := xdr3.Marshal(zw, e.XdrPayload) + return int64(n), err } -type XDRDecoder interface { - ReadFrom(r io.Reader) (int64, error) - Unzip(r io.Reader) ([]byte, error) +// XDRDecoder combines decompression with XDR decoding +type XDRDecoder struct { + Compressor Compressor + XdrPayload interface{} } -func NewXDREncoder(compressionType string, xdrPayload interface{}) (XDREncoder, error) { - switch compressionType { - case GZIP: - return &XDRGzipEncoder{XdrPayload: xdrPayload}, nil - default: - return nil, errors.Errorf("invalid compression type %s, not supported", compressionType) +// ReadFrom reads XDR compressed encoded data +func (d *XDRDecoder) ReadFrom(r io.Reader) (int64, error) { + zr, err := d.Compressor.NewReader(r) + if err != nil { + return 0, err } + defer zr.Close() + + n, err := xdr3.Unmarshal(zr, d.XdrPayload) + return int64(n), err } -func NewXDRDecoder(compressionType string, xdrPayload interface{}) (XDRDecoder, error) { - switch compressionType { - case GZIP: - return &XDRGzipDecoder{XdrPayload: xdrPayload}, nil - default: - return nil, errors.Errorf("invalid compression type %s, not supported", compressionType) +// Unzip returns the decompressed bytes. +func (d *XDRDecoder) Unzip(r io.Reader) ([]byte, error) { + zr, err := d.Compressor.NewReader(r) + if err != nil { + return nil, err } + + defer zr.Close() + + objectBytes, err := io.ReadAll(zr) + if err != nil { + return nil, err + } + + return objectBytes, nil } diff --git a/support/compressxdr/compress_xdr_test.go b/support/compressxdr/compress_xdr_test.go index da94ab25ae..735fa275d2 100644 --- a/support/compressxdr/compress_xdr_test.go +++ b/support/compressxdr/compress_xdr_test.go @@ -20,21 +20,19 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq uint32, count int) xdr.Ledg } } -func TestEncodeDecodeLedgerCloseMetaBatchGzip(t *testing.T) { +func TestEncodeDecodeLedgerCloseMetaBatch(t *testing.T) { testData := createTestLedgerCloseMetaBatch(1000, 1005, 6) // Encode the test data - encoder, err := NewXDREncoder(GZIP, testData) - require.NoError(t, err) + encoder := NewXDREncoder(DefaultCompressor, testData) var buf bytes.Buffer - _, err = encoder.WriteTo(&buf) + _, err := encoder.WriteTo(&buf) require.NoError(t, err) // Decode the encoded data lcmBatch := xdr.LedgerCloseMetaBatch{} - decoder, err := NewXDRDecoder(GZIP, &lcmBatch) - require.NoError(t, err) + decoder := NewXDRDecoder(DefaultCompressor, &lcmBatch) _, err = decoder.ReadFrom(&buf) require.NoError(t, err) @@ -49,22 +47,20 @@ func TestEncodeDecodeLedgerCloseMetaBatchGzip(t *testing.T) { } } -func TestDecodeUnzipGzip(t *testing.T) { +func TestDecodeUnzip(t *testing.T) { expectedBinary := []byte{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0} testData := createTestLedgerCloseMetaBatch(2, 2, 1) // Encode the test data - encoder, err := NewXDREncoder(GZIP, testData) - require.NoError(t, err) + encoder := NewXDREncoder(DefaultCompressor, testData) var buf bytes.Buffer - _, err = encoder.WriteTo(&buf) + _, err := encoder.WriteTo(&buf) require.NoError(t, err) // Decode the encoded data lcmBatch := xdr.LedgerCloseMetaBatch{} - decoder, err := NewXDRDecoder(GZIP, &lcmBatch) - require.NoError(t, err) + decoder := NewXDRDecoder(DefaultCompressor, &lcmBatch) binary, err := decoder.Unzip(&buf) require.NoError(t, err) diff --git a/support/compressxdr/compressor.go b/support/compressxdr/compressor.go new file mode 100644 index 0000000000..03b3f9e9c5 --- /dev/null +++ b/support/compressxdr/compressor.go @@ -0,0 +1,40 @@ +package compressxdr + +import ( + "github.com/klauspost/compress/zstd" + "io" +) + +var DefaultCompressor = &ZstdCompressor{} + +// Compressor represents a compression algorithm. +type Compressor interface { + // NewWriter creates a new compressor for writing compressed data. + NewWriter(w io.Writer) (io.WriteCloser, error) + // NewReader creates a new decompressor for reading compressed data. + NewReader(r io.Reader) (io.ReadCloser, error) + // GetName returns the name of the compression algorithm. + GetName() string +} + +// ZstdCompressor is an implementation of the Compressor interface for Zstd compression. +type ZstdCompressor struct{} + +// GetName returns the name of the compression algorithm. +func (z *ZstdCompressor) GetName() string { + return "zstd" +} + +// NewWriter creates a new Zstd writer. +func (z *ZstdCompressor) NewWriter(w io.Writer) (io.WriteCloser, error) { + return zstd.NewWriter(w) +} + +// NewReader creates a new Zstd reader. +func (z *ZstdCompressor) NewReader(r io.Reader) (io.ReadCloser, error) { + zr, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + return zr.IOReadCloser(), err +} diff --git a/support/compressxdr/gzip_compress_xdr.go b/support/compressxdr/gzip_compress_xdr.go deleted file mode 100644 index 9171b87ffd..0000000000 --- a/support/compressxdr/gzip_compress_xdr.go +++ /dev/null @@ -1,55 +0,0 @@ -package compressxdr - -import ( - "compress/gzip" - "io" - - xdr3 "github.com/stellar/go-xdr/xdr3" -) - -type XDRGzipEncoder struct { - XdrPayload interface{} -} - -func (g *XDRGzipEncoder) WriteTo(w io.Writer) (int64, error) { - gw := gzip.NewWriter(w) - n, err := xdr3.Marshal(gw, g.XdrPayload) - if err != nil { - return int64(n), err - } - return int64(n), gw.Close() -} - -type XDRGzipDecoder struct { - XdrPayload interface{} -} - -func (d *XDRGzipDecoder) ReadFrom(r io.Reader) (int64, error) { - gr, err := gzip.NewReader(r) - if err != nil { - return 0, err - } - defer gr.Close() - - n, err := xdr3.Unmarshal(gr, d.XdrPayload) - if err != nil { - return int64(n), err - } - return int64(n), nil -} - -func (d *XDRGzipDecoder) Unzip(r io.Reader) ([]byte, error) { - gzipReader, err := gzip.NewReader(r) - if err != nil { - return nil, err - } - - defer gzipReader.Close() - - objectBytes, err := io.ReadAll(gzipReader) - if err != nil { - return nil, err - } - - return objectBytes, nil -} diff --git a/support/compressxdr/mocks.go b/support/compressxdr/mocks.go index 2525ca4477..50c440da20 100644 --- a/support/compressxdr/mocks.go +++ b/support/compressxdr/mocks.go @@ -20,4 +20,3 @@ func (m *MockXDRDecoder) Unzip(r io.Reader) ([]byte, error) { return args.Get(0).([]byte), args.Error(1) } -var _ XDRDecoder = &MockXDRDecoder{} diff --git a/support/datastore/ledgerbatch_config.go b/support/datastore/ledgerbatch_config.go index eca8bbe737..990551ffde 100644 --- a/support/datastore/ledgerbatch_config.go +++ b/support/datastore/ledgerbatch_config.go @@ -2,12 +2,12 @@ package datastore import ( "fmt" + "github.com/stellar/go/support/compressxdr" ) type LedgerBatchConfig struct { LedgersPerFile uint32 `toml:"ledgers_per_file"` FilesPerPartition uint32 `toml:"files_per_partition"` - FileSuffix string `toml:"file_suffix"` } func (ec LedgerBatchConfig) GetSequenceNumberStartBoundary(ledgerSeq uint32) uint32 { @@ -40,7 +40,7 @@ func (ec LedgerBatchConfig) GetObjectKeyFromSequenceNumber(ledgerSeq uint32) str if fileStart != fileEnd { objectKey += fmt.Sprintf("-%d", fileEnd) } - objectKey += ec.FileSuffix + objectKey += fmt.Sprintf(".xdr.%s", compressxdr.DefaultCompressor.GetName()) return objectKey } diff --git a/support/datastore/ledgerbatch_config_test.go b/support/datastore/ledgerbatch_config_test.go index 9b2e466677..e3f6184aeb 100644 --- a/support/datastore/ledgerbatch_config_test.go +++ b/support/datastore/ledgerbatch_config_test.go @@ -29,7 +29,7 @@ func TestGetObjectKeyFromSequenceNumber(t *testing.T) { for _, tc := range testCases { t.Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) { - config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix} + config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile} key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq) require.Equal(t, tc.expectedKey, key) }) diff --git a/support/datastore/resumablemanager_test.go b/support/datastore/resumablemanager_test.go index 34279682ef..6147e0346d 100644 --- a/support/datastore/resumablemanager_test.go +++ b/support/datastore/resumablemanager_test.go @@ -32,7 +32,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "archive error", @@ -47,7 +46,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -60,7 +58,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -73,7 +70,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "datastore error happened", @@ -87,7 +83,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -100,7 +95,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -113,7 +107,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -126,7 +119,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", }, @@ -139,7 +131,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test", errorSnippet: "Invalid start value", @@ -153,7 +144,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test2", latestLedger: uint32(2000), @@ -167,7 +157,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test3", latestLedger: uint32(3000), @@ -181,7 +170,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test4", latestLedger: uint32(4000), @@ -195,7 +183,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test5", latestLedger: uint32(5000), @@ -208,58 +195,58 @@ func TestResumability(t *testing.T) { mockDataStore := &MockDataStore{} //"End ledger same as start, data store has it" - mockDataStore.On("Exists", ctx, "0-9.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "0-9.xdr.zstd").Return(true, nil).Once() //"End ledger same as start, data store does not have it" - mockDataStore.On("Exists", ctx, "10-19.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "10-19.xdr.zstd").Return(false, nil).Once() //"binary search encounters an error during datastore retrieval", - mockDataStore.On("Exists", ctx, "20-29.xdr.gz").Return(false, errors.New("datastore error happened")).Once() + mockDataStore.On("Exists", ctx, "20-29.xdr.zstd").Return(false, errors.New("datastore error happened")).Once() //"Data store is beyond boundary aligned start ledger" - mockDataStore.On("Exists", ctx, "30-39.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "40-49.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "30-39.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "40-49.xdr.zstd").Return(false, nil).Once() //"Data store is beyond non boundary aligned start ledger" - mockDataStore.On("Exists", ctx, "70-79.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "80-89.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "70-79.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "80-89.xdr.zstd").Return(false, nil).Once() //"Data store is beyond start and end ledger" - mockDataStore.On("Exists", ctx, "260-269.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "270-279.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "260-269.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "270-279.xdr.zstd").Return(true, nil).Once() //"Data store is not beyond start ledger" - mockDataStore.On("Exists", ctx, "110-119.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "100-109.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "90-99.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "110-119.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "100-109.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "90-99.xdr.zstd").Return(false, nil).Once() //"No end ledger provided, data store not beyond start" uses latest from network="test2" - mockDataStore.On("Exists", ctx, "1630-1639.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1390-1399.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1260-1269.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1200-1209.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1160-1169.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1170-1179.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1150-1159.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "1140-1149.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1630-1639.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1390-1399.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1260-1269.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1200-1209.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1160-1169.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1170-1179.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1150-1159.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "1140-1149.xdr.zstd").Return(false, nil).Once() //"No end ledger provided, data store is beyond start" uses latest from network="test3" - mockDataStore.On("Exists", ctx, "2630-2639.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "2390-2399.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "2260-2269.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "2250-2259.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "2240-2249.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "2230-2239.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "2200-2209.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "2630-2639.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "2390-2399.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "2260-2269.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "2250-2259.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "2240-2249.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "2230-2239.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "2200-2209.xdr.zstd").Return(true, nil).Once() //"No end ledger provided, data store is beyond start and archive network latest, and partially into checkpoint frequency padding" uses latest from network="test4" - mockDataStore.On("Exists", ctx, "3630-3639.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "3880-3889.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "4000-4009.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "4060-4069.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "4090-4099.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "4080-4089.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "4070-4079.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "3630-3639.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "3880-3889.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "4000-4009.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "4060-4069.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "4090-4099.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "4080-4089.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "4070-4079.xdr.zstd").Return(false, nil).Once() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {