From 4b1b50d8026ea2b20d1d90d5552233f68b86f24d Mon Sep 17 00:00:00 2001 From: Simon Chow Date: Thu, 2 May 2024 11:44:06 -0400 Subject: [PATCH] wip --- ingest/ledgerbackend/gcs_backend.go | 70 ++++++++++++------------ ingest/ledgerbackend/gcs_backend_test.go | 18 +++--- 2 files changed, 44 insertions(+), 44 deletions(-) diff --git a/ingest/ledgerbackend/gcs_backend.go b/ingest/ledgerbackend/gcs_backend.go index 0386524fdf..10220e8233 100644 --- a/ingest/ledgerbackend/gcs_backend.go +++ b/ingest/ledgerbackend/gcs_backend.go @@ -32,19 +32,19 @@ type BufferConfig struct { RetryWait time.Duration } -type gcsBackendConfig struct { - bufferConfig BufferConfig - dataStoreConfig datastore.DataStoreConfig - ledgerBatchConfig datastore.LedgerBatchConfig - storageUrl string - network string - compressionType string +type GCSBackendConfig struct { + BufferConfig BufferConfig + DataStoreConfig datastore.DataStoreConfig + LedgerBatchConfig datastore.LedgerBatchConfig + StorageUrl string + Network string + CompressionType string } // GCSBackend is a ledger backend that reads from a cloud storage service. // The cloud storage service contains files generated from the ledgerExporter. type GCSBackend struct { - config gcsBackendConfig + config GCSBackendConfig context context.Context // cancel is the CancelCauseFunc for context which controls the lifetime of a GCSBackend instance. @@ -67,7 +67,7 @@ type GCSBackend struct { } type ledgerBufferGCS struct { - config gcsBackendConfig + config GCSBackendConfig dataStore datastore.DataStore taskQueue chan uint32 // buffer next gcs object read ledgerQueue chan []byte // order corrected lcm batches @@ -94,18 +94,18 @@ func (gcsb *GCSBackend) NewLedgerBuffer(ledgerRange Range) (*ledgerBufferGCS, er less := func(a, b LedgerBatchObject) bool { return a.StartLedger < b.StartLedger } - pq := heap.New(less, int(gcsb.config.bufferConfig.BufferSize)) + pq := heap.New(less, int(gcsb.config.BufferConfig.BufferSize)) done := make(chan struct{}) ledgerBuffer := &ledgerBufferGCS{ config: gcsb.config, dataStore: gcsb.dataStore, - taskQueue: make(chan uint32, gcsb.config.bufferConfig.BufferSize), - ledgerQueue: make(chan []byte, gcsb.config.bufferConfig.BufferSize), + taskQueue: make(chan uint32, gcsb.config.BufferConfig.BufferSize), + ledgerQueue: make(chan []byte, gcsb.config.BufferConfig.BufferSize), ledgerPriorityQueue: pq, count: 0, - limit: gcsb.config.bufferConfig.BufferSize, + limit: gcsb.config.BufferConfig.BufferSize, done: done, currentLedger: ledgerRange.from, nextTaskLedger: ledgerRange.from, @@ -117,7 +117,7 @@ func (gcsb *GCSBackend) NewLedgerBuffer(ledgerRange Range) (*ledgerBufferGCS, er } // Workers to read LCM files - for i := uint32(0); i < gcsb.config.bufferConfig.NumWorkers; i++ { + for i := uint32(0); i < gcsb.config.BufferConfig.NumWorkers; i++ { go ledgerBuffer.worker() } @@ -131,7 +131,7 @@ func (lb *ledgerBufferGCS) pushTaskQueue() { return } lb.taskQueue <- lb.nextTaskLedger - lb.nextTaskLedger += lb.config.ledgerBatchConfig.LedgersPerFile + lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile lb.count++ } } @@ -147,22 +147,22 @@ func (lb *ledgerBufferGCS) worker() { return case sequence := <-lb.taskQueue: retryCount := uint32(0) - for retryCount <= lb.config.bufferConfig.RetryLimit { + for retryCount <= lb.config.BufferConfig.RetryLimit { ledgerObject, err := lb.getLedgerGCSObject(sequence) if err != nil { if e, ok := err.(*googleapi.Error); ok { // ledgerObject not found and unbounded if e.Code == 404 && !lb.ledgerRange.bounded { - time.Sleep(lb.config.bufferConfig.RetryWait * time.Second) + time.Sleep(lb.config.BufferConfig.RetryWait * time.Second) continue } } - if retryCount == lb.config.bufferConfig.RetryLimit { + if retryCount == lb.config.BufferConfig.RetryLimit { err = errors.New("maximum retries exceeded for gcs object reads") lb.cancel(err) } retryCount++ - time.Sleep(lb.config.bufferConfig.RetryWait * time.Second) + time.Sleep(lb.config.BufferConfig.RetryWait * time.Second) } // Add to priority queue and continue to next task @@ -174,7 +174,7 @@ func (lb *ledgerBufferGCS) worker() { } func (lb *ledgerBufferGCS) getLedgerGCSObject(sequence uint32) ([]byte, error) { - objectKey := lb.config.ledgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence) + objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence) reader, err := lb.dataStore.GetFile(context.Background(), objectKey) if err != nil { @@ -228,42 +228,42 @@ func (lb *ledgerBufferGCS) getFromLedgerQueue() ([]byte, error) { } // Return a new GCSBackend instance. -func NewGCSBackend(ctx context.Context, config gcsBackendConfig) (*GCSBackend, error) { +func NewGCSBackend(ctx context.Context, config GCSBackendConfig) (*GCSBackend, error) { // Check/set minimum config values - if config.storageUrl == "" { + if config.StorageUrl == "" { return nil, errors.New("storageURL is not set") } - if config.ledgerBatchConfig.FileSuffix == "" { + if config.LedgerBatchConfig.FileSuffix == "" { return nil, errors.New("ledgerBatchConfig.FileSuffix is not set") } - if config.ledgerBatchConfig.LedgersPerFile == 0 { - config.ledgerBatchConfig.LedgersPerFile = 1 + if config.LedgerBatchConfig.LedgersPerFile == 0 { + config.LedgerBatchConfig.LedgersPerFile = 1 } - if config.ledgerBatchConfig.FilesPerPartition == 0 { - config.ledgerBatchConfig.FilesPerPartition = 1 + if config.LedgerBatchConfig.FilesPerPartition == 0 { + config.LedgerBatchConfig.FilesPerPartition = 1 } // Check/set minimum config values - if config.bufferConfig.BufferSize == 0 { - config.bufferConfig.BufferSize = 1 + if config.BufferConfig.BufferSize == 0 { + config.BufferConfig.BufferSize = 1 } - if config.bufferConfig.NumWorkers == 0 { - config.bufferConfig.NumWorkers = 1 + if config.BufferConfig.NumWorkers == 0 { + config.BufferConfig.NumWorkers = 1 } ctx, cancel := context.WithCancelCause(ctx) - dataStore, err := datastore.NewDataStore(ctx, config.dataStoreConfig, config.network) + dataStore, err := datastore.NewDataStore(ctx, config.DataStoreConfig, config.Network) if err != nil { return nil, err } ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) - decoder, err := compressxdr.NewXDRDecoder(config.compressionType, nil) + decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil) if err != nil { return nil, err } @@ -285,11 +285,11 @@ func (gcsb *GCSBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, er var err error var archive historyarchive.ArchiveInterface - if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, gcsb.config.network); err != nil { + if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, gcsb.config.Network); err != nil { return 0, err } - resumableManager := datastore.NewResumableManager(gcsb.dataStore, gcsb.config.network, gcsb.config.ledgerBatchConfig, archive) + resumableManager := datastore.NewResumableManager(gcsb.dataStore, gcsb.config.Network, gcsb.config.LedgerBatchConfig, archive) // Start at 2 to skip the genesis ledger absentLedger, ok, err := resumableManager.FindStart(ctx, 2, 0) if err != nil { diff --git a/ingest/ledgerbackend/gcs_backend_test.go b/ingest/ledgerbackend/gcs_backend_test.go index 4432fd1d90..c128e0547f 100644 --- a/ingest/ledgerbackend/gcs_backend_test.go +++ b/ingest/ledgerbackend/gcs_backend_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -func createGCSBackendConfigForTesting() gcsBackendConfig { +func createGCSBackendConfigForTesting() GCSBackendConfig { bufferConfig := BufferConfig{ BufferSize: 1000, NumWorkers: 5, @@ -30,13 +30,13 @@ func createGCSBackendConfigForTesting() gcsBackendConfig { FileSuffix: ".xdr.gz", } - return gcsBackendConfig{ - bufferConfig: bufferConfig, - dataStoreConfig: dataStoreConfig, - ledgerBatchConfig: ledgerBatchConfig, - storageUrl: "testURL", - network: "testnet", - compressionType: compressxdr.GZIP, + return GCSBackendConfig{ + BufferConfig: bufferConfig, + DataStoreConfig: dataStoreConfig, + LedgerBatchConfig: ledgerBatchConfig, + StorageUrl: "testURL", + Network: "testnet", + CompressionType: compressxdr.GZIP, } } @@ -45,7 +45,7 @@ func createGCSBackendForTesting() GCSBackend { ctx := context.Background() mockDataStore := new(datastore.MockDataStore) ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0) - decoder, _ := compressxdr.NewXDRDecoder(config.compressionType, nil) + decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil) return GCSBackend{ config: config,