diff --git a/exp/services/ledgerexporter/config.toml b/exp/services/ledgerexporter/config.toml
index bc522ff6a8..a56087427c 100644
--- a/exp/services/ledgerexporter/config.toml
+++ b/exp/services/ledgerexporter/config.toml
@@ -9,7 +9,6 @@ destination_bucket_path = "exporter-test/ledgers"
 [exporter_config]
 ledgers_per_file = 1
 files_per_partition = 64000
-file_suffix = ".xdr.gz"
 
 [stellar_core_config]
 stellar_core_binary_path = "/usr/local/bin/stellar-core"
diff --git a/exp/services/ledgerexporter/internal/app_test.go b/exp/services/ledgerexporter/internal/app_test.go
index e2db0e28d9..2063fd21a2 100644
--- a/exp/services/ledgerexporter/internal/app_test.go
+++ b/exp/services/ledgerexporter/internal/app_test.go
@@ -41,7 +41,7 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) {
 		StartLedger: 3,
 		EndLedger:   9,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulate the datastore has inconsistent data,
@@ -61,7 +61,7 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50
@@ -80,7 +80,7 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
@@ -102,7 +102,7 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) {
 		StartLedger: 2,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
diff --git a/exp/services/ledgerexporter/internal/config_test.go b/exp/services/ledgerexporter/internal/config_test.go
index 036c121455..86f6cfb5b3 100644
--- a/exp/services/ledgerexporter/internal/config_test.go
+++ b/exp/services/ledgerexporter/internal/config_test.go
@@ -22,7 +22,6 @@ func TestNewConfigResumeEnabled(t *testing.T) {
 	require.Equal(t, config.DataStoreConfig.Type, "ABC")
 	require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1))
 	require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3))
-	require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz")
 	require.True(t, config.Resume)
 	url, ok := config.DataStoreConfig.Params["destination_bucket_path"]
 	require.True(t, ok)
diff --git a/exp/services/ledgerexporter/internal/exportmanager_test.go b/exp/services/ledgerexporter/internal/exportmanager_test.go
index 41a5e6acf8..f3b302b739 100644
--- a/exp/services/ledgerexporter/internal/exportmanager_test.go
+++ b/exp/services/ledgerexporter/internal/exportmanager_test.go
@@ -38,7 +38,7 @@ func (s *ExportManagerSuite) TearDownTest() {
 }
 
 func (s *ExportManagerSuite) TestInvalidExportConfig() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	_, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -46,7 +46,7 @@
 }
 
 func (s *ExportManagerSuite) TestRun() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -96,7 +96,7 @@
 }
 
 func (s *ExportManagerSuite) TestRunContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -125,7 +125,7 @@
 }
 
 func (s *ExportManagerSuite) TestRunWithCanceledContext() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -138,7 +138,7 @@
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -176,7 +176,7 @@
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -194,7 +194,7 @@
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
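The hunks above drop `FileSuffix` from every `datastore.LedgerBatchConfig` literal; the suffix is now derived from the active compressor rather than configured. For orientation, a minimal round trip through the new constructor API introduced later in this diff (the standalone `main` wrapper and the batch values are illustrative, not taken from the PR):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/stellar/go/support/compressxdr"
	"github.com/stellar/go/xdr"
)

func main() {
	batch := xdr.LedgerCloseMetaBatch{StartSequence: 2, EndSequence: 2}

	// Encode: XDR-marshal the batch, compressing with the default (zstd) compressor.
	var buf bytes.Buffer
	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &batch)
	if _, err := encoder.WriteTo(&buf); err != nil {
		panic(err)
	}

	// Decode: decompress and XDR-unmarshal back into a batch.
	var decoded xdr.LedgerCloseMetaBatch
	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decoded)
	if _, err := decoder.ReadFrom(&buf); err != nil {
		panic(err)
	}
	fmt.Println(decoded.StartSequence) // 2
}
```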
diff --git a/exp/services/ledgerexporter/internal/uploader.go b/exp/services/ledgerexporter/internal/uploader.go
index ca07965b85..c1f80d6416 100644
--- a/exp/services/ledgerexporter/internal/uploader.go
+++ b/exp/services/ledgerexporter/internal/uploader.go
@@ -83,12 +83,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaA
 	startTime := time.Now()
 	numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10)
 
-	// TODO: Add compression config and optimize best compression algorithm
-	// JIRA https://stellarorg.atlassian.net/browse/HUBBLE-368
-	xdrEncoder, err := compressxdr.NewXDREncoder(compressxdr.GZIP, &metaArchive.Data)
-	if err != nil {
-		return err
-	}
+	xdrEncoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &metaArchive.Data)
 
 	writerTo := &writerToRecorder{
 		WriterTo: xdrEncoder,
@@ -109,7 +104,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaA
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalUncompressed))
 	u.objectSizeMetrics.With(prometheus.Labels{
-		"compression":    compressxdr.GZIP,
+		"compression":    xdrEncoder.Compressor.Name(),
 		"ledgers":        numLedgers,
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalCompressed))
diff --git a/exp/services/ledgerexporter/internal/uploader_test.go b/exp/services/ledgerexporter/internal/uploader_test.go
index ee39245486..da0cc6d2ad 100644
--- a/exp/services/ledgerexporter/internal/uploader_test.go
+++ b/exp/services/ledgerexporter/internal/uploader_test.go
@@ -62,11 +62,10 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 	expectedCompressedLength := capturedBuf.Len()
 	var decodedArchive datastore.LedgerMetaArchive
-	xdrDecoder, err := compressxdr.NewXDRDecoder(compressxdr.GZIP, &decodedArchive.Data)
-	require.NoError(s.T(), err)
+	xdrDecoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decodedArchive.Data)
 
 	decoder := xdrDecoder
-	_, err = decoder.ReadFrom(&capturedBuf)
+	_, err := decoder.ReadFrom(&capturedBuf)
 	require.NoError(s.T(), err)
 
 	// require that the decoded data matches the original test data
@@ -98,7 +97,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers":        "100",
-		"compression":    compressxdr.GZIP,
+		"compression":    decoder.Compressor.Name(),
 		"already_exists": strconv.FormatBool(alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -114,7 +113,7 @@
 	)
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers":        "100",
-		"compression":    compressxdr.GZIP,
+		"compression":    decoder.Compressor.Name(),
 		"already_exists": strconv.FormatBool(!alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -185,7 +184,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) {
 		getMetricValue(metric).GetSummary().GetSampleCount(),
 	)
 
-	for _, compression := range []string{compressxdr.GZIP, "none"} {
+	for _, compression := range []string{compressxdr.DefaultCompressor.Name(), "none"} {
 		metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 			"ledgers":        "100",
 			"compression":    compression,
diff --git a/go.mod b/go.mod
index bb6a691f40..4f2e0fd865 100644
--- a/go.mod
+++ b/go.mod
@@ -122,7 +122,7 @@ require (
 	github.com/imkira/go-interpol v1.1.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
-	github.com/klauspost/compress v1.17.0 // indirect
+	github.com/klauspost/compress v1.17.0
 	github.com/kr/text v0.2.0 // indirect
 	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
 	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
diff --git a/ingest/ledgerbackend/buffered_storage_backend.go b/ingest/ledgerbackend/buffered_storage_backend.go
index b28e68b63c..c3c1007e87 100644
--- a/ingest/ledgerbackend/buffered_storage_backend.go
+++ b/ingest/ledgerbackend/buffered_storage_backend.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/pkg/errors"
 
-	"github.com/stellar/go/support/compressxdr"
 	"github.com/stellar/go/support/datastore"
 	"github.com/stellar/go/xdr"
 )
@@ -20,7 +19,6 @@ var _ LedgerBackend = (*BufferedStorageBackend)(nil)
 
 type BufferedStorageBackendConfig struct {
 	LedgerBatchConfig datastore.LedgerBatchConfig
-	CompressionType   string
 	DataStore         datastore.DataStore
 	BufferSize        uint32
 	NumWorkers        uint32
@@ -42,7 +40,6 @@ type BufferedStorageBackend struct {
 	prepared          *Range // Non-nil if any range is prepared
 	closed            bool   // False until the core is closed
 	ledgerMetaArchive *datastore.LedgerMetaArchive
-	decoder           compressxdr.XDRDecoder
 	nextLedger        uint32
 	lastLedger        uint32
 }
@@ -65,25 +62,12 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBacken
 		return nil, errors.New("ledgersPerFile must be > 0")
 	}
 
-	if config.LedgerBatchConfig.FileSuffix == "" {
-		return nil, errors.New("no file suffix provided in LedgerBatchConfig")
-	}
-
-	if config.CompressionType == "" {
-		return nil, errors.New("no compression type provided in config")
-	}
-
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil)
-	if err != nil {
-		return nil, err
-	}
 
 	bsBackend := &BufferedStorageBackend{
 		config:            config,
 		dataStore:         config.DataStore,
 		ledgerMetaArchive: ledgerMetaArchive,
-		decoder:           decoder,
 	}
 
 	return bsBackend, nil
@@ -125,17 +109,11 @@ func (bsb *BufferedStorageBackend) getBatchForSequence(ctx context.Context, sequ
 	}
 
 	// Sequence is beyond the current LedgerCloseMetaBatch
-	lcmBatchBinary, err := bsb.ledgerBuffer.getFromLedgerQueue(ctx)
+	var err error
+	bsb.ledgerMetaArchive.Data, err = bsb.ledgerBuffer.getFromLedgerQueue(ctx)
 	if err != nil {
 		return errors.Wrap(err, "failed getting next ledger batch from queue")
 	}
-
-	// Turn binary into xdr
-	err = bsb.ledgerMetaArchive.Data.UnmarshalBinary(lcmBatchBinary)
-	if err != nil {
-		return errors.Wrap(err, "failed unmarshalling lcmBatchBinary")
-	}
-
 	return nil
 }
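With `CompressionType` and the cached `decoder` gone, constructing the backend takes only the remaining fields. A sketch under those assumptions; `store` stands in for any `datastore.DataStore`, the numbers mirror the test helper below, and the retry settings referenced elsewhere in the tests are left at their zero values here:

```go
package ledgerexport // hypothetical package for this sketch

import (
	"context"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/datastore"
)

func newBackend(ctx context.Context, store datastore.DataStore) (*ledgerbackend.BufferedStorageBackend, error) {
	cfg := ledgerbackend.BufferedStorageBackendConfig{
		LedgerBatchConfig: datastore.LedgerBatchConfig{
			LedgersPerFile:    1,
			FilesPerPartition: 64000,
		},
		DataStore:  store,
		BufferSize: 100,
		NumWorkers: 5,
	}
	// LedgersPerFile must still be > 0; the file-suffix and
	// compression-type checks were removed above.
	return ledgerbackend.NewBufferedStorageBackend(ctx, cfg)
}
```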
diff --git a/ingest/ledgerbackend/buffered_storage_backend_test.go b/ingest/ledgerbackend/buffered_storage_backend_test.go
index eb2e355765..063e234c31 100644
--- a/ingest/ledgerbackend/buffered_storage_backend_test.go
+++ b/ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -29,14 +29,12 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
 	ledgerBatchConfig := datastore.LedgerBatchConfig{
 		LedgersPerFile:    1,
 		FilesPerPartition: 64000,
-		FileSuffix:        ".xdr.gz",
 	}
 
 	dataStore := new(datastore.MockDataStore)
 
 	return BufferedStorageBackendConfig{
 		LedgerBatchConfig: ledgerBatchConfig,
-		CompressionType:   compressxdr.GZIP,
 		DataStore:         dataStore,
 		BufferSize:        100,
 		NumWorkers:        5,
@@ -48,13 +46,11 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend {
 	config := createBufferedStorageBackendConfigForTesting()
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil)
 
 	return BufferedStorageBackend{
 		config:            config,
 		dataStore:         config.DataStore,
 		ledgerMetaArchive: ledgerMetaArchive,
-		decoder:           decoder,
 	}
 }
 
@@ -67,10 +63,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32) 
 		if count > 1 {
 			endFileSeq := i + count - 1
 			readCloser = createLCMBatchReader(i, endFileSeq, count)
-			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d-%d.xdr.gz", partition, math.MaxUint32-i, i, endFileSeq)
+			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d-%d.xdr.zstd", partition, math.MaxUint32-i, i, endFileSeq)
 		} else {
 			readCloser = createLCMBatchReader(i, i, count)
-			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-i, i)
+			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i)
 		}
 		mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
 	}
@@ -105,7 +101,7 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq, count uint32) xdr.LedgerCl
 
 func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
 	testData := createTestLedgerCloseMetaBatch(start, end, count)
-	encoder, _ := compressxdr.NewXDREncoder(compressxdr.GZIP, testData)
+	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, testData)
 	var buf bytes.Buffer
 	encoder.WriteTo(&buf)
 	capturedBuf := buf.Bytes()
@@ -121,7 +117,6 @@ func TestNewBufferedStorageBackend(t *testing.T) {
 	assert.NoError(t, err)
 
 	assert.Equal(t, bsb.dataStore, config.DataStore)
-	assert.Equal(t, ".xdr.gz", bsb.config.LedgerBatchConfig.FileSuffix)
 	assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
 	assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
 	assert.Equal(t, uint32(100), bsb.config.BufferSize)
@@ -441,7 +436,7 @@ func TestLedgerBufferClose(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	afterPrepareRange := make(chan struct{})
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) {
 		<-afterPrepareRange
@@ -473,7 +468,7 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
 	t.Cleanup(func() {
 		mockDataStore.AssertExpectations(t)
@@ -499,7 +494,7 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	iteration := &atomic.Int32{}
 	cancelAfter := int32(bsb.config.RetryLimit) + 2
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) {
@@ -531,7 +526,7 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).
 		Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")).
 		Times(int(bsb.config.RetryLimit) + 1)
diff --git a/ingest/ledgerbackend/ledger_buffer.go b/ingest/ledgerbackend/ledger_buffer.go
index a26aab8ba2..5b2ec57ffc 100644
--- a/ingest/ledgerbackend/ledger_buffer.go
+++ b/ingest/ledgerbackend/ledger_buffer.go
@@ -13,6 +13,7 @@ import (
 	"github.com/stellar/go/support/collections/heap"
 	"github.com/stellar/go/support/compressxdr"
 	"github.com/stellar/go/support/datastore"
+	"github.com/stellar/go/xdr"
 )
 
 type ledgerBatchObject struct {
@@ -24,7 +25,6 @@ type ledgerBuffer struct {
 	// Passed through from BufferedStorageBackend to control lifetime of ledgerBuffer instance
 	config    BufferedStorageBackendConfig
 	dataStore datastore.DataStore
-	decoder   compressxdr.XDRDecoder
 
 	// context used to cancel workers within the ledgerBuffer
 	context context.Context
@@ -67,7 +67,6 @@ func (bsb *BufferedStorageBackend) newLedgerBuffer(ledgerRange Range) (*ledgerBu
 		ledgerRange: ledgerRange,
 		context:     ctx,
 		cancel:      cancel,
-		decoder:     bsb.decoder,
 	}
 
 	// Start workers to read LCM files
@@ -203,13 +202,13 @@ func (lb *ledgerBuffer) storeObject(ledgerObject []byte, sequence uint32) {
 	}
 }
 
-func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) ([]byte, error) {
+func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerCloseMetaBatch, error) {
 	for {
 		select {
 		case <-lb.context.Done():
-			return nil, context.Cause(lb.context)
+			return xdr.LedgerCloseMetaBatch{}, context.Cause(lb.context)
 		case <-ctx.Done():
-			return nil, ctx.Err()
+			return xdr.LedgerCloseMetaBatch{}, ctx.Err()
 		case compressedBinary := <-lb.ledgerQueue:
 			// The ledger buffer invariant is maintained here because
 			// we create an extra task when consuming one item from the ledger queue.
@@ -218,13 +217,14 @@ func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) ([]byte, error)
 			// len(taskQueue) + len(ledgerQueue) + ledgerPriorityQueue.Len() <= bsb.config.BufferSize
 			lb.pushTaskQueue()
 
-			reader := bytes.NewReader(compressedBinary)
-			lcmBinary, err := lb.decoder.Unzip(reader)
+			lcmBatch := xdr.LedgerCloseMetaBatch{}
+			decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &lcmBatch)
+			_, err := decoder.ReadFrom(bytes.NewReader(compressedBinary))
 			if err != nil {
-				return nil, err
+				return xdr.LedgerCloseMetaBatch{}, err
 			}
-
-			return lcmBinary, nil
+			return lcmBatch, nil
 		}
 	}
 }
diff --git a/support/compressxdr/compress_xdr.go b/support/compressxdr/compress_xdr.go
index c43a1bca77..514251f971 100644
--- a/support/compressxdr/compress_xdr.go
+++ b/support/compressxdr/compress_xdr.go
@@ -3,36 +3,50 @@ package compressxdr
 import (
 	"io"
 
-	"github.com/stellar/go/support/errors"
+	xdr3 "github.com/stellar/go-xdr/xdr3"
 )
 
-const (
-	GZIP = "gzip"
-)
+func NewXDREncoder(compressor Compressor, xdrPayload interface{}) XDREncoder {
+	return XDREncoder{Compressor: compressor, XdrPayload: xdrPayload}
+}
+
+func NewXDRDecoder(compressor Compressor, xdrPayload interface{}) XDRDecoder {
+	return XDRDecoder{Compressor: compressor, XdrPayload: xdrPayload}
 
-type XDREncoder interface {
-	WriteTo(w io.Writer) (int64, error)
+}
 
-type XDRDecoder interface {
-	ReadFrom(r io.Reader) (int64, error)
-	Unzip(r io.Reader) ([]byte, error)
+// XDREncoder combines compression with XDR encoding
+type XDREncoder struct {
+	Compressor Compressor
+	XdrPayload interface{}
 }
 
-func NewXDREncoder(compressionType string, xdrPayload interface{}) (XDREncoder, error) {
-	switch compressionType {
-	case GZIP:
-		return &XDRGzipEncoder{XdrPayload: xdrPayload}, nil
-	default:
-		return nil, errors.Errorf("invalid compression type %s, not supported", compressionType)
+// WriteTo writes the XDR compressed encoded data
+func (e XDREncoder) WriteTo(w io.Writer) (int64, error) {
+	zw, err := e.Compressor.NewWriter(w)
+	if err != nil {
+		return 0, err
 	}
+	defer zw.Close()
+
+	n, err := xdr3.Marshal(zw, e.XdrPayload)
+	return int64(n), err
 }
 
-func NewXDRDecoder(compressionType string, xdrPayload interface{}) (XDRDecoder, error) {
-	switch compressionType {
-	case GZIP:
-		return &XDRGzipDecoder{XdrPayload: xdrPayload}, nil
-	default:
-		return nil, errors.Errorf("invalid compression type %s, not supported", compressionType)
+// XDRDecoder combines decompression with XDR decoding
+type XDRDecoder struct {
+	Compressor Compressor
+	XdrPayload interface{}
+}
+
+// ReadFrom reads XDR compressed encoded data
+func (d XDRDecoder) ReadFrom(r io.Reader) (int64, error) {
+	zr, err := d.Compressor.NewReader(r)
+	if err != nil {
+		return 0, err
 	}
+	defer zr.Close()
+
+	n, err := xdr3.Unmarshal(zr, d.XdrPayload)
+	return int64(n), err
 }
diff --git a/support/compressxdr/compress_xdr_test.go b/support/compressxdr/compress_xdr_test.go
index da94ab25ae..82aa7e186c 100644
--- a/support/compressxdr/compress_xdr_test.go
+++ b/support/compressxdr/compress_xdr_test.go
@@ -20,21 +20,19 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq uint32, count int) xdr.Ledg
 	}
 }
 
-func TestEncodeDecodeLedgerCloseMetaBatchGzip(t *testing.T) {
+func TestEncodeDecodeLedgerCloseMetaBatch(t *testing.T) {
 	testData := createTestLedgerCloseMetaBatch(1000, 1005, 6)
 
 	// Encode the test data
-	encoder, err := NewXDREncoder(GZIP, testData)
-	require.NoError(t, err)
+	encoder := NewXDREncoder(DefaultCompressor, testData)
 
 	var buf bytes.Buffer
-	_, err = encoder.WriteTo(&buf)
+	_, err := encoder.WriteTo(&buf)
 	require.NoError(t, err)
 
 	// Decode the encoded data
 	lcmBatch := xdr.LedgerCloseMetaBatch{}
-	decoder, err := NewXDRDecoder(GZIP, &lcmBatch)
-	require.NoError(t, err)
+	decoder := NewXDRDecoder(DefaultCompressor, &lcmBatch)
 
 	_, err = decoder.ReadFrom(&buf)
 	require.NoError(t, err)
@@ -48,26 +46,3 @@ func TestEncodeDecodeLedgerCloseMetaBatchGzip(t *testing.T) {
 		require.Equal(t, testData.LedgerCloseMetas[i], decodedData.LedgerCloseMetas[i])
 	}
 }
-
-func TestDecodeUnzipGzip(t *testing.T) {
-	expectedBinary := []byte{0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0}
-	testData := createTestLedgerCloseMetaBatch(2, 2, 1)
-
-	// Encode the test data
-	encoder, err := NewXDREncoder(GZIP, testData)
-	require.NoError(t, err)
-
-	var buf bytes.Buffer
-	_, err = encoder.WriteTo(&buf)
-	require.NoError(t, err)
-
-	// Decode the encoded data
-	lcmBatch := xdr.LedgerCloseMetaBatch{}
-	decoder, err := NewXDRDecoder(GZIP, &lcmBatch)
-	require.NoError(t, err)
-
-	binary, err := decoder.Unzip(&buf)
-	require.NoError(t, err)
-
-	require.Equal(t, expectedBinary, binary)
-}
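`Unzip` disappears along with `TestDecodeUnzipGzip`; compression now hides entirely behind the `Compressor` interface added below, so other algorithms can slot in without touching the encoder or decoder. As a sketch of that seam, a hypothetical gzip-backed implementation (not part of this diff) could look like:

```go
package compressxdr

import (
	"compress/gzip"
	"io"
)

// GzipCompressor is a hypothetical Compressor implementation, shown only to
// illustrate how the deleted gzip path could live behind the new interface.
type GzipCompressor struct{}

func (GzipCompressor) Name() string { return "gzip" }

func (GzipCompressor) NewWriter(w io.Writer) (io.WriteCloser, error) {
	return gzip.NewWriter(w), nil
}

func (GzipCompressor) NewReader(r io.Reader) (io.ReadCloser, error) {
	// *gzip.Reader already satisfies io.ReadCloser.
	return gzip.NewReader(r)
}
```

Swapping the package-level `DefaultCompressor` is then the single switch point; the object-key suffix in `ledgerbatch_config.go` follows it automatically.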
diff --git a/support/compressxdr/compressor.go b/support/compressxdr/compressor.go
new file mode 100644
index 0000000000..946e1b89f0
--- /dev/null
+++ b/support/compressxdr/compressor.go
@@ -0,0 +1,38 @@
+package compressxdr
+
+import (
+	"io"
+
+	"github.com/klauspost/compress/zstd"
+)
+
+var DefaultCompressor = &ZstdCompressor{}
+
+// Compressor represents a compression algorithm.
+type Compressor interface {
+	NewWriter(w io.Writer) (io.WriteCloser, error)
+	NewReader(r io.Reader) (io.ReadCloser, error)
+	Name() string
+}
+
+// ZstdCompressor is an implementation of the Compressor interface for Zstd compression.
+type ZstdCompressor struct{}
+
+// Name returns the name of the compression algorithm.
+func (z ZstdCompressor) Name() string {
+	return "zstd"
+}
+
+// NewWriter creates a new Zstd writer.
+func (z ZstdCompressor) NewWriter(w io.Writer) (io.WriteCloser, error) {
+	return zstd.NewWriter(w)
+}
+
+// NewReader creates a new Zstd reader.
+func (z ZstdCompressor) NewReader(r io.Reader) (io.ReadCloser, error) {
+	zr, err := zstd.NewReader(r)
+	if err != nil {
+		return nil, err
+	}
+	return zr.IOReadCloser(), err
+}
diff --git a/support/compressxdr/gzip_compress_xdr.go b/support/compressxdr/gzip_compress_xdr.go
deleted file mode 100644
index 9171b87ffd..0000000000
--- a/support/compressxdr/gzip_compress_xdr.go
+++ /dev/null
@@ -1,55 +0,0 @@
-package compressxdr
-
-import (
-	"compress/gzip"
-	"io"
-
-	xdr3 "github.com/stellar/go-xdr/xdr3"
-)
-
-type XDRGzipEncoder struct {
-	XdrPayload interface{}
-}
-
-func (g *XDRGzipEncoder) WriteTo(w io.Writer) (int64, error) {
-	gw := gzip.NewWriter(w)
-	n, err := xdr3.Marshal(gw, g.XdrPayload)
-	if err != nil {
-		return int64(n), err
-	}
-	return int64(n), gw.Close()
-}
-
-type XDRGzipDecoder struct {
-	XdrPayload interface{}
-}
-
-func (d *XDRGzipDecoder) ReadFrom(r io.Reader) (int64, error) {
-	gr, err := gzip.NewReader(r)
-	if err != nil {
-		return 0, err
-	}
-	defer gr.Close()
-
-	n, err := xdr3.Unmarshal(gr, d.XdrPayload)
-	if err != nil {
-		return int64(n), err
-	}
-	return int64(n), nil
-}
-
-func (d *XDRGzipDecoder) Unzip(r io.Reader) ([]byte, error) {
-	gzipReader, err := gzip.NewReader(r)
-	if err != nil {
-		return nil, err
-	}
-
-	defer gzipReader.Close()
-
-	objectBytes, err := io.ReadAll(gzipReader)
-	if err != nil {
-		return nil, err
-	}
-
-	return objectBytes, nil
-}
diff --git a/support/compressxdr/mocks.go b/support/compressxdr/mocks.go
index 2525ca4477..c6d6909f43 100644
--- a/support/compressxdr/mocks.go
+++ b/support/compressxdr/mocks.go
@@ -19,5 +19,3 @@ func (m *MockXDRDecoder) Unzip(r io.Reader) ([]byte, error) {
 	args := m.Called(r)
 	return args.Get(0).([]byte), args.Error(1)
 }
-
-var _ XDRDecoder = &MockXDRDecoder{}
diff --git a/support/datastore/ledgerbatch_config.go b/support/datastore/ledgerbatch_config.go
index cd3a0f45bf..4ebac0d110 100644
--- a/support/datastore/ledgerbatch_config.go
+++ b/support/datastore/ledgerbatch_config.go
@@ -3,12 +3,13 @@ package datastore
 import (
 	"fmt"
 	"math"
+
+	"github.com/stellar/go/support/compressxdr"
 )
 
 type LedgerBatchConfig struct {
 	LedgersPerFile    uint32 `toml:"ledgers_per_file"`
 	FilesPerPartition uint32 `toml:"files_per_partition"`
-	FileSuffix        string `toml:"file_suffix"`
 }
 
 func (ec LedgerBatchConfig) GetSequenceNumberStartBoundary(ledgerSeq uint32) uint32 {
@@ -42,7 +43,7 @@ func (ec LedgerBatchConfig) GetObjectKeyFromSequenceNumber(ledgerSeq uint32) str
 	if fileStart != fileEnd {
 		objectKey += fmt.Sprintf("-%d", fileEnd)
 	}
-	objectKey += ec.FileSuffix
+	objectKey += fmt.Sprintf(".xdr.%s", compressxdr.DefaultCompressor.Name())
 
 	return objectKey
 }
diff --git a/support/datastore/ledgerbatch_config_test.go b/support/datastore/ledgerbatch_config_test.go
index 255b5ed070..bd79dd265d 100644
--- a/support/datastore/ledgerbatch_config_test.go
+++ b/support/datastore/ledgerbatch_config_test.go
@@ -15,24 +15,23 @@ func TestGetObjectKeyFromSequenceNumber(t *testing.T) {
 		filesPerPartition uint32
 		ledgerSeq         uint32
 		ledgersPerFile    uint32
-		fileSuffix        string
 		expectedKey       string
 	}{
-		{0, 5, 1, ".xdr.gz", "FFFFFFFA--5.xdr.gz"},
-		{0, 5, 10, ".xdr.gz", "FFFFFFFF--0-9.xdr.gz"},
-		{2, 10, 100, ".xdr.gz", "FFFFFFFF--0-199/FFFFFFFF--0-99.xdr.gz"},
-		{2, 150, 50, ".xdr.gz", "FFFFFF9B--100-199/FFFFFF69--150-199.xdr.gz"},
-		{2, 300, 200, ".xdr.gz", "FFFFFFFF--0-399/FFFFFF37--200-399.xdr.gz"},
-		{2, 1, 1, ".xdr.gz", "FFFFFFFF--0-1/FFFFFFFE--1.xdr.gz"},
-		{4, 10, 100, ".xdr.gz", "FFFFFFFF--0-399/FFFFFFFF--0-99.xdr.gz"},
-		{4, 250, 50, ".xdr.gz", "FFFFFF37--200-399/FFFFFF05--250-299.xdr.gz"},
-		{1, 300, 200, ".xdr.gz", "FFFFFF37--200-399.xdr.gz"},
-		{1, 1, 1, ".xdr.gz", "FFFFFFFE--1.xdr.gz"},
+		{0, 5, 1, "FFFFFFFA--5.xdr.zstd"},
+		{0, 5, 10, "FFFFFFFF--0-9.xdr.zstd"},
+		{2, 10, 100, "FFFFFFFF--0-199/FFFFFFFF--0-99.xdr.zstd"},
+		{2, 150, 50, "FFFFFF9B--100-199/FFFFFF69--150-199.xdr.zstd"},
+		{2, 300, 200, "FFFFFFFF--0-399/FFFFFF37--200-399.xdr.zstd"},
+		{2, 1, 1, "FFFFFFFF--0-1/FFFFFFFE--1.xdr.zstd"},
+		{4, 10, 100, "FFFFFFFF--0-399/FFFFFFFF--0-99.xdr.zstd"},
+		{4, 250, 50, "FFFFFF37--200-399/FFFFFF05--250-299.xdr.zstd"},
+		{1, 300, 200, "FFFFFF37--200-399.xdr.zstd"},
+		{1, 1, 1, "FFFFFFFE--1.xdr.zstd"},
 	}
 	for _, tc := range testCases {
 		t.Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) {
-			config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix}
+			config := LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile}
 			key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq)
 			require.Equal(t, tc.expectedKey, key)
 		})
@@ -43,7 +42,6 @@ func TestGetObjectKeyFromSequenceNumber_ObjectKeyDescOrder(t *testing.T) {
 	config := LedgerBatchConfig{
 		LedgersPerFile:    1,
 		FilesPerPartition: 10,
-		FileSuffix:        ".xdr.gz",
 	}
 	sequenceCount := 10000
 	sequenceMap := make(map[uint32]string)
errorSnippet: "Invalid start value", @@ -153,7 +144,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test2", latestLedger: uint32(2000), @@ -167,7 +157,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test3", latestLedger: uint32(3000), @@ -181,7 +170,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test4", latestLedger: uint32(4000), @@ -195,7 +183,6 @@ func TestResumability(t *testing.T) { ledgerBatchConfig: LedgerBatchConfig{ FilesPerPartition: uint32(1), LedgersPerFile: uint32(10), - FileSuffix: ".xdr.gz", }, networkName: "test5", latestLedger: uint32(5000), @@ -208,58 +195,58 @@ func TestResumability(t *testing.T) { mockDataStore := &MockDataStore{} //"End ledger same as start, data store has it" - mockDataStore.On("Exists", ctx, "FFFFFFFF--0-9.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFFF--0-9.xdr.zstd").Return(true, nil).Once() //"End ledger same as start, data store does not have it" - mockDataStore.On("Exists", ctx, "FFFFFFF5--10-19.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFF5--10-19.xdr.zstd").Return(false, nil).Once() //"binary search encounters an error during datastore retrieval", - mockDataStore.On("Exists", ctx, "FFFFFFEB--20-29.xdr.gz").Return(false, errors.New("datastore error happened")).Once() + mockDataStore.On("Exists", ctx, "FFFFFFEB--20-29.xdr.zstd").Return(false, errors.New("datastore error happened")).Once() //"Data store is beyond boundary aligned start ledger" - mockDataStore.On("Exists", ctx, "FFFFFFE1--30-39.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFFD7--40-49.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFE1--30-39.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFD7--40-49.xdr.zstd").Return(false, nil).Once() //"Data store is beyond non boundary aligned start ledger" - mockDataStore.On("Exists", ctx, "FFFFFFB9--70-79.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFFAF--80-89.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFB9--70-79.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFFAF--80-89.xdr.zstd").Return(false, nil).Once() //"Data store is beyond start and end ledger" - mockDataStore.On("Exists", ctx, "FFFFFEFB--260-269.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFEF1--270-279.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFEFB--260-269.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFEF1--270-279.xdr.zstd").Return(true, nil).Once() //"Data store is not beyond start ledger" - mockDataStore.On("Exists", ctx, "FFFFFF91--110-119.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFF9B--100-109.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFFA5--90-99.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFF91--110-119.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFF9B--100-109.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, 
"FFFFFFA5--90-99.xdr.zstd").Return(false, nil).Once() //"No end ledger provided, data store not beyond start" uses latest from network="test2" - mockDataStore.On("Exists", ctx, "FFFFF9A1--1630-1639.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFA91--1390-1399.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB13--1260-1269.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB4F--1200-1209.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB77--1160-1169.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB6D--1170-1179.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB81--1150-1159.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFFB8B--1140-1149.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF9A1--1630-1639.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFA91--1390-1399.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB13--1260-1269.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB4F--1200-1209.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB77--1160-1169.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB6D--1170-1179.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB81--1150-1159.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFFB8B--1140-1149.xdr.zstd").Return(false, nil).Once() //"No end ledger provided, data store is beyond start" uses latest from network="test3" - mockDataStore.On("Exists", ctx, "FFFFF5B9--2630-2639.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF6A9--2390-2399.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF72B--2260-2269.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF735--2250-2259.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF73F--2240-2249.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF749--2230-2239.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF767--2200-2209.xdr.gz").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF5B9--2630-2639.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF6A9--2390-2399.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF72B--2260-2269.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF735--2250-2259.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF73F--2240-2249.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF749--2230-2239.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF767--2200-2209.xdr.zstd").Return(true, nil).Once() //"No end ledger provided, data store is beyond start and archive network latest, and partially into checkpoint frequency padding" uses latest from network="test4" - mockDataStore.On("Exists", ctx, "FFFFF1D1--3630-3639.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF0D7--3880-3889.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF05F--4000-4009.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF023--4060-4069.xdr.gz").Return(true, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF005--4090-4099.xdr.gz").Return(false, 
nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF00F--4080-4089.xdr.gz").Return(false, nil).Once() - mockDataStore.On("Exists", ctx, "FFFFF019--4070-4079.xdr.gz").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF1D1--3630-3639.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF0D7--3880-3889.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF05F--4000-4009.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF023--4060-4069.xdr.zstd").Return(true, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF005--4090-4099.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF00F--4080-4089.xdr.zstd").Return(false, nil).Once() + mockDataStore.On("Exists", ctx, "FFFFF019--4070-4079.xdr.zstd").Return(false, nil).Once() for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {
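One sanity check this change suggests (hypothetical, not part of this diff): the default encoder should now emit a zstd frame, which begins with the little-endian magic bytes `0x28 0xB5 0x2F 0xFD`. A test along these lines in the `compressxdr` package would catch an accidental fallback to another codec:

```go
package compressxdr

import (
	"bytes"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/stellar/go/xdr"
)

// Hypothetical test: verify DefaultCompressor output is a zstd frame.
func TestDefaultEncoderEmitsZstdFrame(t *testing.T) {
	batch := xdr.LedgerCloseMetaBatch{StartSequence: 2, EndSequence: 2}

	var buf bytes.Buffer
	_, err := NewXDREncoder(DefaultCompressor, batch).WriteTo(&buf)
	require.NoError(t, err)

	zstdMagic := []byte{0x28, 0xB5, 0x2F, 0xFD}
	require.True(t, bytes.HasPrefix(buf.Bytes(), zstdMagic))
}
```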