Skip to content

Commit

Permalink
#4909: initial wip on integrating loaders and builders into processor…
Browse files Browse the repository at this point in the history
… runners
  • Loading branch information
sreuland committed Oct 16, 2023
1 parent 8775648 commit da5f7de
Show file tree
Hide file tree
Showing 9 changed files with 231 additions and 129 deletions.
2 changes: 2 additions & 0 deletions services/horizon/internal/ingest/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func (r resumeState) run(s *system) (transition, error) {

// Update cursor if there's more than one ingesting instance: either
// Captive-Core or DB ingestion connected to another Stellar-Core.
// remove now?
if err = s.updateCursor(lastIngestedLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
Expand Down Expand Up @@ -524,6 +525,7 @@ func (r resumeState) run(s *system) (transition, error) {
return retryResume(r), err
}

//TODO remove now? stellar-core-db-url is removed
if err = s.updateCursor(ingestLedger); err != nil {
// Don't return updateCursor error.
log.WithError(err).Warn("error updating stellar-core cursor")
Expand Down
28 changes: 21 additions & 7 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

type processorsRunDurations map[string]time.Duration
Expand Down Expand Up @@ -51,33 +53,45 @@ func (g groupChangeProcessors) Commit(ctx context.Context) error {
}

type groupTransactionProcessors struct {
processors []horizonTransactionProcessor
processors []horizonTransactionProcessor
lazyLoaders []horizonLazyLoader
processorsRunDurations
}

func newGroupTransactionProcessors(processors []horizonTransactionProcessor) *groupTransactionProcessors {
func newGroupTransactionProcessors(processors []horizonTransactionProcessor, lazyLoaders []horizonLazyLoader) *groupTransactionProcessors {
return &groupTransactionProcessors{
processors: processors,
processorsRunDurations: make(map[string]time.Duration),
lazyLoaders: lazyLoaders,
}
}

func (g groupTransactionProcessors) ProcessTransaction(ctx context.Context, tx ingest.LedgerTransaction) error {
func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, tx ingest.LedgerTransaction) error {
for _, p := range g.processors {
startTime := time.Now()
if err := p.ProcessTransaction(ctx, tx); err != nil {
if err := p.ProcessTransaction(lcm, tx); err != nil {
return errors.Wrapf(err, "error in %T.ProcessTransaction", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
return nil
}

func (g groupTransactionProcessors) Commit(ctx context.Context) error {
func (g groupTransactionProcessors) Flush(ctx context.Context, session db.SessionInterface) error {
// need to trigger all lazy loaders to now resolve their future placeholders
// with real db values first
for _, loader := range g.lazyLoaders {
if err := loader.Exec(ctx, session); err != nil {
return errors.Wrapf(err, "error during lazy loader resolution, %T.Exec", loader)
}
}

// now flush each processor which may call loader.GetNow(), which
// required the prior loader.Exec() to have been called.
for _, p := range g.processors {
startTime := time.Now()
if err := p.Commit(ctx); err != nil {
return errors.Wrapf(err, "error in %T.Commit", p)
if err := p.Flush(ctx, session); err != nil {
return errors.Wrapf(err, "error in %T.Flush", p)
}
g.AddRunDuration(fmt.Sprintf("%T", p), startTime)
}
Expand Down
40 changes: 23 additions & 17 deletions services/horizon/internal/ingest/group_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/suite"

"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)

var _ horizonChangeProcessor = (*mockHorizonChangeProcessor)(nil)
Expand All @@ -35,12 +37,12 @@ type mockHorizonTransactionProcessor struct {
mock.Mock
}

func (m *mockHorizonTransactionProcessor) ProcessTransaction(ctx context.Context, transaction ingest.LedgerTransaction) error {
args := m.Called(ctx, transaction)
func (m *mockHorizonTransactionProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
args := m.Called(lcm, transaction)
return args.Error(0)
}

func (m *mockHorizonTransactionProcessor) Commit(ctx context.Context) error {
func (m *mockHorizonTransactionProcessor) Flush(ctx context.Context, session db.SessionInterface) error {
args := m.Called(ctx)
return args.Error(0)
}
Expand Down Expand Up @@ -124,6 +126,7 @@ type GroupTransactionProcessorsTestSuiteLedger struct {
processors *groupTransactionProcessors
processorA *mockHorizonTransactionProcessor
processorB *mockHorizonTransactionProcessor
session db.SessionInterface
}

func TestGroupTransactionProcessorsTestSuiteLedger(t *testing.T) {
Expand All @@ -137,7 +140,8 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) SetupTest() {
s.processors = newGroupTransactionProcessors([]horizonTransactionProcessor{
s.processorA,
s.processorB,
})
}, nil)
s.session = &db.MockSession{}
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() {
Expand All @@ -147,46 +151,48 @@ func (s *GroupTransactionProcessorsTestSuiteLedger) TearDownTest() {

func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionFails() {
transaction := ingest.LedgerTransaction{}
closeMeta := xdr.LedgerCloseMeta{}
s.processorA.
On("ProcessTransaction", s.ctx, transaction).
On("ProcessTransaction", closeMeta, transaction).
Return(errors.New("transient error")).Once()

err := s.processors.ProcessTransaction(s.ctx, transaction)
err := s.processors.ProcessTransaction(closeMeta, transaction)
s.Assert().Error(err)
s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.ProcessTransaction: transient error")
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TestProcessTransactionSucceeds() {
transaction := ingest.LedgerTransaction{}
closeMeta := xdr.LedgerCloseMeta{}
s.processorA.
On("ProcessTransaction", s.ctx, transaction).
On("ProcessTransaction", closeMeta, transaction).
Return(nil).Once()
s.processorB.
On("ProcessTransaction", s.ctx, transaction).
On("ProcessTransaction", closeMeta, transaction).
Return(nil).Once()

err := s.processors.ProcessTransaction(s.ctx, transaction)
err := s.processors.ProcessTransaction(closeMeta, transaction)
s.Assert().NoError(err)
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitFails() {
func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushFails() {
s.processorA.
On("Commit", s.ctx).
On("Flush", s.ctx, s.session).
Return(errors.New("transient error")).Once()

err := s.processors.Commit(s.ctx)
err := s.processors.Flush(s.ctx, s.session)
s.Assert().Error(err)
s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Commit: transient error")
s.Assert().EqualError(err, "error in *ingest.mockHorizonTransactionProcessor.Flush: transient error")
}

func (s *GroupTransactionProcessorsTestSuiteLedger) TestCommitSucceeds() {
func (s *GroupTransactionProcessorsTestSuiteLedger) TestFlushSucceeds() {
s.processorA.
On("Commit", s.ctx).
On("Flush", s.ctx, s.session).
Return(nil).Once()
s.processorB.
On("Commit", s.ctx).
On("Flush", s.ctx, s.session).
Return(nil).Once()

err := s.processors.Commit(s.ctx)
err := s.processors.Flush(s.ctx, s.session)
s.Assert().NoError(err)
}
68 changes: 38 additions & 30 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ type horizonChangeProcessor interface {

type horizonTransactionProcessor interface {
processors.LedgerTransactionProcessor
Commit(context.Context) error
}

type horizonLazyLoader interface {
Exec(ctx context.Context, session db.SessionInterface) error
}

type statsChangeProcessor struct {
Expand All @@ -47,10 +50,6 @@ type statsLedgerTransactionProcessor struct {
*processors.StatsLedgerTransactionProcessor
}

func (statsLedgerTransactionProcessor) Commit(ctx context.Context) error {
return nil
}

type ledgerStats struct {
changeStats ingest.StatsChangeProcessorResults
changeDurations processorsRunDurations
Expand Down Expand Up @@ -135,24 +134,36 @@ func buildChangeProcessor(
func (s *ProcessorRunner) buildTransactionProcessor(
ledgerTransactionStats *processors.StatsLedgerTransactionProcessor,
tradeProcessor *processors.TradeProcessor,
ledger xdr.LedgerHeaderHistoryEntry,
ledger xdr.LedgerCloseMeta,
txBuilder history.TransactionBatchInsertBuilder,
) *groupTransactionProcessors {
accountLoader := history.NewAccountLoader()
assetLoader := history.NewAssetLoader()
lpLoader := history.NewLiquidityPoolLoader()

lazyLoaders := []horizonLazyLoader{accountLoader, assetLoader, lpLoader}

statsLedgerTransactionProcessor := &statsLedgerTransactionProcessor{
StatsLedgerTransactionProcessor: ledgerTransactionStats,
}
*tradeProcessor = *processors.NewTradeProcessor(s.session, s.historyQ, ledger)
sequence := uint32(ledger.Header.LedgerSeq)
return newGroupTransactionProcessors([]horizonTransactionProcessor{
*tradeProcessor = *processors.NewTradeProcessor(accountLoader,
lpLoader, assetLoader, s.historyQ.NewTradeBatchInsertBuilder())

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(s.session, s.historyQ, sequence),
processors.NewLedgerProcessor(s.session, s.historyQ, ledger, CurrentVersion),
processors.NewOperationProcessor(s.session, s.historyQ, sequence),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder()),
processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), int(ledger.LedgerHeaderHistoryEntry().Header.LedgerVersion)),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder()),
tradeProcessor,
processors.NewParticipantsProcessor(s.session, s.historyQ, sequence),
processors.NewTransactionProcessor(s.session, s.historyQ, sequence),
processors.NewClaimableBalancesTransactionProcessor(s.session, s.historyQ, sequence),
processors.NewLiquidityPoolsTransactionProcessor(s.session, s.historyQ, sequence),
})
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(txBuilder),
processors.NewClaimableBalancesTransactionProcessor(history.NewClaimableBalanceLoader(),
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
s.historyQ.NewTransactionLiquidityPoolBatchInsertBuilder(), s.historyQ.NewOperationLiquidityPoolBatchInsertBuilder())}

return newGroupTransactionProcessors(processors, lazyLoaders)
}

func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers {
Expand All @@ -164,15 +175,15 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers
return newGroupTransactionFilterers(f)
}

func (s *ProcessorRunner) buildFilteredOutProcessor(ledger xdr.LedgerHeaderHistoryEntry) *groupTransactionProcessors {
func (s *ProcessorRunner) buildFilteredOutProcessor(txBuilder history.TransactionBatchInsertBuilder) *groupTransactionProcessors {
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.session, s.historyQ, uint32(ledger.Header.LedgerSeq))
txSubProc := processors.NewTransactionFilteredTmpProcessor(txBuilder)
p = append(p, txSubProc)
}

return newGroupTransactionProcessors(p)
return newGroupTransactionProcessors(p, nil)
}

// checkIfProtocolVersionSupported checks if this Horizon version supports the
Expand Down Expand Up @@ -321,36 +332,33 @@ func (s *ProcessorRunner) RunTransactionProcessorsOnLedger(ledger xdr.LedgerClos
err = errors.Wrap(err, "Error while checking for supported protocol version")
return
}
header := transactionReader.GetHeader()

txBuilder := s.historyQ.NewTransactionBatchInsertBuilder()
groupTransactionFilterers := s.buildTransactionFilterer()
groupFilteredOutProcessors := s.buildFilteredOutProcessor(header)
groupFilteredOutProcessors := s.buildFilteredOutProcessor(txBuilder)
groupTransactionProcessors := s.buildTransactionProcessor(
&ledgerTransactionStats, &tradeProcessor, header)
&ledgerTransactionStats, &tradeProcessor, ledger, txBuilder)
err = processors.StreamLedgerTransactions(s.ctx,
groupTransactionFilterers,
groupFilteredOutProcessors,
groupTransactionProcessors,
transactionReader,
ledger,
)
if err != nil {
err = errors.Wrap(err, "Error streaming changes from ledger")
return
}

if s.config.EnableIngestionFiltering {
err = groupFilteredOutProcessors.Commit(s.ctx)
if err != nil {
err = errors.Wrap(err, "Error committing filtered changes from processor")
return
}
if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod {
s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds()))
}
}

err = groupTransactionProcessors.Commit(s.ctx)
err = groupTransactionProcessors.Flush(s.ctx, s.session)
if err != nil {
err = errors.Wrap(err, "Error committing changes from processor")
err = errors.Wrap(err, "Error flushing changes from processor")
return
}

Expand Down
Loading

0 comments on commit da5f7de

Please sign in to comment.