diff --git a/services/horizon/internal/db2/history/account_loader.go b/services/horizon/internal/db2/history/account_loader.go index e9fd9bedea..afba3c01e0 100644 --- a/services/horizon/internal/db2/history/account_loader.go +++ b/services/horizon/internal/db2/history/account_loader.go @@ -28,7 +28,7 @@ const loaderLookupBatchSize = 50000 // Value implements the database/sql/driver Valuer interface. func (a FutureAccountID) Value() (driver.Value, error) { - return a.loader.GetNow(a.address), nil + return a.loader.GetNow(a.address) } // AccountLoader will map account addresses to their history @@ -71,11 +71,15 @@ func (a *AccountLoader) GetFuture(address string) FutureAccountID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AccountLoader) GetNow(address string) int64 { - if id, ok := a.ids[address]; !ok { - panic(fmt.Errorf("address %v not present", address)) +func (a *AccountLoader) GetNow(address string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid account loader state, + Exec was not called yet to properly seal and resolve %v id`, address) + } + if internalID, ok := a.ids[address]; !ok { + return 0, fmt.Errorf(`account loader address %q was not found`, address) } else { - return id + return internalID, nil } } @@ -207,3 +211,7 @@ func NewAccountLoaderStub() AccountLoaderStub { func (a AccountLoaderStub) Insert(address string, id int64) { a.Loader.ids[address] = id } + +func (a AccountLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/account_loader_test.go b/services/horizon/internal/db2/history/account_loader_test.go index 11047f3be2..54d2c7a143 100644 --- a/services/horizon/internal/db2/history/account_loader_test.go +++ b/services/horizon/internal/db2/history/account_loader_test.go @@ -22,16 +22,11 @@ func TestAccountLoader(t *testing.T) { } loader := NewAccountLoader() - var futures []FutureAccountID for _, address := range addresses { future := loader.GetFuture(address) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(address) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid account loader state,`) duplicateFuture := loader.GetFuture(address) assert.Equal(t, future, duplicateFuture) } @@ -42,15 +37,16 @@ func TestAccountLoader(t *testing.T) { }) q := &Q{session} - for i, address := range addresses { - future := futures[i] - id := loader.GetNow(address) - val, err := future.Value() + for _, address := range addresses { + internalId, err := loader.GetNow(address) assert.NoError(t, err) - assert.Equal(t, id, val) var account Account assert.NoError(t, q.AccountByAddress(context.Background(), &account, address)) - assert.Equal(t, account.ID, id) + assert.Equal(t, account.ID, internalId) assert.Equal(t, account.Address, address) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/asset_loader.go b/services/horizon/internal/db2/history/asset_loader.go index 6ef3d7a350..ca30c671e9 100644 --- a/services/horizon/internal/db2/history/asset_loader.go +++ b/services/horizon/internal/db2/history/asset_loader.go @@ -41,7 +41,7 @@ type FutureAssetID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureAssetID) Value() (driver.Value, error) { - return a.loader.GetNow(a.asset), nil + return a.loader.GetNow(a.asset) } // AssetLoader will map assets to their history @@ -81,11 +81,15 @@ func (a *AssetLoader) GetFuture(asset AssetKey) FutureAssetID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *AssetLoader) GetNow(asset AssetKey) int64 { - if id, ok := a.ids[asset]; !ok { - panic(fmt.Errorf("asset %v not present", asset)) +func (a *AssetLoader) GetNow(asset AssetKey) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid asset loader state, + Exec was not called yet to properly seal and resolve %v id`, asset) + } + if internalID, ok := a.ids[asset]; !ok { + return 0, fmt.Errorf(`asset loader id %v was not found`, asset) } else { - return id + return internalID, nil } } @@ -213,3 +217,7 @@ func NewAssetLoaderStub() AssetLoaderStub { func (a AssetLoaderStub) Insert(asset AssetKey, id int64) { a.Loader.ids[asset] = id } + +func (a AssetLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/asset_loader_test.go b/services/horizon/internal/db2/history/asset_loader_test.go index 99f510266c..65932abbd3 100644 --- a/services/horizon/internal/db2/history/asset_loader_test.go +++ b/services/horizon/internal/db2/history/asset_loader_test.go @@ -36,16 +36,11 @@ func TestAssetLoader(t *testing.T) { } loader := NewAssetLoader() - var futures []FutureAssetID for _, key := range keys { future := loader.GetFuture(key) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(key) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid asset loader state,`) duplicateFuture := loader.GetFuture(key) assert.Equal(t, future, duplicateFuture) } @@ -56,12 +51,9 @@ func TestAssetLoader(t *testing.T) { }) q := &Q{session} - for i, key := range keys { - future := futures[i] - internalID := loader.GetNow(key) - val, err := future.Value() + for _, key := range keys { + internalID, err := loader.GetNow(key) assert.NoError(t, err) - assert.Equal(t, internalID, val) var assetXDR xdr.Asset if key.Type == "native" { assetXDR = xdr.MustNewNativeAsset() @@ -72,4 +64,8 @@ func TestAssetLoader(t *testing.T) { assert.NoError(t, err) assert.Equal(t, assetID, internalID) } + + _, err := loader.GetNow(AssetKey{}) + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader.go b/services/horizon/internal/db2/history/claimable_balance_loader.go index a077eb683e..dd7dee4ea5 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader.go @@ -23,7 +23,7 @@ type FutureClaimableBalanceID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureClaimableBalanceID) Value() (driver.Value, error) { - return a.loader.getNow(a.id), nil + return a.loader.getNow(a.id) } // ClaimableBalanceLoader will map claimable balance ids to their internal @@ -64,11 +64,15 @@ func (a *ClaimableBalanceLoader) GetFuture(id string) FutureClaimableBalanceID { // getNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any getNow // call can succeed. -func (a *ClaimableBalanceLoader) getNow(id string) int64 { +func (a *ClaimableBalanceLoader) getNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid claimable balance loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } if internalID, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) + return 0, fmt.Errorf(`claimable balance loader id %q was not found`, id) } else { - return internalID + return internalID, nil } } diff --git a/services/horizon/internal/db2/history/claimable_balance_loader_test.go b/services/horizon/internal/db2/history/claimable_balance_loader_test.go index b119daa674..4dd7324521 100644 --- a/services/horizon/internal/db2/history/claimable_balance_loader_test.go +++ b/services/horizon/internal/db2/history/claimable_balance_loader_test.go @@ -32,12 +32,9 @@ func TestClaimableBalanceLoader(t *testing.T) { for _, id := range ids { future := loader.GetFuture(id) futures = append(futures, future) - assert.Panics(t, func() { - loader.getNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid claimable balance loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -50,13 +47,16 @@ func TestClaimableBalanceLoader(t *testing.T) { q := &Q{session} for i, id := range ids { future := futures[i] - internalID := loader.getNow(id) - val, err := future.Value() + internalID, err := future.Value() assert.NoError(t, err) - assert.Equal(t, internalID, val) cb, err := q.ClaimableBalanceByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, cb.BalanceID, id) assert.Equal(t, cb.InternalID, internalID) } + + futureCb := &FutureClaimableBalanceID{id: "not-present", loader: loader} + _, err := futureCb.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader.go b/services/horizon/internal/db2/history/liquidity_pool_loader.go index 7c2fe6fd4d..7355d10ffa 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader.go @@ -23,7 +23,7 @@ type FutureLiquidityPoolID struct { // Value implements the database/sql/driver Valuer interface. func (a FutureLiquidityPoolID) Value() (driver.Value, error) { - return a.loader.GetNow(a.id), nil + return a.loader.GetNow(a.id) } // LiquidityPoolLoader will map liquidity pools to their internal @@ -64,11 +64,15 @@ func (a *LiquidityPoolLoader) GetFuture(id string) FutureLiquidityPoolID { // GetNow should only be called on values which were registered by // GetFuture() calls. Also, Exec() must be called before any GetNow // call can succeed. -func (a *LiquidityPoolLoader) GetNow(id string) int64 { - if id, ok := a.ids[id]; !ok { - panic(fmt.Errorf("id %v not present", id)) +func (a *LiquidityPoolLoader) GetNow(id string) (int64, error) { + if !a.sealed { + return 0, fmt.Errorf(`invalid liquidity pool loader state, + Exec was not called yet to properly seal and resolve %v id`, id) + } + if internalID, ok := a.ids[id]; !ok { + return 0, fmt.Errorf(`liquidity pool loader id %q was not found`, id) } else { - return id + return internalID, nil } } @@ -158,3 +162,7 @@ func NewLiquidityPoolLoaderStub() LiquidityPoolLoaderStub { func (a LiquidityPoolLoaderStub) Insert(lp string, id int64) { a.Loader.ids[lp] = id } + +func (a LiquidityPoolLoaderStub) Sealed() { + a.Loader.sealed = true +} diff --git a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go index e2b1e05beb..6e5b4addf7 100644 --- a/services/horizon/internal/db2/history/liquidity_pool_loader_test.go +++ b/services/horizon/internal/db2/history/liquidity_pool_loader_test.go @@ -25,16 +25,11 @@ func TestLiquidityPoolLoader(t *testing.T) { } loader := NewLiquidityPoolLoader() - var futures []FutureLiquidityPoolID for _, id := range ids { future := loader.GetFuture(id) - futures = append(futures, future) - assert.Panics(t, func() { - loader.GetNow(id) - }) - assert.Panics(t, func() { - future.Value() - }) + _, err := future.Value() + assert.Error(t, err) + assert.Contains(t, err.Error(), `invalid liquidity pool loader state,`) duplicateFuture := loader.GetFuture(id) assert.Equal(t, future, duplicateFuture) } @@ -45,15 +40,16 @@ func TestLiquidityPoolLoader(t *testing.T) { }) q := &Q{session} - for i, id := range ids { - future := futures[i] - internalID := loader.GetNow(id) - val, err := future.Value() + for _, id := range ids { + internalID, err := loader.GetNow(id) assert.NoError(t, err) - assert.Equal(t, internalID, val) lp, err := q.LiquidityPoolByID(context.Background(), id) assert.NoError(t, err) assert.Equal(t, lp.PoolID, id) assert.Equal(t, lp.InternalID, internalID) } + + _, err := loader.GetNow("not present") + assert.Error(t, err) + assert.Contains(t, err.Error(), `was not found`) } diff --git a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go index fc2ca9c831..508bfa22cf 100644 --- a/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go +++ b/services/horizon/internal/db2/history/operation_participant_batch_insert_builder_test.go @@ -43,6 +43,8 @@ func TestAddOperationParticipants(t *testing.T) { op := ops[0] tt.Assert.Equal(int64(240518172673), op.OperationID) - tt.Assert.Equal(accountLoader.GetNow(address), op.AccountID) + val, err := accountLoader.GetNow(address) + tt.Assert.NoError(err) + tt.Assert.Equal(val, op.AccountID) } } diff --git a/services/horizon/internal/db2/history/participants_test.go b/services/horizon/internal/db2/history/participants_test.go index 16671098bf..07f6d59c3e 100644 --- a/services/horizon/internal/db2/history/participants_test.go +++ b/services/horizon/internal/db2/history/participants_test.go @@ -63,7 +63,9 @@ func TestTransactionParticipantsBatch(t *testing.T) { {TransactionID: 2}, } for i := range expected { - expected[i].AccountID = accountLoader.GetNow(addresses[i]) + val, err := accountLoader.GetNow(addresses[i]) + tt.Assert.NoError(err) + expected[i].AccountID = val } tt.Assert.ElementsMatch(expected, participants) } diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 4e360b7769..f5be5ae6c3 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -140,8 +140,9 @@ func (s *ProcessorRunner) buildTransactionProcessor( accountLoader := history.NewAccountLoader() assetLoader := history.NewAssetLoader() lpLoader := history.NewLiquidityPoolLoader() + cbLoader := history.NewClaimableBalanceLoader() - lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader} + lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader, cbLoader} statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{ StatsLedgerTransactionProcessor: ledgerTransactionStats, @@ -158,7 +159,7 @@ func (s *ProcessorRunner) buildTransactionProcessor( processors.NewParticipantsProcessor(accountLoader, s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()), processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()), - processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(), + processors.NewClaimableBalancesTransactionProcessor(cbLoader, s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()), processors.NewLiquidityPoolsTransactionProcessor(lpLoader, s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())} @@ -328,7 +329,7 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos } // ensure capture of the ledger to history regardless of whether it has transactions. - ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)) + ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) ledgersProcessor.ProcessLedger(ledger) transactionReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(s.config.NetworkPassphrase, ledger) @@ -406,9 +407,6 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) ( stats.transactionStats, stats.transactionDurations, stats.tradeStats, err = s.RunTransactionProcessorsOnLedger(ledger) - if err != nil { - return - } return } diff --git a/services/horizon/internal/ingest/processors/trades_processor.go b/services/horizon/internal/ingest/processors/trades_processor.go index b1084c6e08..d5ee89f51e 100644 --- a/services/horizon/internal/ingest/processors/trades_processor.go +++ b/services/horizon/internal/ingest/processors/trades_processor.go @@ -92,17 +92,38 @@ func (p *TradeProcessor) Flush(ctx context.Context, session db.SessionInterface) for _, trade := range p.trades { row := trade.row if trade.sellerAccount != "" { - row.BaseAccountID = null.IntFrom(p.accountLoader.GetNow(trade.sellerAccount)) + val, err := p.accountLoader.GetNow(trade.sellerAccount) + if err != nil { + return err + } + row.BaseAccountID = null.IntFrom(val) } if trade.buyerAccount != "" { - row.CounterAccountID = null.IntFrom(p.accountLoader.GetNow(trade.buyerAccount)) + val, err := p.accountLoader.GetNow(trade.buyerAccount) + if err != nil { + return err + } + row.CounterAccountID = null.IntFrom(val) } if trade.liquidityPoolID != "" { - row.BaseLiquidityPoolID = null.IntFrom(p.lpLoader.GetNow(trade.liquidityPoolID)) + val, err := p.lpLoader.GetNow(trade.liquidityPoolID) + if err != nil { + return err + } + row.BaseLiquidityPoolID = null.IntFrom(val) + } + + val, err := p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) + if err != nil { + return err } + row.BaseAssetID = val - row.BaseAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.soldAsset)) - row.CounterAssetID = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + val, err = p.assetLoader.GetNow(history.AssetKeyFromXDR(trade.boughtAsset)) + if err != nil { + return err + } + row.CounterAssetID = val if row.BaseAssetID > row.CounterAssetID { row.BaseIsSeller = false diff --git a/services/horizon/internal/ingest/processors/trades_processor_test.go b/services/horizon/internal/ingest/processors/trades_processor_test.go index 5b2a2f20e3..864985f367 100644 --- a/services/horizon/internal/ingest/processors/trades_processor_test.go +++ b/services/horizon/internal/ingest/processors/trades_processor_test.go @@ -727,12 +727,15 @@ func (s *TradeProcessorTestSuiteLedger) mockReadTradeTransactions() []history.In } func (s *TradeProcessorTestSuiteLedger) stubLoaders() { + s.accountLoader.Sealed() for key, id := range s.unmuxedAccountToID { s.accountLoader.Insert(key, id) } + s.assetLoader.Sealed() for key, id := range s.assetToID { s.assetLoader.Insert(key, id.ID) } + s.lpLoader.Sealed() for key, id := range s.lpToID { s.lpLoader.Insert(PoolIDToString(key), id) }