Skip to content

Commit

Permalink
don't run filtered tmp processor when reingestion is enabled (stellar…
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Apr 18, 2024
1 parent 54135f8 commit f2cd035
Show file tree
Hide file tree
Showing 14 changed files with 299 additions and 205 deletions.
6 changes: 4 additions & 2 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ var dbReapCmd = &cobra.Command{
if err != nil {
return err
}
defer func() {
app.Shutdown()
app.CloseDB()
}()
ctx := context.Background()
app.UpdateHorizonLedgerState(ctx)
return app.DeleteUnretainedHistory(ctx)
Expand Down Expand Up @@ -409,7 +413,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
HistoryArchiveURLs: config.HistoryArchiveURLs,
HistoryArchiveCaching: config.HistoryArchiveCaching,
CheckpointFrequency: config.CheckpointFrequency,
ReingestEnabled: true,
MaxReingestRetries: int(retries),
ReingestRetryBackoffSeconds: int(retryBackoffSeconds),
CaptiveCoreBinaryPath: config.CaptiveCoreBinaryPath,
Expand All @@ -418,7 +421,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
CaptiveCoreStoragePath: config.CaptiveCoreStoragePath,
StellarCoreURL: config.StellarCoreURL,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
SkipTxmeta: config.SkipTxmeta,
}
Expand Down
57 changes: 27 additions & 30 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,17 +125,16 @@ var ingestVerifyRangeCmd = &cobra.Command{
}

ingestConfig := ingest.Config{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
EnableIngestionFiltering: globalConfig.EnableIngestionFiltering,
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
}

system, err := ingest.NewSystem(ingestConfig)
Expand Down Expand Up @@ -285,14 +284,13 @@ var ingestInitGenesisStateCmd = &cobra.Command{
}

ingestConfig := ingest.Config{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CheckpointFrequency: globalConfig.CheckpointFrequency,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
EnableIngestionFiltering: globalConfig.EnableIngestionFiltering,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
CheckpointFrequency: globalConfig.CheckpointFrequency,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
}

system, err := ingest.NewSystem(ingestConfig)
Expand Down Expand Up @@ -348,17 +346,16 @@ var ingestBuildStateCmd = &cobra.Command{
}

ingestConfig := ingest.Config{
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
EnableIngestionFiltering: globalConfig.EnableIngestionFiltering,
NetworkPassphrase: globalConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: globalConfig.HistoryArchiveURLs,
HistoryArchiveCaching: globalConfig.HistoryArchiveCaching,
CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath,
CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB,
CheckpointFrequency: globalConfig.CheckpointFrequency,
CaptiveCoreToml: globalConfig.CaptiveCoreToml,
CaptiveCoreStoragePath: globalConfig.CaptiveCoreStoragePath,
RoundingSlippageFilter: globalConfig.RoundingSlippageFilter,
}

system, err := ingest.NewSystem(ingestConfig)
Expand Down
53 changes: 29 additions & 24 deletions services/horizon/internal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,15 @@ func (a *App) Close() {

func (a *App) waitForDone() {
<-a.done
webShutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
a.webServer.Shutdown(webShutdownCtx)
a.Shutdown()
}

func (a *App) Shutdown() {
if a.webServer != nil {
webShutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
a.webServer.Shutdown(webShutdownCtx)
}
a.cancel()
if a.ingester != nil {
a.ingester.Shutdown()
Expand Down Expand Up @@ -528,27 +534,26 @@ func (a *App) init() error {
initTxSubMetrics(a)

routerConfig := httpx.RouterConfig{
DBSession: a.historyQ.SessionInterface,
TxSubmitter: a.submitter,
RateQuota: a.config.RateQuota,
BehindCloudflare: a.config.BehindCloudflare,
BehindAWSLoadBalancer: a.config.BehindAWSLoadBalancer,
SSEUpdateFrequency: a.config.SSEUpdateFrequency,
StaleThreshold: a.config.StaleThreshold,
ConnectionTimeout: a.config.ConnectionTimeout,
ClientQueryTimeout: a.config.ClientQueryTimeout,
MaxConcurrentRequests: a.config.MaxConcurrentRequests,
MaxHTTPRequestSize: a.config.MaxHTTPRequestSize,
NetworkPassphrase: a.config.NetworkPassphrase,
MaxPathLength: a.config.MaxPathLength,
MaxAssetsPerPathRequest: a.config.MaxAssetsPerPathRequest,
PathFinder: a.paths,
PrometheusRegistry: a.prometheusRegistry,
CoreGetter: a,
HorizonVersion: a.horizonVersion,
FriendbotURL: a.config.FriendbotURL,
EnableIngestionFiltering: a.config.EnableIngestionFiltering,
DisableTxSub: a.config.DisableTxSub,
DBSession: a.historyQ.SessionInterface,
TxSubmitter: a.submitter,
RateQuota: a.config.RateQuota,
BehindCloudflare: a.config.BehindCloudflare,
BehindAWSLoadBalancer: a.config.BehindAWSLoadBalancer,
SSEUpdateFrequency: a.config.SSEUpdateFrequency,
StaleThreshold: a.config.StaleThreshold,
ConnectionTimeout: a.config.ConnectionTimeout,
ClientQueryTimeout: a.config.ClientQueryTimeout,
MaxConcurrentRequests: a.config.MaxConcurrentRequests,
MaxHTTPRequestSize: a.config.MaxHTTPRequestSize,
NetworkPassphrase: a.config.NetworkPassphrase,
MaxPathLength: a.config.MaxPathLength,
MaxAssetsPerPathRequest: a.config.MaxAssetsPerPathRequest,
PathFinder: a.paths,
PrometheusRegistry: a.prometheusRegistry,
CoreGetter: a,
HorizonVersion: a.horizonVersion,
FriendbotURL: a.config.FriendbotURL,
DisableTxSub: a.config.DisableTxSub,
HealthCheck: healthCheck{
session: a.historyQ.SessionInterface,
ctx: a.ctx,
Expand Down
1 change: 0 additions & 1 deletion services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ type Config struct {
Port uint
AdminPort uint

EnableIngestionFiltering bool
CaptiveCoreBinaryPath string
CaptiveCoreConfigPath string
CaptiveCoreTomlParams ledgerbackend.CaptiveCoreTomlParams
Expand Down
10 changes: 4 additions & 6 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,8 @@ func Flags() (*Config, support.ConfigOptions) {
OptType: types.String,
FlagDefault: "",
Required: false,
ConfigKey: &config.EnableIngestionFiltering,
CustomSetValue: func(opt *support.ConfigOption) error {

// Always enable ingestion filtering by default.
config.EnableIngestionFiltering = true

// ingestion filtering is always enabled, it has no rules by default.
if val := viper.GetString(opt.Name); val != "" {
stdLog.Printf(
"DEPRECATED - No ingestion filter rules are defined by default, which equates to " +
Expand Down Expand Up @@ -665,9 +661,11 @@ func Flags() (*Config, support.ConfigOptions) {
Usage: "the batch size (in ledgers) to remove per reap from the Horizon database",
UsedInCommands: IngestionCommands,
CustomSetValue: func(opt *support.ConfigOption) error {
if val := viper.GetUint(opt.Name); val <= 0 || val > 500_000 {
val := viper.GetUint(opt.Name)
if val <= 0 || val > 500_000 {
return fmt.Errorf("flag --history-retention-reap-count must be in range [1, 500,000]")
}
*(opt.ConfigKey.(*uint)) = val
return nil
},
},
Expand Down
53 changes: 25 additions & 28 deletions services/horizon/internal/httpx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,24 @@ type RouterConfig struct {
RateQuota *throttled.RateQuota
MaxConcurrentRequests uint

BehindCloudflare bool
BehindAWSLoadBalancer bool
SSEUpdateFrequency time.Duration
StaleThreshold uint
ConnectionTimeout time.Duration
ClientQueryTimeout time.Duration
MaxHTTPRequestSize uint
NetworkPassphrase string
MaxPathLength uint
MaxAssetsPerPathRequest int
PathFinder paths.Finder
PrometheusRegistry *prometheus.Registry
CoreGetter actions.CoreStateGetter
HorizonVersion string
FriendbotURL *url.URL
HealthCheck http.Handler
EnableIngestionFiltering bool
DisableTxSub bool
SkipTxMeta bool
BehindCloudflare bool
BehindAWSLoadBalancer bool
SSEUpdateFrequency time.Duration
StaleThreshold uint
ConnectionTimeout time.Duration
ClientQueryTimeout time.Duration
MaxHTTPRequestSize uint
NetworkPassphrase string
MaxPathLength uint
MaxAssetsPerPathRequest int
PathFinder paths.Finder
PrometheusRegistry *prometheus.Registry
CoreGetter actions.CoreStateGetter
HorizonVersion string
FriendbotURL *url.URL
HealthCheck http.Handler
DisableTxSub bool
SkipTxMeta bool
}

type Router struct {
Expand Down Expand Up @@ -375,13 +374,11 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP)
r.Internal.Get("/debug/pprof/heap", pprof.Index)
r.Internal.Get("/debug/pprof/profile", pprof.Profile)
if config.EnableIngestionFiltering {
r.Internal.Route("/ingestion/filters", func(r chi.Router) {
handler := actions.FilterConfigHandler{}
r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig)
r.With(historyMiddleware).Put("/account", handler.UpdateAccountConfig)
r.With(historyMiddleware).Get("/asset", handler.GetAssetConfig)
r.With(historyMiddleware).Get("/account", handler.GetAccountConfig)
})
}
r.Internal.Route("/ingestion/filters", func(r chi.Router) {
handler := actions.FilterConfigHandler{}
r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig)
r.With(historyMiddleware).Put("/account", handler.UpdateAccountConfig)
r.With(historyMiddleware).Get("/asset", handler.GetAssetConfig)
r.With(historyMiddleware).Get("/account", handler.GetAccountConfig)
})
}
4 changes: 4 additions & 0 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ func newGroupTransactionProcessors(processors []horizonTransactionProcessor,
}
}

func (g groupTransactionProcessors) IsEmpty() bool {
return len(g.processors) == 0
}

func (g groupTransactionProcessors) ProcessTransaction(lcm xdr.LedgerCloseMeta, tx ingest.LedgerTransaction) error {
for _, p := range g.processors {
startTime := time.Now()
Expand Down
6 changes: 2 additions & 4 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ type Config struct {
EnableReapLookupTables bool
EnableExtendedLogLedgerStats bool

ReingestEnabled bool
MaxReingestRetries int
ReingestRetryBackoffSeconds int

Expand All @@ -108,9 +107,8 @@ type Config struct {

RoundingSlippageFilter int

EnableIngestionFiltering bool
MaxLedgerPerFlush uint32
SkipTxmeta bool
MaxLedgerPerFlush uint32
SkipTxmeta bool
}

const (
Expand Down
37 changes: 19 additions & 18 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,20 +164,15 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors

func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers {
var f []processors.LedgerTransactionFilterer
if s.config.EnableIngestionFiltering {
f = append(f, s.filters.GetFilters(s.historyQ, s.ctx)...)
}

f = append(f, s.filters.GetFilters(s.historyQ, s.ctx)...)
return newGroupTransactionFilterers(f)
}

func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessors {
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta)
p = append(p, txSubProc)
}

txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(), s.config.SkipTxmeta)
p = append(p, txSubProc)

return newGroupTransactionProcessors(p, nil, nil)
}
Expand Down Expand Up @@ -384,6 +379,7 @@ func (s *ProcessorRunner) runTransactionProcessorsOnLedger(registry nameRegistry
ledgersProcessor.ProcessLedger(ledger)

groupTransactionFilterers := s.buildTransactionFilterer()
// when in online mode, the submission result processor must always run (regardless of whether filter rules exist or not)
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor)

Expand Down Expand Up @@ -488,11 +484,16 @@ func registerTransactionProcessors(
return nil
}

// Runs only transaction processors on the inbound list of ledgers.
// Updates history tables based on transactions.
// Intentionally do not make effort to insert or purge tx's on history_transactions_filtered_tmp
// Thus, using this method does not support tx sub processing for the ledgers passed in, i.e. tx submission queue will not see these.
func (s *ProcessorRunner) RunTransactionProcessorsOnLedgers(ledgers []xdr.LedgerCloseMeta, execInTx bool) (err error) {
ledgersProcessor := processors.NewLedgerProcessor(s.historyQ.NewLedgerBatchInsertBuilder(), CurrentVersion)

groupTransactionFilterers := s.buildTransactionFilterer()
groupFilteredOutProcessors := s.buildFilteredOutProcessor()
// intentionally skip filtered out processor
groupFilteredOutProcessors := newGroupTransactionProcessors(nil, nil, nil)
loaders, groupTransactionProcessors := s.buildTransactionProcessor(ledgersProcessor)

startTime := time.Now()
Expand Down Expand Up @@ -554,16 +555,16 @@ func (s *ProcessorRunner) flushProcessors(groupFilteredOutProcessors *groupTrans
defer s.session.Rollback()
}

if s.config.EnableIngestionFiltering {
if err := groupFilteredOutProcessors.Flush(s.ctx, s.session); err != nil {
return errors.Wrap(err, "Error flushing temp filtered tx from processor")
}

if err := groupFilteredOutProcessors.Flush(s.ctx, s.session); err != nil {
return errors.Wrap(err, "Error flushing temp filtered tx from processor")
}
if time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod {
if _, err := s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())); err != nil {
return errors.Wrap(err, "Error trimming filtered transactions")
}
if !groupFilteredOutProcessors.IsEmpty() &&
time.Since(s.lastTransactionsTmpGC) > transactionsFilteredTmpGCPeriod {
if _, err := s.historyQ.DeleteTransactionsFilteredTmpOlderThan(s.ctx, uint64(transactionsFilteredTmpGCPeriod.Seconds())); err != nil {
return errors.Wrap(err, "Error trimming filtered transactions")
}
s.lastTransactionsTmpGC = time.Now()
}

if err := groupTransactionProcessors.Flush(s.ctx, s.session); err != nil {
Expand Down
Loading

0 comments on commit f2cd035

Please sign in to comment.