From f2cd035e103114f47bd6dff924b92cab85687c96 Mon Sep 17 00:00:00 2001 From: shawn Date: Thu, 18 Apr 2024 14:45:14 -0700 Subject: [PATCH] don't run filtered tmp processor when reingestion is enabled (#5283) --- services/horizon/cmd/db.go | 6 +- services/horizon/cmd/ingest.go | 57 +++---- services/horizon/internal/app.go | 53 +++--- services/horizon/internal/config.go | 1 - services/horizon/internal/flags.go | 10 +- services/horizon/internal/httpx/router.go | 53 +++--- .../internal/ingest/group_processors.go | 4 + services/horizon/internal/ingest/main.go | 6 +- .../internal/ingest/processor_runner.go | 37 ++-- .../internal/ingest/processor_runner_test.go | 89 +++------- services/horizon/internal/init.go | 1 - .../horizon/internal/integration/db_test.go | 160 +++++++++++++++++- .../internal/integration/parameters_test.go | 21 --- .../internal/test/integration/integration.go | 6 + 14 files changed, 299 insertions(+), 205 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 965d5d7173..cec51543c9 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -224,6 +224,10 @@ var dbReapCmd = &cobra.Command{ if err != nil { return err } + defer func() { + app.Shutdown() + app.CloseDB() + }() ctx := context.Background() app.UpdateHorizonLedgerState(ctx) return app.DeleteUnretainedHistory(ctx) @@ -409,7 +413,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, HistoryArchiveURLs: config.HistoryArchiveURLs, HistoryArchiveCaching: config.HistoryArchiveCaching, CheckpointFrequency: config.CheckpointFrequency, - ReingestEnabled: true, MaxReingestRetries: int(retries), ReingestRetryBackoffSeconds: int(retryBackoffSeconds), CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath, @@ -418,7 +421,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, CaptiveCoreStoragePath: config.CaptiveCoreStoragePath, StellarCoreURL: config.StellarCoreURL, RoundingSlippageFilter: config.RoundingSlippageFilter, - EnableIngestionFiltering: config.EnableIngestionFiltering, MaxLedgerPerFlush: maxLedgersPerFlush, SkipTxmeta: config.SkipTxmeta, } diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 18452dc74a..864067da8f 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -125,17 +125,16 @@ var ingestVerifyRangeCmd = &cobra.Command{ } ingestConfig := ingest.Config{ - NetworkPassphrase: globalConfig.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, - HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, - CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, - CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, - CheckpointFrequency: globalConfig.CheckpointFrequency, - CaptiveCoreToml: globalConfig.CaptiveCoreToml, - CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, - RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, - EnableIngestionFiltering: globalConfig.EnableIngestionFiltering, + NetworkPassphrase: globalConfig.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, + CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, + CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, + CheckpointFrequency: globalConfig.CheckpointFrequency, + CaptiveCoreToml: globalConfig.CaptiveCoreToml, + CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, + RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, } system, err := ingest.NewSystem(ingestConfig) @@ -285,14 +284,13 @@ var ingestInitGenesisStateCmd = &cobra.Command{ } ingestConfig := ingest.Config{ - NetworkPassphrase: globalConfig.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, - CheckpointFrequency: globalConfig.CheckpointFrequency, - RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, - EnableIngestionFiltering: globalConfig.EnableIngestionFiltering, - CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, - CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, + NetworkPassphrase: globalConfig.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + CheckpointFrequency: globalConfig.CheckpointFrequency, + RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, + CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, + CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, } system, err := ingest.NewSystem(ingestConfig) @@ -348,17 +346,16 @@ var ingestBuildStateCmd = &cobra.Command{ } ingestConfig := ingest.Config{ - NetworkPassphrase: globalConfig.NetworkPassphrase, - HistorySession: horizonSession, - HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, - HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, - CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, - CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, - CheckpointFrequency: globalConfig.CheckpointFrequency, - CaptiveCoreToml: globalConfig.CaptiveCoreToml, - CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, - RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, - EnableIngestionFiltering: globalConfig.EnableIngestionFiltering, + NetworkPassphrase: globalConfig.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, + CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, + CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, + CheckpointFrequency: globalConfig.CheckpointFrequency, + CaptiveCoreToml: globalConfig.CaptiveCoreToml, + CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath, + RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, } system, err := ingest.NewSystem(ingestConfig) diff --git a/services/horizon/internal/app.go b/services/horizon/internal/app.go index 927cffa773..36fc2031e8 100644 --- a/services/horizon/internal/app.go +++ b/services/horizon/internal/app.go @@ -156,9 +156,15 @@ func (a *App) Close() { func (a *App) waitForDone() { <-a.done - webShutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - a.webServer.Shutdown(webShutdownCtx) + a.Shutdown() +} + +func (a *App) Shutdown() { + if a.webServer != nil { + webShutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + a.webServer.Shutdown(webShutdownCtx) + } a.cancel() if a.ingester != nil { a.ingester.Shutdown() @@ -528,27 +534,26 @@ func (a *App) init() error { initTxSubMetrics(a) routerConfig := httpx.RouterConfig{ - DBSession: a.historyQ.SessionInterface, - TxSubmitter: a.submitter, - RateQuota: a.config.RateQuota, - BehindCloudflare: a.config.BehindCloudflare, - BehindAWSLoadBalancer: a.config.BehindAWSLoadBalancer, - SSEUpdateFrequency: a.config.SSEUpdateFrequency, - StaleThreshold: a.config.StaleThreshold, - ConnectionTimeout: a.config.ConnectionTimeout, - ClientQueryTimeout: a.config.ClientQueryTimeout, - MaxConcurrentRequests: a.config.MaxConcurrentRequests, - MaxHTTPRequestSize: a.config.MaxHTTPRequestSize, - NetworkPassphrase: a.config.NetworkPassphrase, - MaxPathLength: a.config.MaxPathLength, - MaxAssetsPerPathRequest: a.config.MaxAssetsPerPathRequest, - PathFinder: a.paths, - PrometheusRegistry: a.prometheusRegistry, - CoreGetter: a, - HorizonVersion: a.horizonVersion, - FriendbotURL: a.config.FriendbotURL, - EnableIngestionFiltering: a.config.EnableIngestionFiltering, - DisableTxSub: a.config.DisableTxSub, + DBSession: a.historyQ.SessionInterface, + TxSubmitter: a.submitter, + RateQuota: a.config.RateQuota, + BehindCloudflare: a.config.BehindCloudflare, + BehindAWSLoadBalancer: a.config.BehindAWSLoadBalancer, + SSEUpdateFrequency: a.config.SSEUpdateFrequency, + StaleThreshold: a.config.StaleThreshold, + ConnectionTimeout: a.config.ConnectionTimeout, + ClientQueryTimeout: a.config.ClientQueryTimeout, + MaxConcurrentRequests: a.config.MaxConcurrentRequests, + MaxHTTPRequestSize: a.config.MaxHTTPRequestSize, + NetworkPassphrase: a.config.NetworkPassphrase, + MaxPathLength: a.config.MaxPathLength, + MaxAssetsPerPathRequest: a.config.MaxAssetsPerPathRequest, + PathFinder: a.paths, + PrometheusRegistry: a.prometheusRegistry, + CoreGetter: a, + HorizonVersion: a.horizonVersion, + FriendbotURL: a.config.FriendbotURL, + DisableTxSub: a.config.DisableTxSub, HealthCheck: healthCheck{ session: a.historyQ.SessionInterface, ctx: a.ctx, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 4c8a45514f..2e4192a1c9 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -19,7 +19,6 @@ type Config struct { Port uint AdminPort uint - EnableIngestionFiltering bool CaptiveCoreBinaryPath string CaptiveCoreConfigPath string CaptiveCoreTomlParams ledgerbackend.CaptiveCoreTomlParams diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 774140bb53..5489b57d50 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -279,12 +279,8 @@ func Flags() (*Config, support.ConfigOptions) { OptType: types.String, FlagDefault: "", Required: false, - ConfigKey: &config.EnableIngestionFiltering, CustomSetValue: func(opt *support.ConfigOption) error { - - // Always enable ingestion filtering by default. - config.EnableIngestionFiltering = true - + // ingestion filtering is always enabled, it has no rules by default. if val := viper.GetString(opt.Name); val != "" { stdLog.Printf( "DEPRECATED - No ingestion filter rules are defined by default, which equates to " + @@ -665,9 +661,11 @@ func Flags() (*Config, support.ConfigOptions) { Usage: "the batch size (in ledgers) to remove per reap from the Horizon database", UsedInCommands: IngestionCommands, CustomSetValue: func(opt *support.ConfigOption) error { - if val := viper.GetUint(opt.Name); val <= 0 || val > 500_000 { + val := viper.GetUint(opt.Name) + if val <= 0 || val > 500_000 { return fmt.Errorf("flag --history-retention-reap-count must be in range [1, 500,000]") } + *(opt.ConfigKey.(*uint)) = val return nil }, }, diff --git a/services/horizon/internal/httpx/router.go b/services/horizon/internal/httpx/router.go index 4ba978a96a..cd3b6821b0 100644 --- a/services/horizon/internal/httpx/router.go +++ b/services/horizon/internal/httpx/router.go @@ -34,25 +34,24 @@ type RouterConfig struct { RateQuota *throttled.RateQuota MaxConcurrentRequests uint - BehindCloudflare bool - BehindAWSLoadBalancer bool - SSEUpdateFrequency time.Duration - StaleThreshold uint - ConnectionTimeout time.Duration - ClientQueryTimeout time.Duration - MaxHTTPRequestSize uint - NetworkPassphrase string - MaxPathLength uint - MaxAssetsPerPathRequest int - PathFinder paths.Finder - PrometheusRegistry *prometheus.Registry - CoreGetter actions.CoreStateGetter - HorizonVersion string - FriendbotURL *url.URL - HealthCheck http.Handler - EnableIngestionFiltering bool - DisableTxSub bool - SkipTxMeta bool + BehindCloudflare bool + BehindAWSLoadBalancer bool + SSEUpdateFrequency time.Duration + StaleThreshold uint + ConnectionTimeout time.Duration + ClientQueryTimeout time.Duration + MaxHTTPRequestSize uint + NetworkPassphrase string + MaxPathLength uint + MaxAssetsPerPathRequest int + PathFinder paths.Finder + PrometheusRegistry *prometheus.Registry + CoreGetter actions.CoreStateGetter + HorizonVersion string + FriendbotURL *url.URL + HealthCheck http.Handler + DisableTxSub bool + SkipTxMeta bool } type Router struct { @@ -375,13 +374,11 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP) r.Internal.Get("/debug/pprof/heap", pprof.Index) r.Internal.Get("/debug/pprof/profile", pprof.Profile) - if config.EnableIngestionFiltering { - r.Internal.Route("/ingestion/filters", func(r chi.Router) { - handler := actions.FilterConfigHandler{} - r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig) - r.With(historyMiddleware).Put("/account", handler.UpdateAccountConfig) - r.With(historyMiddleware).Get("/asset", handler.GetAssetConfig) - r.With(historyMiddleware).Get("/account", handler.GetAccountConfig) - }) - } + r.Internal.Route("/ingestion/filters", func(r chi.Router) { + handler := actions.FilterConfigHandler{} + r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig) + r.With(historyMiddleware).Put("/account", handler.UpdateAccountConfig) + r.With(historyMiddleware).Get("/asset", handler.GetAssetConfig) + r.With(historyMiddleware).Get("/account", handler.GetAccountConfig) + }) } diff --git a/services/horizon/internal/ingest/group_processors.go b/services/horizon/internal/ingest/group_processors.go index 8b5d2d337e..00f8adde8a 100644 --- a/services/horizon/internal/ingest/group_processors.go +++ b/services/horizon/internal/ingest/group_processors.go @@ -125,6 +125,10 @@ func newGroupTransactionProcessors(processors []horizonTransactionProcessor, } } +func (g groupTransactionProcessors) IsEmpty() bool { + return len(g.processors) == 0 +} + func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, tx ingest.LedgerTransaction) error { for _, p := range g.processors { startTime := time.Now() diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 7dbaacaadb..d88cc3a3ce 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -97,7 +97,6 @@ type Config struct { EnableReapLookupTables bool EnableExtendedLogLedgerStats bool - ReingestEnabled bool MaxReingestRetries int ReingestRetryBackoffSeconds int @@ -108,9 +107,8 @@ type Config struct { RoundingSlippageFilter int - EnableIngestionFiltering bool - MaxLedgerPerFlush uint32 - SkipTxmeta bool + MaxLedgerPerFlush uint32 + SkipTxmeta bool } const ( diff --git a/services/horizon/internal/ingest/processor_runner.go b/services/horizon/internal/ingest/processor_runner.go index 9c38c01154..e6f0e0cf74 100644 --- a/services/horizon/internal/ingest/processor_runner.go +++ b/services/horizon/internal/ingest/processor_runner.go @@ -164,20 +164,15 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers { var f []processors.LedgerTransactionFilterer - if s.config.EnableIngestionFiltering { - f = append(f, s.filters.GetFilters(s.historyQ, s.ctx)...) - } - + f = append(f, s.filters.GetFilters(s.historyQ, s.ctx)...) return newGroupTransactionFilterers(f) } func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors { - // when in online mode, the submission result processor must always run (regardless of filtering) var p []horizonTransactionProcessor - if s.config.EnableIngestionFiltering { - txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta) - p = append(p, txSubProc) - } + + txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta) + p = append(p, txSubProc) return newGroupTransactionProcessors(p, nil, nil) } @@ -384,6 +379,7 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry ledgersProcessor.ProcessLedger(ledger) groupTransactionFilterers := s.buildTransactionFilterer() + // when in online mode, the submission result processor must always run (regardless of whether filter rules exist or not) groupFilteredOutProcessors := s.buildFilteredOutProcessor() loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) @@ -488,11 +484,16 @@ func registerTransactionProcessors( return nil } +// Runs only transaction processors on the inbound list of ledgers. +// Updates history tables based on transactions. +// Intentionally do not make effort to insert or purge tx's on history_transactions_filtered_tmp +// Thus, using this method does not support tx sub processing for the ledgers passed in, i.e. tx submission queue will not see these. func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error) { ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion) groupTransactionFilterers := s.buildTransactionFilterer() - groupFilteredOutProcessors := s.buildFilteredOutProcessor() + // intentionally skip filtered out processor + groupFilteredOutProcessors := newGroupTransactionProcessors(nil, nil, nil) loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor) startTime := time.Now() @@ -554,16 +555,16 @@ func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTrans defer s.session.Rollback() } - if s.config.EnableIngestionFiltering { + if err := groupFilteredOutProcessors.Flush(s.ctx, s.session); err != nil { + return errors.Wrap(err, "Error flushing temp filtered tx from processor") + } - if err := groupFilteredOutProcessors.Flush(s.ctx, s.session); err != nil { - return errors.Wrap(err, "Error flushing temp filtered tx from processor") - } - if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { - if _, err := s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())); err != nil { - return errors.Wrap(err, "Error trimming filtered transactions") - } + if !groupFilteredOutProcessors.IsEmpty() && + time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod { + if _, err := s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())); err != nil { + return errors.Wrap(err, "Error trimming filtered transactions") } + s.lastTransactionsTmpGC = time.Now() } if err := groupTransactionProcessors.Flush(s.ctx, s.session); err != nil { diff --git a/services/horizon/internal/ingest/processor_runner_test.go b/services/horizon/internal/ingest/processor_runner_test.go index 78faf853f3..e6ce6b512c 100644 --- a/services/horizon/internal/ingest/processor_runner_test.go +++ b/services/horizon/internal/ingest/processor_runner_test.go @@ -260,75 +260,6 @@ func TestProcessorRunnerBuildTransactionProcessor(t *testing.T) { assert.IsType(t, &processors.LiquidityPoolsTransactionProcessor{}, processor.processors[8]) } -func TestProcessorRunnerWithFilterEnabled(t *testing.T) { - ctx := context.Background() - - config := Config{ - NetworkPassphrase: network.PublicNetworkPassphrase, - EnableIngestionFiltering: true, - } - - q := &mockDBQ{} - mockSession := &db.MockSession{} - defer mock.AssertExpectationsForObjects(t, q) - - ledger := xdr.LedgerCloseMeta{ - V0: &xdr.LedgerCloseMetaV0{ - LedgerHeader: xdr.LedgerHeaderHistoryEntry{ - Header: xdr.LedgerHeader{ - BucketListHash: xdr.Hash([32]byte{0, 1, 2}), - LedgerSeq: 23, - }, - }, - }, - } - - mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} - defer mock.AssertExpectationsForObjects(t, mockTransactionsFilteredTmpBatchInsertBuilder) - mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() - q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). - Return(mockTransactionsFilteredTmpBatchInsertBuilder) - q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). - Return(int64(0), nil) - - defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) - defer mock.AssertExpectationsForObjects(t, mockChangeProcessorBatchBuilders(q, ctx, true)...) - - mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} - q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) - mockBatchInsertBuilder.On( - "Add", - ledger.V0.LedgerHeader, 0, 0, 0, 0, CurrentVersion).Return(nil) - mockBatchInsertBuilder.On( - "Exec", - ctx, - mockSession, - ).Return(nil) - defer mock.AssertExpectationsForObjects(t, mockBatchInsertBuilder) - - q.MockQAssetStats.On("RemoveContractAssetBalances", ctx, []xdr.Hash(nil)). - Return(nil).Once() - q.MockQAssetStats.On("UpdateContractAssetBalanceAmounts", ctx, []xdr.Hash{}, []string{}). - Return(nil).Once() - q.MockQAssetStats.On("InsertContractAssetBalances", ctx, []history.ContractAssetBalance(nil)). - Return(nil).Once() - q.MockQAssetStats.On("UpdateContractAssetBalanceExpirations", ctx, []xdr.Hash{}, []uint32{}). - Return(nil).Once() - q.MockQAssetStats.On("GetContractAssetBalancesExpiringAt", ctx, uint32(22)). - Return([]history.ContractAssetBalance{}, nil).Once() - - runner := ProcessorRunner{ - ctx: ctx, - config: config, - historyQ: q, - session: mockSession, - filters: &MockFilters{}, - } - - _, err := runner.RunAllProcessorsOnLedger(ledger) - assert.NoError(t, err) -} - func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { ctx := context.Background() @@ -354,6 +285,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) { // Batches defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) defer mock.AssertExpectationsForObjects(t, mockChangeProcessorBatchBuilders(q, ctx, true)...) + defer mock.AssertExpectationsForObjects(t, mockFilteredOutProcessorsForNoRules(q, mockSession, ctx)...) mockBatchInsertBuilder := &history.MockLedgersBatchInsertBuilder{} q.MockQLedgers.On("NewLedgerBatchInsertBuilder").Return(mockBatchInsertBuilder) @@ -434,6 +366,10 @@ func TestProcessorRunnerRunTransactionsProcessorsOnLedgers(t *testing.T) { }, } + // filtered out processor should not be created + q.MockQTransactions.AssertNotCalled(t, "NewTransactionFilteredTmpBatchInsertBuilder") + q.AssertNotCalled(t, "DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")) + // Batches defer mock.AssertExpectationsForObjects(t, mockTxProcessorBatchBuilders(q, mockSession, ctx)...) @@ -660,3 +596,18 @@ func mockChangeProcessorBatchBuilders(q *mockDBQ, ctx context.Context, mockExec mockTrustLinesBatchInsertBuilder, } } + +func mockFilteredOutProcessorsForNoRules(q *mockDBQ, mockSession *db.MockSession, ctx context.Context) []interface{} { + mockTransactionsFilteredTmpBatchInsertBuilder := &history.MockTransactionsBatchInsertBuilder{} + // since no filter rules are used on tests in this suite, we do not need to mock the "Add" call + // the "Exec" call gets run by flush all the time + mockTransactionsFilteredTmpBatchInsertBuilder.On("Exec", ctx, mockSession).Return(nil).Once() + q.MockQTransactions.On("NewTransactionFilteredTmpBatchInsertBuilder"). + Return(mockTransactionsFilteredTmpBatchInsertBuilder) + q.On("DeleteTransactionsFilteredTmpOlderThan", ctx, mock.AnythingOfType("uint64")). + Return(int64(0), nil) + + return []interface{}{ + mockTransactionsFilteredTmpBatchInsertBuilder, + } +} diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 2311833508..93580fed54 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -102,7 +102,6 @@ func initIngester(app *App) { EnableReapLookupTables: app.config.HistoryRetentionCount > 0, EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats, RoundingSlippageFilter: app.config.RoundingSlippageFilter, - EnableIngestionFiltering: app.config.EnableIngestionFiltering, SkipTxmeta: app.config.SkipTxmeta, }) diff --git a/services/horizon/internal/integration/db_test.go b/services/horizon/internal/integration/db_test.go index f6c32f8cc7..98d584c8e6 100644 --- a/services/horizon/internal/integration/db_test.go +++ b/services/horizon/internal/integration/db_test.go @@ -14,10 +14,12 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/keypair" + hProtocol "github.com/stellar/go/protocols/horizon" horizoncmd "github.com/stellar/go/services/horizon/cmd" horizon "github.com/stellar/go/services/horizon/internal" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/db2/schema" + "github.com/stellar/go/services/horizon/internal/ingest/filters" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/db" @@ -416,7 +418,10 @@ func submitAccountOps(itest *integration.Test, tt *assert.Assertions) (submitted } func initializeDBIntegrationTest(t *testing.T) (*integration.Test, int32) { - itest := integration.NewTest(t, integration.Config{}) + itest := integration.NewTest(t, integration.Config{ + HorizonIngestParameters: map[string]string{ + "admin-port": strconv.Itoa(6000), + }}) tt := assert.New(t) // Make sure all possible operations are covered by reingestion @@ -545,6 +550,159 @@ func TestReingestDB(t *testing.T) { tt.NoError(horizoncmd.RootCmd.Execute(), "Repeat the same reingest range against db, should not have errors.") } +func TestReingestDBWithFilterRules(t *testing.T) { + itest, _ := initializeDBIntegrationTest(t) + tt := assert.New(t) + + archive, err := historyarchive.Connect( + itest.GetHorizonIngestConfig().HistoryArchiveURLs[0], + historyarchive.ArchiveOptions{ + NetworkPassphrase: itest.GetHorizonIngestConfig().NetworkPassphrase, + CheckpointFrequency: itest.GetHorizonIngestConfig().CheckpointFrequency, + }) + tt.NoError(err) + + // make sure one full checkpoint has elapsed before making ledger entries + // as test can't reap before first checkpoint in general later in test + publishedFirstCheckpoint := func() bool { + has, requestErr := archive.GetRootHAS() + if requestErr != nil { + t.Logf("request to fetch checkpoint failed: %v", requestErr) + return false + } + return has.CurrentLedger > 1 + } + tt.Eventually(publishedFirstCheckpoint, 10*time.Second, time.Second) + + fullKeys, accounts := itest.CreateAccounts(2, "10000") + whitelistedAccount := accounts[0] + whitelistedAccountKey := fullKeys[0] + nonWhitelistedAccount := accounts[1] + nonWhitelistedAccountKey := fullKeys[1] + enabled := true + + // all assets are allowed by default because the asset filter config is empty. + defaultAllowedAsset := txnbuild.CreditAsset{Code: "PTS", Issuer: itest.Master().Address()} + itest.MustEstablishTrustline(whitelistedAccountKey, whitelistedAccount, defaultAllowedAsset) + itest.MustEstablishTrustline(nonWhitelistedAccountKey, nonWhitelistedAccount, defaultAllowedAsset) + + // Setup a whitelisted account rule, force refresh of filter configs to be quick + filters.SetFilterConfigCheckIntervalSeconds(1) + + expectedAccountFilter := hProtocol.AccountFilterConfig{ + Whitelist: []string{whitelistedAccount.GetAccountID()}, + Enabled: &enabled, + } + err = itest.AdminClient().SetIngestionAccountFilter(expectedAccountFilter) + tt.NoError(err) + + accountFilter, err := itest.AdminClient().GetIngestionAccountFilter() + tt.NoError(err) + + tt.ElementsMatch(expectedAccountFilter.Whitelist, accountFilter.Whitelist) + tt.Equal(expectedAccountFilter.Enabled, accountFilter.Enabled) + + // Ensure the latest filter configs are reloaded by the ingestion state machine processor + time.Sleep(time.Duration(filters.GetFilterConfigCheckIntervalSeconds()) * time.Second) + + // Make sure that when using a non-whitelisted account, the transaction is not stored + nonWhiteListTxResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(), + &txnbuild.Payment{ + Destination: nonWhitelistedAccount.GetAccountID(), + Amount: "10", + Asset: defaultAllowedAsset, + }, + ) + _, err = itest.Client().TransactionDetail(nonWhiteListTxResp.Hash) + tt.True(horizonclient.IsNotFoundError(err)) + + // Make sure that when using a whitelisted account, the transaction is stored + whiteListTxResp := itest.MustSubmitOperations(itest.MasterAccount(), itest.Master(), + &txnbuild.Payment{ + Destination: whitelistedAccount.GetAccountID(), + Amount: "10", + Asset: defaultAllowedAsset, + }, + ) + lastTx, err := itest.Client().TransactionDetail(whiteListTxResp.Hash) + tt.NoError(err) + + reachedLedger := uint32(lastTx.Ledger) + + t.Logf("reached ledger is %v", reachedLedger) + + // make sure a checkpoint has elapsed to lock in the chagnes made on network for reingest later + var latestCheckpoint uint32 + publishedNextCheckpoint := func() bool { + has, requestErr := archive.GetRootHAS() + if requestErr != nil { + t.Logf("request to fetch checkpoint failed: %v", requestErr) + return false + } + latestCheckpoint = has.CurrentLedger + return latestCheckpoint > reachedLedger + } + tt.Eventually(publishedNextCheckpoint, 10*time.Second, time.Second) + + // to test reingestion, stop horizon web and captive core, + // it was used to create ledger entries for test. + itest.StopHorizon() + + // clear the db with reaping all ledgers + horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + "reap", + "--history-retention-count=1", + )) + tt.NoError(horizoncmd.RootCmd.Execute()) + + // repopulate the db with reingestion which should catchup using core reapply filter rules + // correctly on reingestion ranged + horizoncmd.RootCmd.SetArgs(command(t, itest.GetHorizonIngestConfig(), "db", + "reingest", + "range", + "1", + fmt.Sprintf("%d", reachedLedger), + )) + + tt.NoError(horizoncmd.RootCmd.Execute()) + + // bring up horizon, just the api server no ingestion, to query + // for tx's that should have been repopulated on db from reingestion per + // filter rule expectations + webApp, err := horizon.NewApp(itest.GetHorizonWebConfig()) + tt.NoError(err) + + webAppDone := make(chan struct{}) + go func() { + webApp.Serve() + close(webAppDone) + }() + + // wait until the web server is up before continuing to test requests + itest.WaitForHorizon() + + // Make sure that a tx from non-whitelisted account is not stored after reingestion + _, err = itest.Client().TransactionDetail(nonWhiteListTxResp.Hash) + tt.True(horizonclient.IsNotFoundError(err)) + + // Make sure that a tx from whitelisted account is stored after reingestion + _, err = itest.Client().TransactionDetail(whiteListTxResp.Hash) + tt.NoError(err) + + // tell the horizon web server to shutdown + webApp.Close() + + // wait for horizon to finish shutdown + tt.Eventually(func() bool { + select { + case <-webAppDone: + return true + default: + return false + } + }, 30*time.Second, time.Second) +} + func getCoreConfigFile(itest *integration.Test) string { coreConfigFile := "captive-core-reingest-range-classic-integration-tests.cfg" if itest.Config().ProtocolVersion >= ledgerbackend.MinimalSorobanProtocolSupport { diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 7d487c556f..2ca05ba351 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -375,27 +375,6 @@ func TestDisablePathFinding(t *testing.T) { }) } -func TestIngestionFilteringAlwaysDefaultingToTrue(t *testing.T) { - t.Run("ingestion filtering flag set to default value", func(t *testing.T) { - test := integration.NewTest(t, *integration.GetTestConfig()) - err := test.StartHorizon() - assert.NoError(t, err) - test.WaitForHorizon() - assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true) - test.Shutdown() - }) - t.Run("ingestion filtering flag set to false", func(t *testing.T) { - testConfig := integration.GetTestConfig() - testConfig.HorizonIngestParameters = map[string]string{"exp-enable-ingestion-filtering": "false"} - test := integration.NewTest(t, *testConfig) - err := test.StartHorizon() - assert.NoError(t, err) - test.WaitForHorizon() - assert.Equal(t, test.HorizonIngest().Config().EnableIngestionFiltering, true) - test.Shutdown() - }) -} - func TestDisableTxSub(t *testing.T) { t.Run("require stellar-core-url when both DISABLE_TX_SUB=false and INGEST=false", func(t *testing.T) { localParams := integration.MergeMaps(networkParamArgs, map[string]string{ diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index 87718d5a67..4c76afe04a 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -96,6 +96,7 @@ type Test struct { config Config coreConfig CaptiveConfig horizonIngestConfig horizon.Config + horizonWebConfig horizon.Config environment *test.EnvironmentManager horizonClient *sdk.Client @@ -334,6 +335,10 @@ func (i *Test) GetHorizonIngestConfig() horizon.Config { return i.horizonIngestConfig } +func (i *Test) GetHorizonWebConfig() horizon.Config { + return i.horizonWebConfig +} + // Shutdown stops the integration tests and destroys all its associated // resources. It will be implicitly called when the calling test (i.e. the // `testing.Test` passed to `New()`) is finished if it hasn't been explicitly @@ -401,6 +406,7 @@ func (i *Test) StartHorizon() error { } i.horizonIngestConfig = *ingestConfig + i.horizonWebConfig = *webConfig i.appStopped = &sync.WaitGroup{} i.appStopped.Add(2)