Commit 4b1b50d

wip

chowbao committed May 2, 2024
1 parent fdd3637
Showing 2 changed files with 44 additions and 44 deletions.
70 changes: 35 additions & 35 deletions ingest/ledgerbackend/gcs_backend.go
@@ -32,19 +32,19 @@ type BufferConfig struct {
 	RetryWait  time.Duration
 }

-type gcsBackendConfig struct {
-	bufferConfig      BufferConfig
-	dataStoreConfig   datastore.DataStoreConfig
-	ledgerBatchConfig datastore.LedgerBatchConfig
-	storageUrl        string
-	network           string
-	compressionType   string
+type GCSBackendConfig struct {
+	BufferConfig      BufferConfig
+	DataStoreConfig   datastore.DataStoreConfig
+	LedgerBatchConfig datastore.LedgerBatchConfig
+	StorageUrl        string
+	Network           string
+	CompressionType   string
 }

 // GCSBackend is a ledger backend that reads from a cloud storage service.
 // The cloud storage service contains files generated from the ledgerExporter.
 type GCSBackend struct {
-	config gcsBackendConfig
+	config GCSBackendConfig

 	context context.Context
 	// cancel is the CancelCauseFunc for context which controls the lifetime of a GCSBackend instance.
@@ -67,7 +67,7 @@ type GCSBackend struct {
 }

 type ledgerBufferGCS struct {
-	config      gcsBackendConfig
+	config      GCSBackendConfig
 	dataStore   datastore.DataStore
 	taskQueue   chan uint32 // buffer next gcs object read
 	ledgerQueue chan []byte // order corrected lcm batches
@@ -94,18 +94,18 @@ func (gcsb *GCSBackend) NewLedgerBuffer(ledgerRange Range) (*ledgerBufferGCS, er
 	less := func(a, b LedgerBatchObject) bool {
 		return a.StartLedger < b.StartLedger
 	}
-	pq := heap.New(less, int(gcsb.config.bufferConfig.BufferSize))
+	pq := heap.New(less, int(gcsb.config.BufferConfig.BufferSize))

 	done := make(chan struct{})

 	ledgerBuffer := &ledgerBufferGCS{
 		config:              gcsb.config,
 		dataStore:           gcsb.dataStore,
-		taskQueue:           make(chan uint32, gcsb.config.bufferConfig.BufferSize),
-		ledgerQueue:         make(chan []byte, gcsb.config.bufferConfig.BufferSize),
+		taskQueue:           make(chan uint32, gcsb.config.BufferConfig.BufferSize),
+		ledgerQueue:         make(chan []byte, gcsb.config.BufferConfig.BufferSize),
 		ledgerPriorityQueue: pq,
 		count:               0,
-		limit:               gcsb.config.bufferConfig.BufferSize,
+		limit:               gcsb.config.BufferConfig.BufferSize,
 		done:                done,
 		currentLedger:       ledgerRange.from,
 		nextTaskLedger:      ledgerRange.from,
@@ -117,7 +117,7 @@ func (gcsb *GCSBackend) NewLedgerBuffer(ledgerRange Range) (*ledgerBufferGCS, er
 	}

 	// Workers to read LCM files
-	for i := uint32(0); i < gcsb.config.bufferConfig.NumWorkers; i++ {
+	for i := uint32(0); i < gcsb.config.BufferConfig.NumWorkers; i++ {
 		go ledgerBuffer.worker()
 	}

@@ -131,7 +131,7 @@ func (lb *ledgerBufferGCS) pushTaskQueue() {
 			return
 		}
 		lb.taskQueue <- lb.nextTaskLedger
-		lb.nextTaskLedger += lb.config.ledgerBatchConfig.LedgersPerFile
+		lb.nextTaskLedger += lb.config.LedgerBatchConfig.LedgersPerFile
 		lb.count++
 	}
 }
@@ -147,22 +147,22 @@ func (lb *ledgerBufferGCS) worker() {
 			return
 		case sequence := <-lb.taskQueue:
 			retryCount := uint32(0)
-			for retryCount <= lb.config.bufferConfig.RetryLimit {
+			for retryCount <= lb.config.BufferConfig.RetryLimit {
 				ledgerObject, err := lb.getLedgerGCSObject(sequence)
 				if err != nil {
 					if e, ok := err.(*googleapi.Error); ok {
 						// ledgerObject not found and unbounded
 						if e.Code == 404 && !lb.ledgerRange.bounded {
-							time.Sleep(lb.config.bufferConfig.RetryWait * time.Second)
+							time.Sleep(lb.config.BufferConfig.RetryWait * time.Second)
 							continue
 						}
 					}
-					if retryCount == lb.config.bufferConfig.RetryLimit {
+					if retryCount == lb.config.BufferConfig.RetryLimit {
 						err = errors.New("maximum retries exceeded for gcs object reads")
 						lb.cancel(err)
 					}
 					retryCount++
-					time.Sleep(lb.config.bufferConfig.RetryWait * time.Second)
+					time.Sleep(lb.config.BufferConfig.RetryWait * time.Second)
 				}

 				// Add to priority queue and continue to next task
@@ -174,7 +174,7 @@ func (lb *ledgerBufferGCS) worker() {
 }

 func (lb *ledgerBufferGCS) getLedgerGCSObject(sequence uint32) ([]byte, error) {
-	objectKey := lb.config.ledgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)
+	objectKey := lb.config.LedgerBatchConfig.GetObjectKeyFromSequenceNumber(sequence)

 	reader, err := lb.dataStore.GetFile(context.Background(), objectKey)
 	if err != nil {
@@ -228,42 +228,42 @@ func (lb *ledgerBufferGCS) getFromLedgerQueue() ([]byte, error) {
 }

 // Return a new GCSBackend instance.
-func NewGCSBackend(ctx context.Context, config gcsBackendConfig) (*GCSBackend, error) {
+func NewGCSBackend(ctx context.Context, config GCSBackendConfig) (*GCSBackend, error) {
 	// Check/set minimum config values
-	if config.storageUrl == "" {
+	if config.StorageUrl == "" {
 		return nil, errors.New("storageURL is not set")
 	}

-	if config.ledgerBatchConfig.FileSuffix == "" {
+	if config.LedgerBatchConfig.FileSuffix == "" {
 		return nil, errors.New("ledgerBatchConfig.FileSuffix is not set")
 	}

-	if config.ledgerBatchConfig.LedgersPerFile == 0 {
-		config.ledgerBatchConfig.LedgersPerFile = 1
+	if config.LedgerBatchConfig.LedgersPerFile == 0 {
+		config.LedgerBatchConfig.LedgersPerFile = 1
 	}

-	if config.ledgerBatchConfig.FilesPerPartition == 0 {
-		config.ledgerBatchConfig.FilesPerPartition = 1
+	if config.LedgerBatchConfig.FilesPerPartition == 0 {
+		config.LedgerBatchConfig.FilesPerPartition = 1
 	}

 	// Check/set minimum config values
-	if config.bufferConfig.BufferSize == 0 {
-		config.bufferConfig.BufferSize = 1
+	if config.BufferConfig.BufferSize == 0 {
+		config.BufferConfig.BufferSize = 1
 	}

-	if config.bufferConfig.NumWorkers == 0 {
-		config.bufferConfig.NumWorkers = 1
+	if config.BufferConfig.NumWorkers == 0 {
+		config.BufferConfig.NumWorkers = 1
 	}

 	ctx, cancel := context.WithCancelCause(ctx)

-	dataStore, err := datastore.NewDataStore(ctx, config.dataStoreConfig, config.network)
+	dataStore, err := datastore.NewDataStore(ctx, config.DataStoreConfig, config.Network)
 	if err != nil {
 		return nil, err
 	}

 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, err := compressxdr.NewXDRDecoder(config.compressionType, nil)
+	decoder, err := compressxdr.NewXDRDecoder(config.CompressionType, nil)
 	if err != nil {
 		return nil, err
 	}
@@ -285,11 +285,11 @@ func (gcsb *GCSBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, er
 	var err error
 	var archive historyarchive.ArchiveInterface

-	if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, gcsb.config.network); err != nil {
+	if archive, err = datastore.CreateHistoryArchiveFromNetworkName(ctx, gcsb.config.Network); err != nil {
 		return 0, err
 	}

-	resumableManager := datastore.NewResumableManager(gcsb.dataStore, gcsb.config.network, gcsb.config.ledgerBatchConfig, archive)
+	resumableManager := datastore.NewResumableManager(gcsb.dataStore, gcsb.config.Network, gcsb.config.LedgerBatchConfig, archive)
 	// Start at 2 to skip the genesis ledger
 	absentLedger, ok, err := resumableManager.FindStart(ctx, 2, 0)
 	if err != nil {
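For context on how the newly exported GCSBackendConfig might be used from outside the package, here is a minimal, hypothetical sketch. The bucket URL, buffer sizes, and datastore settings are illustrative assumptions rather than values taken from this commit, and the import paths assume the standard stellar/go module layout.

package main

import (
	"context"
	"log"

	"github.com/stellar/go/ingest/ledgerbackend"
	"github.com/stellar/go/support/compressxdr"
	"github.com/stellar/go/support/datastore"
)

func main() {
	// Illustrative values only; the commit itself does not prescribe them.
	cfg := ledgerbackend.GCSBackendConfig{
		BufferConfig: ledgerbackend.BufferConfig{
			BufferSize: 100, // ledger batches buffered ahead of the reader
			NumWorkers: 5,   // concurrent GCS object fetchers
			RetryLimit: 3,
			RetryWait:  5, // multiplied by time.Second in the worker retry loop
		},
		DataStoreConfig: datastore.DataStoreConfig{}, // assumed: bucket/credential settings go here
		LedgerBatchConfig: datastore.LedgerBatchConfig{
			LedgersPerFile:    1,
			FilesPerPartition: 64000,
			FileSuffix:        ".xdr.gz",
		},
		StorageUrl:      "gcs://my-bucket/ledgers", // placeholder
		Network:         "testnet",
		CompressionType: compressxdr.GZIP,
	}

	ctx := context.Background()
	backend, err := ledgerbackend.NewGCSBackend(ctx, cfg)
	if err != nil {
		log.Fatal(err)
	}

	latest, err := backend.GetLatestLedgerSequence(ctx)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("latest ledger sequence: %d", latest)
}

Exporting the config struct and its fields is what makes a caller like this possible; before this change the lowercase gcsBackendConfig could only be constructed inside the ledgerbackend package.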
18 changes: 9 additions & 9 deletions ingest/ledgerbackend/gcs_backend_test.go
@@ -9,7 +9,7 @@ import (
 	"github.com/stretchr/testify/assert"
 )

-func createGCSBackendConfigForTesting() gcsBackendConfig {
+func createGCSBackendConfigForTesting() GCSBackendConfig {
 	bufferConfig := BufferConfig{
 		BufferSize: 1000,
 		NumWorkers: 5,
@@ -30,13 +30,13 @@ func createGCSBackendConfigForTesting() gcsBackendConfig {
 		FileSuffix:        ".xdr.gz",
 	}

-	return gcsBackendConfig{
-		bufferConfig:      bufferConfig,
-		dataStoreConfig:   dataStoreConfig,
-		ledgerBatchConfig: ledgerBatchConfig,
-		storageUrl:        "testURL",
-		network:           "testnet",
-		compressionType:   compressxdr.GZIP,
+	return GCSBackendConfig{
+		BufferConfig:      bufferConfig,
+		DataStoreConfig:   dataStoreConfig,
+		LedgerBatchConfig: ledgerBatchConfig,
+		StorageUrl:        "testURL",
+		Network:           "testnet",
+		CompressionType:   compressxdr.GZIP,
 	}
 }

@@ -45,7 +45,7 @@ func createGCSBackendForTesting() GCSBackend {
 	ctx := context.Background()
 	mockDataStore := new(datastore.MockDataStore)
 	ledgerMetaArchive := datastore.NewLedgerMetaArchive("", 0, 0)
-	decoder, _ := compressxdr.NewXDRDecoder(config.compressionType, nil)
+	decoder, _ := compressxdr.NewXDRDecoder(config.CompressionType, nil)

 	return GCSBackend{
 		config: config,
