exp/services/ledgerexporter: Change default compression algorithm from gzip to zstd

urvisavla committed May 9, 2024
1 parent 735600a commit 81c1d8b
Showing 17 changed files with 180 additions and 204 deletions.
1 change: 0 additions & 1 deletion exp/services/ledgerexporter/config.toml
@@ -9,7 +9,6 @@ destination_bucket_path = "exporter-test/ledgers"
 [exporter_config]
 ledgers_per_file = 1
 files_per_partition = 64000
-file_suffix = ".xdr.gz"
 
 [stellar_core_config]
 stellar_core_binary_path = "/usr/local/bin/stellar-core"
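Note: with the `file_suffix` key gone from config.toml, the suffix on exported object keys is derived from the active compressor at runtime. A minimal sketch of that derivation, assuming only the `compressxdr` API visible in this diff (`DefaultCompressor` and its `GetName()` method):

```go
package main

import (
	"fmt"

	"github.com/stellar/go/support/compressxdr"
)

func main() {
	// With zstd as the new default, exported files now end in
	// ".xdr.zstd" rather than the previously configured ".xdr.gz".
	fmt.Println(".xdr." + compressxdr.DefaultCompressor.GetName())
}
```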
8 changes: 4 additions & 4 deletions exp/services/ledgerexporter/internal/app_test.go
@@ -41,7 +41,7 @@ func TestApplyResumeInvalidDataStoreLedgersPerFileBoundary(t *testing.T) {
 		StartLedger: 3,
 		EndLedger:   9,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulate the datastore has inconsistent data,
@@ -61,7 +61,7 @@ func TestApplyResumeWithPartialRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had ledger files populated up to seq=49, so the first absent ledger would be 50
@@ -80,7 +80,7 @@ func TestApplyResumeWithNoRemoteDataPresent(t *testing.T) {
 		StartLedger: 10,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
@@ -102,7 +102,7 @@ func TestApplyResumeWithNoRemoteDataAndRequestFromGenesis(t *testing.T) {
 		StartLedger: 2,
 		EndLedger:   99,
 		Resume:      true,
-		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50, FileSuffix: ".xdr.gz"},
+		LedgerBatchConfig: datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 50},
 	}
 	mockResumableManager := &datastore.MockResumableManager{}
 	// simulates a data store that had no data in the requested range
1 change: 0 additions & 1 deletion exp/services/ledgerexporter/internal/config_test.go
@@ -22,7 +22,6 @@ func TestNewConfigResumeEnabled(t *testing.T) {
 	require.Equal(t, config.DataStoreConfig.Type, "ABC")
 	require.Equal(t, config.LedgerBatchConfig.FilesPerPartition, uint32(1))
 	require.Equal(t, config.LedgerBatchConfig.LedgersPerFile, uint32(3))
-	require.Equal(t, config.LedgerBatchConfig.FileSuffix, ".xdr.gz")
 	require.True(t, config.Resume)
 	url, ok := config.DataStoreConfig.Params["destination_bucket_path"]
 	require.True(t, ok)
40 changes: 20 additions & 20 deletions exp/services/ledgerexporter/internal/exportmanager_test.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/stellar/go/ingest/ledgerbackend"
 	"github.com/stellar/go/support/collections/set"
+	"github.com/stellar/go/support/compressxdr"
 	"github.com/stellar/go/support/datastore"
 )
 
@@ -39,15 +40,15 @@ func (s *ExportManagerSuite) TearDownTest() {
 }
 
 func (s *ExportManagerSuite) TestInvalidExportConfig() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 0, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	_, err := NewExportManager(config, &s.mockBackend, queue, registry)
 	require.Error(s.T(), err)
 }
 
 func (s *ExportManagerSuite) TestRun() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 64, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -97,7 +98,7 @@ func (s *ExportManagerSuite) TestRun() {
 }
 
 func (s *ExportManagerSuite) TestRunContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -126,7 +127,7 @@ func (s *ExportManagerSuite) TestRunContextCancel() {
 }
 
 func (s *ExportManagerSuite) TestRunWithCanceledContext() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -143,32 +144,31 @@ func (s *ExportManagerSuite) TestGetObjectKeyFromSequenceNumber() {
 		filesPerPartition uint32
 		ledgerSeq         uint32
 		ledgersPerFile    uint32
-		fileSuffix        string
 		expectedKey       string
 	}{
-		{0, 5, 1, ".xdr.gz", "5.xdr.gz"},
-		{0, 5, 10, ".xdr.gz", "0-9.xdr.gz"},
-		{2, 10, 100, ".xdr.gz", "0-199/0-99.xdr.gz"},
-		{2, 150, 50, ".xdr.gz", "100-199/150-199.xdr.gz"},
-		{2, 300, 200, ".xdr.gz", "0-399/200-399.xdr.gz"},
-		{2, 1, 1, ".xdr.gz", "0-1/1.xdr.gz"},
-		{4, 10, 100, ".xdr.gz", "0-399/0-99.xdr.gz"},
-		{4, 250, 50, ".xdr.gz", "200-399/250-299.xdr.gz"},
-		{1, 300, 200, ".xdr.gz", "200-399.xdr.gz"},
-		{1, 1, 1, ".xdr.gz", "1.xdr.gz"},
+		{0, 5, 1, "5"},
+		{0, 5, 10, "0-9"},
+		{2, 10, 100, "0-199/0-99"},
+		{2, 150, 50, "100-199/150-199"},
+		{2, 300, 200, "0-399/200-399"},
+		{2, 1, 1, "0-1/1"},
+		{4, 10, 100, "0-399/0-99"},
+		{4, 250, 50, "200-399/250-299"},
+		{1, 300, 200, "200-399"},
+		{1, 1, 1, "1"},
 	}
 
 	for _, tc := range testCases {
 		s.T().Run(fmt.Sprintf("LedgerSeq-%d-LedgersPerFile-%d", tc.ledgerSeq, tc.ledgersPerFile), func(t *testing.T) {
-			config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile, FileSuffix: tc.fileSuffix}
+			config := datastore.LedgerBatchConfig{FilesPerPartition: tc.filesPerPartition, LedgersPerFile: tc.ledgersPerFile}
 			key := config.GetObjectKeyFromSequenceNumber(tc.ledgerSeq)
-			require.Equal(t, tc.expectedKey, key)
+			require.Equal(t, tc.expectedKey+".xdr."+compressxdr.DefaultCompressor.GetName(), key)
 		})
 	}
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -206,7 +206,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMeta() {
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 1, FilesPerPartition: 10}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
@@ -224,7 +224,7 @@ func (s *ExportManagerSuite) TestAddLedgerCloseMetaContextCancel() {
 }
 
 func (s *ExportManagerSuite) TestAddLedgerCloseMetaKeyMismatch() {
-	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1, FileSuffix: ".xdr.gz"}
+	config := datastore.LedgerBatchConfig{LedgersPerFile: 10, FilesPerPartition: 1}
 	registry := prometheus.NewRegistry()
 	queue := NewUploadQueue(1, registry)
 	exporter, err := NewExportManager(config, &s.mockBackend, queue, registry)
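The rewritten test table separates the key layout from the suffix: partition and file ranges come from `FilesPerPartition` and `LedgersPerFile`, and `".xdr." + compressor name` is appended at the end. A hedged sketch of the arithmetic the table implies — `objectKey` below is a hypothetical stand-in for `datastore.LedgerBatchConfig.GetObjectKeyFromSequenceNumber`, reconstructed from the expected keys above:

```go
package main

import "fmt"

// objectKey reproduces the layout implied by the test table above.
func objectKey(filesPerPartition, ledgersPerFile, seq uint32, suffix string) string {
	var key string
	if filesPerPartition > 1 {
		size := filesPerPartition * ledgersPerFile
		start := (seq / size) * size
		key = fmt.Sprintf("%d-%d/", start, start+size-1) // partition directory
	}
	if ledgersPerFile > 1 {
		start := (seq / ledgersPerFile) * ledgersPerFile
		key += fmt.Sprintf("%d-%d", start, start+ledgersPerFile-1) // ledger range per file
	} else {
		key += fmt.Sprint(seq) // one ledger per file
	}
	return key + suffix
}

func main() {
	// Matches the {2, 150, 50, "100-199/150-199"} row above.
	fmt.Println(objectKey(2, 50, 150, ".xdr.zstd")) // 100-199/150-199.xdr.zstd
}
```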
11 changes: 3 additions & 8 deletions exp/services/ledgerexporter/internal/uploader.go
@@ -83,15 +83,10 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaArchive) error {
 	startTime := time.Now()
 	numLedgers := strconv.FormatUint(uint64(metaArchive.GetLedgerCount()), 10)
 
-	// TODO: Add compression config and optimize best compression algorithm
-	// JIRA https://stellarorg.atlassian.net/browse/HUBBLE-368
-	xdrEncoder, err := compressxdr.NewXDREncoder(compressxdr.GZIP, &metaArchive.Data)
-	if err != nil {
-		return err
-	}
+	xdrEncoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &metaArchive.Data)
 
 	writerTo := &writerToRecorder{
-		WriterTo: xdrEncoder,
+		WriterTo: &xdrEncoder,
 	}
 	ok, err := u.dataStore.PutFileIfNotExists(ctx, metaArchive.GetObjectKey(), writerTo)
 	if err != nil {
@@ -109,7 +104,7 @@ func (u Uploader) Upload(ctx context.Context, metaArchive *datastore.LedgerMetaArchive) error {
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalUncompressed))
 	u.objectSizeMetrics.With(prometheus.Labels{
-		"compression":    compressxdr.GZIP,
+		"compression":    xdrEncoder.Compressor.GetName(),
 		"ledgers":        numLedgers,
 		"already_exists": alreadyExists,
 	}).Observe(float64(writerTo.totalCompressed))
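The constructor change above is the heart of the commit: `NewXDREncoder` is now infallible, and the compression label for metrics is read back from the encoder rather than hard-coded to gzip. A minimal sketch of the resulting call pattern, assuming the `compressxdr` API as used above (`encodeArchive` is a hypothetical helper, not part of the package):

```go
import (
	"bytes"

	"github.com/stellar/go/support/compressxdr"
	"github.com/stellar/go/support/datastore"
)

// encodeArchive compresses a meta archive into a buffer and reports which
// compressor produced it (the string used as the metrics label).
func encodeArchive(archive *datastore.LedgerMetaArchive) (*bytes.Buffer, string, error) {
	// The constructor no longer returns an error after this change.
	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &archive.Data)
	var buf bytes.Buffer
	if _, err := encoder.WriteTo(&buf); err != nil { // encoder implements io.WriterTo
		return nil, "", err
	}
	return &buf, encoder.Compressor.GetName(), nil // "zstd" with the new default
}
```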
12 changes: 6 additions & 6 deletions exp/services/ledgerexporter/internal/uploader_test.go
@@ -62,11 +62,10 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 
 	expectedCompressedLength := capturedBuf.Len()
 	var decodedArchive datastore.LedgerMetaArchive
-	xdrDecoder, err := compressxdr.NewXDRDecoder(compressxdr.GZIP, &decodedArchive.Data)
-	require.NoError(s.T(), err)
+	xdrDecoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decodedArchive.Data)
 
 	decoder := xdrDecoder
-	_, err = decoder.ReadFrom(&capturedBuf)
+	_, err := decoder.ReadFrom(&capturedBuf)
 	require.NoError(s.T(), err)
 
 	// require that the decoded data matches the original test data
@@ -98,7 +97,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers":        "100",
-		"compression":    compressxdr.GZIP,
+		"compression":    decoder.Compressor.GetName(),
 		"already_exists": strconv.FormatBool(alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -114,7 +113,7 @@ func (s *UploaderSuite) testUpload(putOkReturnVal bool) {
 	)
 	metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 		"ledgers":        "100",
-		"compression":    compressxdr.GZIP,
+		"compression":    decoder.Compressor.GetName(),
 		"already_exists": strconv.FormatBool(!alreadyExists),
 	})
 	require.NoError(s.T(), err)
@@ -172,6 +171,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) {
 	dataUploader := NewUploader(&s.mockDataStore, queue, registry)
 	err := dataUploader.Upload(context.Background(), archive)
 	require.Equal(s.T(), fmt.Sprintf("error uploading %s: error in PutFileIfNotExists", key), err.Error())
+	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil)
 
 	for _, alreadyExists := range []string{"true", "false"} {
 		metric, err := dataUploader.uploadDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{
@@ -185,7 +185,7 @@ func (s *UploaderSuite) testUploadPutError(putOkReturnVal bool) {
 			getMetricValue(metric).GetSummary().GetSampleCount(),
 		)
 
-		for _, compression := range []string{compressxdr.GZIP, "none"} {
+		for _, compression := range []string{decoder.Compressor.GetName(), "none"} {
 			metric, err = dataUploader.objectSizeMetrics.MetricVec.GetMetricWith(prometheus.Labels{
 				"ledgers":        "100",
 				"compression":    compression,
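The decode path mirrors the encode path. A hedged round-trip sketch in the spirit of `testUpload`, using only the encoder and decoder calls shown in this diff (`roundTrip` is a hypothetical helper):

```go
// roundTrip encodes an archive with the default compressor, then decodes it
// back into a fresh archive and returns the copy for comparison.
func roundTrip(src *datastore.LedgerMetaArchive) (datastore.LedgerMetaArchive, error) {
	var buf bytes.Buffer
	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, &src.Data)
	if _, err := encoder.WriteTo(&buf); err != nil {
		return datastore.LedgerMetaArchive{}, err
	}
	var decoded datastore.LedgerMetaArchive
	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &decoded.Data)
	if _, err := decoder.ReadFrom(&buf); err != nil { // decoder implements io.ReaderFrom
		return datastore.LedgerMetaArchive{}, err
	}
	return decoded, nil
}
```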
2 changes: 1 addition & 1 deletion go.mod
@@ -122,7 +122,7 @@ require (
 	github.com/imkira/go-interpol v1.1.0 // indirect
 	github.com/inconshreveable/mousetrap v1.1.0 // indirect
 	github.com/jmespath/go-jmespath v0.4.0 // indirect
-	github.com/klauspost/compress v1.17.0 // indirect
+	github.com/klauspost/compress v1.17.0
 	github.com/kr/text v0.2.0 // indirect
 	github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
 	github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect
14 changes: 1 addition & 13 deletions ingest/ledgerbackend/buffered_storage_backend.go
@@ -20,7 +20,6 @@ var _ LedgerBackend = (*BufferedStorageBackend)(nil)
 
 type BufferedStorageBackendConfig struct {
 	LedgerBatchConfig datastore.LedgerBatchConfig
-	CompressionType   string
 	DataStore         datastore.DataStore
 	BufferSize        uint32
 	NumWorkers        uint32
@@ -65,19 +64,8 @@ func NewBufferedStorageBackend(ctx context.Context, config BufferedStorageBackendConfig) (*BufferedStorageBackend, error) {
 		return nil, errors.New("ledgersPerFile must be > 0")
 	}
 
-	if config.LedgerBatchConfig.FileSuffix == "" {
-		return nil, errors.New("no file suffix provided in LedgerBatchConfig")
-	}
-
-	if config.CompressionType == "" {
-		return nil, errors.New("no compression type provided in config")
-	}
-
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil)
-	if err != nil {
-		return nil, err
-	}
+	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil)
 
 	bsBackend := &BufferedStorageBackend{
 		config: config,
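For callers of the buffered backend, both compression knobs disappear: no `CompressionType` in the config and no `FileSuffix` in `LedgerBatchConfig`; the decoder is built internally from `compressxdr.DefaultCompressor`. A construction sketch under that assumption (field set taken from the tests below; retry and any other settings elided):

```go
// dataStore is any datastore.DataStore implementation; ctx is a context.Context.
config := ledgerbackend.BufferedStorageBackendConfig{
	LedgerBatchConfig: datastore.LedgerBatchConfig{
		LedgersPerFile:    1,
		FilesPerPartition: 64000,
	},
	DataStore:  dataStore,
	BufferSize: 100,
	NumWorkers: 5,
}
// The backend derives its decoder from compressxdr.DefaultCompressor internally.
backend, err := ledgerbackend.NewBufferedStorageBackend(ctx, config)
```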
19 changes: 8 additions & 11 deletions ingest/ledgerbackend/buffered_storage_backend_test.go
@@ -28,14 +28,12 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
 	ledgerBatchConfig := datastore.LedgerBatchConfig{
 		LedgersPerFile:    1,
 		FilesPerPartition: 64000,
-		FileSuffix:        ".xdr.gz",
 	}
 
 	dataStore := new(datastore.MockDataStore)
 
 	return BufferedStorageBackendConfig{
 		LedgerBatchConfig: ledgerBatchConfig,
-		CompressionType:   compressxdr.GZIP,
 		DataStore:         dataStore,
 		BufferSize:        100,
 		NumWorkers:        5,
@@ -47,7 +45,7 @@ func createBufferedStorageBackendConfigForTesting() BufferedStorageBackendConfig
 func createBufferedStorageBackendForTesting() BufferedStorageBackend {
 	config := createBufferedStorageBackendConfigForTesting()
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil)
+	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, nil)
 
 	return BufferedStorageBackend{
 		config: config,
@@ -66,10 +64,10 @@ func createMockdataStore(t *testing.T, start, end, partitionSize, count uint32)
 		if count > 1 {
 			endFileSeq := i + count - 1
 			readCloser = createLCMBatchReader(i, endFileSeq, count)
-			objectName = fmt.Sprintf("0-%d/%d-%d.xdr.gz", partition, i, endFileSeq)
+			objectName = fmt.Sprintf("0-%d/%d-%d.xdr.zstd", partition, i, endFileSeq)
 		} else {
 			readCloser = createLCMBatchReader(i, i, count)
-			objectName = fmt.Sprintf("0-%d/%d.xdr.gz", partition, i)
+			objectName = fmt.Sprintf("0-%d/%d.xdr.zstd", partition, i)
 		}
 		mockDataStore.On("GetFile", mock.Anything, objectName).Return(readCloser, nil)
 	}
@@ -104,7 +102,7 @@ func createTestLedgerCloseMetaBatch(startSeq, endSeq, count uint32) xdr.LedgerCloseMetaBatch
 
 func createLCMBatchReader(start, end, count uint32) io.ReadCloser {
 	testData := createTestLedgerCloseMetaBatch(start, end, count)
-	encoder, _ := compressxdr.NewXDREncoder(compressxdr.GZIP, testData)
+	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, testData)
 	var buf bytes.Buffer
 	encoder.WriteTo(&buf)
 	capturedBuf := buf.Bytes()
@@ -120,7 +118,6 @@ func TestNewBufferedStorageBackend(t *testing.T) {
 	assert.NoError(t, err)
 
 	assert.Equal(t, bsb.dataStore, config.DataStore)
-	assert.Equal(t, ".xdr.gz", bsb.config.LedgerBatchConfig.FileSuffix)
 	assert.Equal(t, uint32(1), bsb.config.LedgerBatchConfig.LedgersPerFile)
 	assert.Equal(t, uint32(64000), bsb.config.LedgerBatchConfig.FilesPerPartition)
 	assert.Equal(t, uint32(100), bsb.config.BufferSize)
@@ -440,7 +437,7 @@ func TestLedgerBufferClose(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
+	objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3)
 	afterPrepareRange := make(chan struct{})
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), context.Canceled).Run(func(args mock.Arguments) {
 		<-afterPrepareRange
@@ -472,7 +469,7 @@ func TestLedgerBufferBoundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
+	objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Once()
 	t.Cleanup(func() {
 		mockDataStore.AssertExpectations(t)
@@ -498,7 +495,7 @@ func TestLedgerBufferUnboundedObjectNotFound(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
+	objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3)
 	iteration := &atomic.Int32{}
 	cancelAfter := int32(bsb.config.RetryLimit) + 2
 	mockDataStore.On("GetFile", mock.Anything, objectName).Return(io.NopCloser(&bytes.Buffer{}), os.ErrNotExist).Run(func(args mock.Arguments) {
@@ -530,7 +527,7 @@ func TestLedgerBufferRetryLimit(t *testing.T) {
 	mockDataStore := new(datastore.MockDataStore)
 	partition := ledgerPerFileCount*partitionSize - 1
 
-	objectName := fmt.Sprintf("0-%d/%d.xdr.gz", partition, 3)
+	objectName := fmt.Sprintf("0-%d/%d.xdr.zstd", partition, 3)
 	mockDataStore.On("GetFile", mock.Anything, objectName).
 		Return(io.NopCloser(&bytes.Buffer{}), fmt.Errorf("transient error")).
 		Times(int(bsb.config.RetryLimit) + 1)