exp/services/ledgerexporter: Change default compression algorithm from gzip to zstd (stellar#5306)

---------

Co-authored-by: tamirms <[email protected]>
urvisavla and tamirms authored May 10, 2024
1 parent 0f6abb0 commit 79f44c6
Showing 18 changed files with 162 additions and 241 deletions.
1 change: 0 additions & 1 deletion exp/services/ledgerexporter/config.toml
@@ -9,7 +9,6 @@ destination_bucket_path = "exporter-test/ledgers"
 [exporter_config]
 ledgers_per_file = 1
 files_per_partition = 64000
-file_suffix = ".xdr.gz"
 
 [stellar_core_config]
 stellar_core_binary_path = "/usr/local/bin/stellar-core"
8 changes: 4 additions & 4 deletions exp/services/ledgerexporter/internal/app_test.go
@@ -41,7 +41,7 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) {
 		StartLedger: 3,
 		EndLedger:   9,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulate the datastore has inconsistent data,
@@ -61,7 +61,7 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50
@@ -80,7 +80,7 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
@@ -102,7 +102,7 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) {
 		StartLedger: 2,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
1 change: 0 additions & 1 deletion exp/services/ledgerexporter/internal/config_test.go
@@ -22,7 +22,6 @@ func TestNewConfigResumeEnabled(t *testing.T) {
 	require.Equal(t, config.DataStoreConfig.Type, "ABC")
 	require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1))
 	require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3))
-	require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz")
 	require.True(t, config.Resume)
 	url, ok := config.DataStoreConfig.Params["destination_bucket_path"]
 	require.True(t, ok)
14 changes: 7 additions & 7 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
@@ -38,15 +38,15 @@ func (s *ExportManagerSuite) TearDownTest() {
 }
 
 func (s *ExportManagerSuite) TestInvalidExportConfig() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	_, err := NewExportManager(config, &s.mockBackend, queue, registry)
 	require.Error(s.T(), err)
 }
 
 func (s *ExportManagerSuite) TestRun() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -96,7 +96,7 @@ func (s *ExportManagerSuite) TestRun() {
 }
 
 func (s *ExportManagerSuite) TestRunContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -125,7 +125,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() {
 }
 
 func (s *ExportManagerSuite) TestRunWithCanceledContext() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -138,7 +138,7 @@ func (s *ExportManagerSuite) TestRunWithCanceledContext() {
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -176,7 +176,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -194,7 +194,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
9 changes: 2 additions & 7 deletions exp/services/ledgerexporter/internal/uploader.go
@@ -83,12 +83,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaArchive) error {
 	startTime := time.Now()
 	numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10)
 
-	// TODO: Add compression config and optimize best compression algorithm
-	// JIRA https://stellarorg.atlassian.net/browse/HUBBLE-368
-	xdrEncoder, err := compressxdr.NewXDREncoder(compressxdr.GZIP, &metaArchive.Data)
-	if err != nil {
-		return err
-	}
+	xdrEncoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &metaArchive.Data)
 
 	writerTo := &writerToRecorder{
 		WriterTo: xdrEncoder,
@@ -109,7 +104,7 @@
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalUncompressed))
 	u.objectSizeMetrics.With(prometheus.Labels{
-		"compression": compressxdr.GZIP,
+		"compression": xdrEncoder.Compressor.Name(),
 		"ledgers": numLedgers,
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalCompressed))
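The new call shape is visible above: NewXDREncoder no longer returns an error, takes a compressor value instead of a string constant, and exposes it through a Compressor field with a Name() method used for metric labels. The compressxdr package itself is not part of this diff, so the following is only a minimal sketch of the shape those call sites imply, assuming the zstd streams come from klauspost/compress (any name not used in the diff above is an assumption, not the library's actual API). zstd generally decompresses much faster than gzip at a comparable or better ratio, which is the usual motivation for a default switch like this one.

package compressxdr

import (
	"encoding"
	"io"

	"github.com/klauspost/compress/zstd"
)

// Compressor abstracts the compression algorithm; zstd is the new default.
type Compressor interface {
	Name() string
	NewWriter(w io.Writer) (io.WriteCloser, error)
	NewReader(r io.Reader) (io.ReadCloser, error)
}

type ZstdCompressor struct{}

func (ZstdCompressor) Name() string { return "zstd" }

func (ZstdCompressor) NewWriter(w io.Writer) (io.WriteCloser, error) {
	return zstd.NewWriter(w)
}

func (ZstdCompressor) NewReader(r io.Reader) (io.ReadCloser, error) {
	zr, err := zstd.NewReader(r)
	if err != nil {
		return nil, err
	}
	// zstd.Decoder.Close() returns nothing, so wrap it as an io.ReadCloser.
	return zr.IOReadCloser(), nil
}

var DefaultCompressor Compressor = ZstdCompressor{}

// XDREncoder marshals an XDR value and streams it through the compressor.
// It implements io.WriterTo, which is what writerToRecorder above wraps.
type XDREncoder struct {
	Compressor Compressor
	XdrPayload encoding.BinaryMarshaler
}

func NewXDREncoder(c Compressor, payload encoding.BinaryMarshaler) XDREncoder {
	return XDREncoder{Compressor: c, XdrPayload: payload}
}

func (e XDREncoder) WriteTo(w io.Writer) (int64, error) {
	zw, err := e.Compressor.NewWriter(w)
	if err != nil {
		return 0, err
	}
	raw, err := e.XdrPayload.MarshalBinary()
	if err != nil {
		zw.Close()
		return 0, err
	}
	n, err := zw.Write(raw)
	if err != nil {
		zw.Close()
		return int64(n), err
	}
	return int64(n), zw.Close()
}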
11 changes: 5 additions & 6 deletions exp/services/ledgerexporter/internal/uploader_test.go
@@ -62,11 +62,10 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 
 	expectedCompressedLength := capturedBuf.Len()
 	var decodedArchive datastore.LedgerMetaArchive
-	xdrDecoder, err := compressxdr.NewXDRDecoder(compressxdr.GZIP, &decodedArchive.Data)
-	require.NoError(s.T(), err)
+	xdrDecoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decodedArchive.Data)
 
 	decoder := xdrDecoder
-	_, err = decoder.ReadFrom(&capturedBuf)
+	_, err := decoder.ReadFrom(&capturedBuf)
 	require.NoError(s.T(), err)
 
 	// require that the decoded data matches the original test data
@@ -98,7 +97,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers": "100",
-		"compression": compressxdr.GZIP,
+		"compression": decoder.Compressor.Name(),
 		"already_exists": strconv.FormatBool(alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -114,7 +113,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 	)
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers": "100",
-		"compression": compressxdr.GZIP,
+		"compression": decoder.Compressor.Name(),
 		"already_exists": strconv.FormatBool(!alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -185,7 +184,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) {
 		getMetricValue(metric).GetSummary().GetSampleCount(),
 	)
 
-	for _, compression := range []string{compressxdr.GZIP, "none"} {
+	for _, compression := range []string{compressxdr.DefaultCompressor.Name(), "none"} {
 		metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 			"ledgers": "100",
 			"compression": compression,
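The decoder side changes the same way: NewXDRDecoder takes a compressor plus the target value, returns no error, and its ReadFrom decompresses and unmarshals in one step, as the test above exercises. Continuing the sketch from the uploader section, with the same caveat that names beyond those in the diff are assumptions:

// XDRDecoder decompresses from an io.Reader and unmarshals into XdrPayload.
type XDRDecoder struct {
	Compressor Compressor
	XdrPayload encoding.BinaryUnmarshaler
}

func NewXDRDecoder(c Compressor, payload encoding.BinaryUnmarshaler) XDRDecoder {
	return XDRDecoder{Compressor: c, XdrPayload: payload}
}

// ReadFrom implements io.ReaderFrom, matching decoder.ReadFrom(&capturedBuf) above.
func (d XDRDecoder) ReadFrom(r io.Reader) (int64, error) {
	zr, err := d.Compressor.NewReader(r)
	if err != nil {
		return 0, err
	}
	defer zr.Close()
	raw, err := io.ReadAll(zr)
	if err != nil {
		return int64(len(raw)), err
	}
	return int64(len(raw)), d.XdrPayload.UnmarshalBinary(raw)
}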
2 changes: 1 addition & 1 deletion go.mod
@@ -122,7 +122,7 @@ require (
 	github.com/imkira/go-interpol v1.1.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
-	github.com/klauspost/compress v1.17.0 // indirect
+	github.com/klauspost/compress v1.17.0
 	github.com/kr/text v0.2.0 // indirect
 	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
 	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
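klauspost/compress is promoted from an indirect to a direct dependency, presumably because the new zstd default is built on that module. For reference, a standalone round-trip with its zstd package (standard usage of that library, independent of this repo):

package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Compress.
	var buf bytes.Buffer
	zw, err := zstd.NewWriter(&buf)
	if err != nil {
		log.Fatal(err)
	}
	if _, err := zw.Write([]byte("ledger close meta bytes")); err != nil {
		log.Fatal(err)
	}
	if err := zw.Close(); err != nil {
		log.Fatal(err)
	}

	// Decompress.
	zr, err := zstd.NewReader(&buf)
	if err != nil {
		log.Fatal(err)
	}
	defer zr.Close()
	out, err := io.ReadAll(zr)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(out)) // ledger close meta bytes
}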
26 changes: 2 additions & 24 deletions ingest/ledgerbackend/buffered_storage_backend.go
@@ -10,7 +10,6 @@ import (
 
 	"github.com/pkg/errors"
 
-	"github.com/stellar/go/support/compressxdr"
 	"github.com/stellar/go/support/datastore"
 	"github.com/stellar/go/xdr"
 )
@@ -20,7 +19,6 @@ var _ LedgerBackend = (*BufferedStorageBackend)(nil)
 
 type BufferedStorageBackendConfig struct {
 	LedgerBatchConfig datastore.LedgerBatchConfig
-	CompressionType string
 	DataStore datastore.DataStore
 	BufferSize uint32
 	NumWorkers uint32
@@ -42,7 +40,6 @@ type BufferedStorageBackend struct {
 	prepared *Range // Non-nil if any range is prepared
 	closed bool // False until the core is closed
 	ledgerMetaArchive *datastore.LedgerMetaArchive
-	decoder compressxdr.XDRDecoder
 	nextLedger uint32
 	lastLedger uint32
 }
@@ -65,25 +62,12 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
 		return nil, errors.New("ledgersPerFile must be > 0")
 	}
 
-	if config.LedgerBatchConfig.FileSuffix == "" {
-		return nil, errors.New("no file suffix provided in LedgerBatchConfig")
-	}
-
-	if config.CompressionType == "" {
-		return nil, errors.New("no compression type provided in config")
-	}
-
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil)
-	if err != nil {
-		return nil, err
-	}
 
 	bsBackend := &BufferedStorageBackend{
 		config: config,
 		dataStore: config.DataStore,
 		ledgerMetaArchive: ledgerMetaArchive,
-		decoder: decoder,
 	}
 
 	return bsBackend, nil
@@ -125,17 +109,11 @@ func (bsb *BufferedStorageBackend) getBatchForSequence(ctx context.Context, sequence uint32) error {
 	}
 
 	// Sequence is beyond the current LedgerCloseMetaBatch
-	lcmBatchBinary, err := bsb.ledgerBuffer.getFromLedgerQueue(ctx)
+	var err error
+	bsb.ledgerMetaArchive.Data, err = bsb.ledgerBuffer.getFromLedgerQueue(ctx)
 	if err != nil {
 		return errors.Wrap(err, "failed getting next ledger batch from queue")
 	}
 
-	// Turn binary into xdr
-	err = bsb.ledgerMetaArchive.Data.UnmarshalBinary(lcmBatchBinary)
-	if err != nil {
-		return errors.Wrap(err, "failed unmarshalling lcmBatchBinary")
-	}
-
 	return nil
}
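The call site now assigns a decoded xdr.LedgerCloseMetaBatch directly, so decompression and XDR unmarshalling must have moved into the ledger buffer's download path; the backend itself no longer needs a compression type, a file suffix, or its own decoder. Only the call above appears in this diff, so the following is a sketch of what getFromLedgerQueue plausibly looks like after the change, under the assumption that a channel of compressed objects feeds it (the queue and field names are assumptions):

package ledgerbackend

import (
	"bytes"
	"context"

	"github.com/pkg/errors"

	"github.com/stellar/go/support/compressxdr"
	"github.com/stellar/go/xdr"
)

// Assumed shape of the buffer; only the queue matters for this sketch.
type ledgerBuffer struct {
	ledgerQueue chan []byte // compressed objects fetched by download workers
}

func (lb *ledgerBuffer) getFromLedgerQueue(ctx context.Context) (xdr.LedgerCloseMetaBatch, error) {
	var batch xdr.LedgerCloseMetaBatch
	select {
	case <-ctx.Done():
		return batch, ctx.Err()
	case compressed := <-lb.ledgerQueue:
		// Decompress and unmarshal here, so callers receive ready-to-use XDR.
		decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &batch)
		if _, err := decoder.ReadFrom(bytes.NewReader(compressed)); err != nil {
			return batch, errors.Wrap(err, "failed decoding ledger batch")
		}
		return batch, nil
	}
}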
19 changes: 7 additions & 12 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -29,14 +29,12 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig {
 	ledgerBatchConfig := datastore.LedgerBatchConfig{
 		LedgersPerFile: 1,
 		FilesPerPartition: 64000,
-		FileSuffix: ".xdr.gz",
 	}
 
 	dataStore := new(datastore.MockDataStore)
 
 	return BufferedStorageBackendConfig{
 		LedgerBatchConfig: ledgerBatchConfig,
-		CompressionType: compressxdr.GZIP,
 		DataStore: dataStore,
 		BufferSize: 100,
 		NumWorkers: 5,
@@ -48,13 +46,11 @@ func createBufferedStorageBackendForTesting() BufferedStorageBackend {
 	config := createBufferedStorageBackendConfigForTesting()
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil)
 
 	return BufferedStorageBackend{
 		config: config,
 		dataStore: config.DataStore,
 		ledgerMetaArchive: ledgerMetaArchive,
-		decoder: decoder,
 	}
 }
 
@@ -67,10 +63,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
 		if count > 1 {
 			endFileSeq := i + count - 1
 			readCloser = createLCMBatchReader(i, endFileSeq, count)
-			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d-%d.xdr.gz", partition, math.MaxUint32-i, i, endFileSeq)
+			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d-%d.xdr.zstd", partition, math.MaxUint32-i, i, endFileSeq)
 		} else {
 			readCloser = createLCMBatchReader(i, i, count)
-			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-i, i)
+			objectName = fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-i, i)
 		}
 		mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
 	}
@@ -105,7 +101,7 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq, count uint32) xdr.LedgerCloseMetaBatch {
 
 func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
 	testData := createTestLedgerCloseMetaBatch(start, end, count)
-	encoder, _ := compressxdr.NewXDREncoder(compressxdr.GZIP, testData)
+	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, testData)
 	var buf bytes.Buffer
 	encoder.WriteTo(&buf)
 	capturedBuf := buf.Bytes()
@@ -121,7 +117,6 @@ func TestNewBufferedStorageBackend(t *testing.T) {
 	assert.NoError(t, err)
 
 	assert.Equal(t, bsb.dataStore, config.DataStore)
-	assert.Equal(t, ".xdr.gz", bsb.config.LedgerBatchConfig.FileSuffix)
 	assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
 	assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
 	assert.Equal(t, uint32(100), bsb.config.BufferSize)
@@ -441,7 +436,7 @@ func TestLedgerBufferClose(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	afterPrepareRange := make(chan struct{})
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) {
 		<-afterPrepareRange
@@ -473,7 +468,7 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
 	t.Cleanup(func() {
 		mockDataStore.AssertExpectations(t)
@@ -499,7 +494,7 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	iteration := &atomic.Int32{}
 	cancelAfter := int32(bsb.config.RetryLimit) + 2
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) {
@@ -531,7 +526,7 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.gz", partition, math.MaxUint32-3, 3)
+	objectName := fmt.Sprintf("FFFFFFFF--0-%d/%08X--%d.xdr.zstd", partition, math.MaxUint32-3, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).
 		Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")).
 		Times(int(bsb.config.RetryLimit) + 1)
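Every hard-coded key in these tests follows the same layout: a partition directory named for its reversed start sequence and range, then a file named for the reversed file start, now with the ".xdr.zstd" suffix instead of ".xdr.gz". A small helper reproducing those strings (a sketch inferred from the test fixtures above, not the actual datastore method):

package main

import (
	"fmt"
	"math"
)

// objectKey mirrors the "FFFFFFFF--0-%d/%08X--%d[-%d].xdr.zstd" fixtures above.
// The %08X prefix is math.MaxUint32 minus the start sequence, so fixed-width
// keys sort newest-first lexicographically.
func objectKey(seq, ledgersPerFile, filesPerPartition uint32) string {
	partitionSize := ledgersPerFile * filesPerPartition
	partitionStart := (seq / partitionSize) * partitionSize
	partitionEnd := partitionStart + partitionSize - 1
	fileStart := (seq / ledgersPerFile) * ledgersPerFile
	fileEnd := fileStart + ledgersPerFile - 1

	partition := fmt.Sprintf("%08X--%d-%d", math.MaxUint32-partitionStart, partitionStart, partitionEnd)
	if ledgersPerFile == 1 {
		return fmt.Sprintf("%s/%08X--%d.xdr.zstd", partition, math.MaxUint32-fileStart, fileStart)
	}
	return fmt.Sprintf("%s/%08X--%d-%d.xdr.zstd", partition, math.MaxUint32-fileStart, fileStart, fileEnd)
}

func main() {
	// With ledgersPerFile=1 and an assumed filesPerPartition=4, sequence 3
	// lands in partition FFFFFFFF--0-3, matching the shape of the fixtures.
	fmt.Println(objectKey(3, 1, 4)) // FFFFFFFF--0-3/FFFFFFFC--3.xdr.zstd
}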