Skip to content

Commit

Permalink
services/horizon: Add DISABLE_SOROBAN_INGEST flag to skip soroban ing…
Browse files Browse the repository at this point in the history
…estion processing (stellar#5176)
  • Loading branch information
urvisavla authored and sreuland committed Jan 27, 2024
1 parent 431009b commit 67ba6a4
Show file tree
Hide file tree
Showing 18 changed files with 608 additions and 261 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ jobs:
key: ${{ env.COMBINED_SOURCE_HASH }}

- if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
run: go test -race -timeout 45m -v ./services/horizon/internal/integration/...
run: go test -race -timeout 65m -v ./services/horizon/internal/integration/...

- name: Save Horizon binary and integration tests source hash to cache
if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
Expand Down
5 changes: 5 additions & 0 deletions services/horizon/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ file. This project adheres to [Semantic Versioning](http://semver.org/).
- We now include metrics for history archive requests ([5166](https://github.com/stellar/go/pull/5166))
- Http history archive requests now include a unique user agent ([5166](https://github.com/stellar/go/pull/5166))
- Added a deprecation warning for using command-line flags when running Horizon ([5051](https://github.com/stellar/go/pull/5051))
- New optional config `DISABLE_SOROBAN_INGEST` ([5175](https://github.com/stellar/go/issues/5175)). Defaults to `FALSE`, when `TRUE` and a soroban transaction is ingested, the following will occur:
* no effects will be generated for contract invocations.
* history_transactions.tx_meta column will have serialized xdr that equates to an empty `xdr.TransactionMeta.V3`, `Operations`, `TxChangesAfter`, `TxChangesBefore` will empty arrays and `SorobanMeta` will be nil.
* API transaction model for `result_meta_xdr` will have same empty serialized xdr for `xdr.TransactionMeta.V3`, `Operations`, `TxChangesAfter`, `TxChangesBefore` will empty arrays and `SorobanMeta` will be nil.
* API `Operation` model for `InvokeHostFunctionOp` type, will have empty `asset_balance_changes`

### Breaking Changes
- Removed configuration flags `--stellar-core-url-db`, `--cursor-name` `--skip-cursor-update` , they were related to legacy non-captive core ingestion and are no longer usable.
Expand Down
1 change: 1 addition & 0 deletions services/horizon/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
RoundingSlippageFilter: config.RoundingSlippageFilter,
EnableIngestionFiltering: config.EnableIngestionFiltering,
MaxLedgerPerFlush: maxLedgersPerFlush,
SkipSorobanIngestion: config.SkipSorobanIngestion,
}

if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,6 @@ type Config struct {
Network string
// DisableTxSub disables transaction submission functionality for Horizon.
DisableTxSub bool
// SkipSorobanIngestion skips Soroban related ingestion processing.
SkipSorobanIngestion bool
}
11 changes: 11 additions & 0 deletions services/horizon/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ const (
EnableIngestionFilteringFlagName = "exp-enable-ingestion-filtering"
// DisableTxSubFlagName is the command line flag for disabling transaction submission feature of Horizon
DisableTxSubFlagName = "disable-tx-sub"
// SkipSorobanIngestionFlagName is the command line flag for disabling Soroban related ingestion processing
SkipSorobanIngestionFlagName = "disable-soroban-ingest"

// StellarPubnet is a constant representing the Stellar public network
StellarPubnet = "pubnet"
Expand Down Expand Up @@ -730,6 +732,15 @@ func Flags() (*Config, support.ConfigOptions) {
HistoryArchiveURLsFlagName, CaptiveCoreConfigPathName),
UsedInCommands: IngestionCommands,
},
&support.ConfigOption{
Name: SkipSorobanIngestionFlagName,
ConfigKey: &config.SkipSorobanIngestion,
OptType: types.Bool,
FlagDefault: false,
Required: false,
Usage: "excludes Soroban data during ingestion processing",
UsedInCommands: IngestionCommands,
},
}

return config, flags
Expand Down
2 changes: 2 additions & 0 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ type Config struct {

EnableIngestionFiltering bool
MaxLedgerPerFlush uint32

SkipSorobanIngestion bool
}

const (
Expand Down
14 changes: 10 additions & 4 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func buildChangeProcessor(
source ingestionSource,
ledgerSequence uint32,
networkPassphrase string,
skipSorobanIngestion bool,
) *groupChangeProcessors {
statsChangeProcessor := &statsChangeProcessor{
StatsChangeProcessor: changeStats,
Expand Down Expand Up @@ -144,13 +145,13 @@ func (s *ProcessorRunner) buildTransactionProcessor(ledgersProcessor *processors

processors := []horizonTransactionProcessor{
statsLedgerTransactionProcessor,
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase),
processors.NewEffectProcessor(accountLoader, s.historyQ.NewEffectBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
ledgersProcessor,
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase),
processors.NewOperationProcessor(s.historyQ.NewOperationBatchInsertBuilder(), s.config.NetworkPassphrase, s.config.SkipSorobanIngestion),
tradeProcessor,
processors.NewParticipantsProcessor(accountLoader,
s.historyQ.NewTransactionParticipantsBatchInsertBuilder(), s.historyQ.NewOperationParticipantBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder()),
processors.NewTransactionProcessor(s.historyQ.NewTransactionBatchInsertBuilder(), s.config.SkipSorobanIngestion),
processors.NewClaimableBalancesTransactionProcessor(cbLoader,
s.historyQ.NewTransactionClaimableBalanceBatchInsertBuilder(), s.historyQ.NewOperationClaimableBalanceBatchInsertBuilder()),
processors.NewLiquidityPoolsTransactionProcessor(lpLoader,
Expand All @@ -172,7 +173,10 @@ func (s *ProcessorRunner) buildFilteredOutProcessor() *groupTransactionProcessor
// when in online mode, the submission result processor must always run (regardless of filtering)
var p []horizonTransactionProcessor
if s.config.EnableIngestionFiltering {
txSubProc := processors.NewTransactionFilteredTmpProcessor(s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder())
txSubProc := processors.NewTransactionFilteredTmpProcessor(
s.historyQ.NewTransactionFilteredTmpBatchInsertBuilder(),
s.config.SkipSorobanIngestion,
)
p = append(p, txSubProc)
}

Expand Down Expand Up @@ -235,6 +239,7 @@ func (s *ProcessorRunner) RunHistoryArchiveIngestion(
historyArchiveSource,
checkpointLedger,
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)

if checkpointLedger == 1 {
Expand Down Expand Up @@ -493,6 +498,7 @@ func (s *ProcessorRunner) RunAllProcessorsOnLedger(ledger xdr.LedgerCloseMeta) (
ledgerSource,
ledger.LedgerSequence(),
s.config.NetworkPassphrase,
s.config.SkipSorobanIngestion,
)
err = s.runChangeProcessorOnLedger(groupChangeProcessors, ledger)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
}

stats := &ingest.StatsChangeProcessor{}
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "")
processor := buildChangeProcessor(runner.historyQ, stats, ledgerSource, 123, "", false)
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand All @@ -201,7 +201,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
filters: &MockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "")
processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456, "", false)
assert.IsType(t, &groupChangeProcessors{}, processor)

assert.IsType(t, &statsChangeProcessor{}, processor.processors[0])
Expand Down Expand Up @@ -271,6 +271,7 @@ func TestProcessorRunnerWithFilterEnabled(t *testing.T) {
config := Config{
NetworkPassphrase: network.PublicNetworkPassphrase,
EnableIngestionFiltering: true,
SkipSorobanIngestion: false,
}

q := &mockDBQ{}
Expand Down
22 changes: 20 additions & 2 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,20 @@ type EffectProcessor struct {
accountLoader *history.AccountLoader
batch history.EffectBatchInsertBuilder
network string
skipSoroban bool
}

func NewEffectProcessor(
accountLoader *history.AccountLoader,
batch history.EffectBatchInsertBuilder,
network string,
skipSoroban bool,
) *EffectProcessor {
return &EffectProcessor{
accountLoader: accountLoader,
batch: batch,
network: network,
skipSoroban: skipSoroban,
}
}

Expand All @@ -50,14 +53,29 @@ func (p *EffectProcessor) ProcessTransaction(
return nil
}

for opi, op := range transaction.Envelope.Operations() {
elidedTransaction := transaction

if p.skipSoroban &&
elidedTransaction.UnsafeMeta.V == 3 &&
elidedTransaction.UnsafeMeta.V3.SorobanMeta != nil {
elidedTransaction.UnsafeMeta.V3 = &xdr.TransactionMetaV3{
Ext: xdr.ExtensionPoint{},
TxChangesBefore: xdr.LedgerEntryChanges{},
Operations: []xdr.OperationMeta{},
TxChangesAfter: xdr.LedgerEntryChanges{},
SorobanMeta: nil,
}
}

for opi, op := range elidedTransaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(opi),
transaction: transaction,
transaction: elidedTransaction,
operation: op,
ledgerSequence: uint32(lcm.LedgerSequence()),
network: p.network,
}

if err := operation.ingestEffects(p.accountLoader, p.batch); err != nil {
return errors.Wrapf(err, "reading operation %v effects", operation.ID())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (s *EffectsProcessorTestSuiteLedger) SetupTest() {
s.accountLoader,
s.mockBatchInsertBuilder,
networkPassphrase,
false,
)

s.txs = []ingest.LedgerTransaction{
Expand Down
52 changes: 34 additions & 18 deletions services/horizon/internal/ingest/processors/operations_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,29 @@ import (

// OperationProcessor operations processor
type OperationProcessor struct {
batch history.OperationBatchInsertBuilder
network string
batch history.OperationBatchInsertBuilder
network string
skipSoroban bool
}

func NewOperationProcessor(batch history.OperationBatchInsertBuilder, network string) *OperationProcessor {
func NewOperationProcessor(batch history.OperationBatchInsertBuilder, network string, skipSoroban bool) *OperationProcessor {
return &OperationProcessor{
batch: batch,
network: network,
batch: batch,
network: network,
skipSoroban: skipSoroban,
}
}

// ProcessTransaction process the given transaction
func (p *OperationProcessor) ProcessTransaction(lcm xdr.LedgerCloseMeta, transaction ingest.LedgerTransaction) error {
for i, op := range transaction.Envelope.Operations() {
operation := transactionOperationWrapper{
index: uint32(i),
transaction: transaction,
operation: op,
ledgerSequence: lcm.LedgerSequence(),
network: p.network,
index: uint32(i),
transaction: transaction,
operation: op,
ledgerSequence: lcm.LedgerSequence(),
network: p.network,
skipSorobanDetails: p.skipSoroban,
}
details, err := operation.Details()
if err != nil {
Expand Down Expand Up @@ -82,11 +85,12 @@ func (p *OperationProcessor) Flush(ctx context.Context, session db.SessionInterf

// transactionOperationWrapper represents the data for a single operation within a transaction
type transactionOperationWrapper struct {
index uint32
transaction ingest.LedgerTransaction
operation xdr.Operation
ledgerSequence uint32
network string
index uint32
transaction ingest.LedgerTransaction
operation xdr.Operation
ledgerSequence uint32
network string
skipSorobanDetails bool
}

// ID returns the ID for the operation.
Expand Down Expand Up @@ -266,6 +270,11 @@ func (operation *transactionOperationWrapper) IsPayment() bool {
case xdr.OperationTypeAccountMerge:
return true
case xdr.OperationTypeInvokeHostFunction:
// #5175, may want to consider skipping this parsing of payment from contracts
// as part of eliding soroban ingestion aspects when DISABLE_SOROBAN_INGEST.
// but, may cause inconsistencies that aren't worth the gain,
// as payments won't be thoroughly accurate, i.e. a payment could have
// happened within a contract invoke.
diagnosticEvents, err := operation.transaction.GetDiagnosticEvents()
if err != nil {
return false
Expand Down Expand Up @@ -689,11 +698,18 @@ func (operation *transactionOperationWrapper) Details() (map[string]interface{},
}
details["parameters"] = params

if balanceChanges, err := operation.parseAssetBalanceChangesFromContractEvents(); err != nil {
return nil, err
var balanceChanges []map[string]interface{}
var parseErr error
if operation.skipSorobanDetails {
// https://github.com/stellar/go/issues/5175
// intentionally toggle off parsing soroban meta into "asset_balance_changes"
balanceChanges = make([]map[string]interface{}, 0)
} else {
details["asset_balance_changes"] = balanceChanges
if balanceChanges, parseErr = operation.parseAssetBalanceChangesFromContractEvents(); parseErr != nil {
return nil, parseErr
}
}
details["asset_balance_changes"] = balanceChanges

case xdr.HostFunctionTypeHostFunctionTypeCreateContract:
args := op.HostFunction.MustCreateContract()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *OperationsProcessorTestSuiteLedger) SetupTest() {
s.processor = NewOperationProcessor(
s.mockBatchInsertBuilder,
"test network",
false,
)
}

Expand Down Expand Up @@ -375,6 +376,65 @@ func (s *OperationsProcessorTestSuiteLedger) TestOperationTypeInvokeHostFunction
}
s.Assert().Equal(found, 4, "should have one balance changed record for each of mint, burn, clawback, transfer")
})

s.T().Run("InvokeContractAssetBalancesElidedFromDetails", func(t *testing.T) {
randomIssuer := keypair.MustRandom()
randomAsset := xdr.MustNewCreditAsset("TESTING", randomIssuer.Address())
passphrase := "passphrase"
randomAccount := keypair.MustRandom().Address()
contractId := [32]byte{}
zeroContractStrKey, err := strkey.Encode(strkey.VersionByteContract, contractId[:])
s.Assert().NoError(err)

transferContractEvent := contractevents.GenerateEvent(contractevents.EventTypeTransfer, randomAccount, zeroContractStrKey, "", randomAsset, big.NewInt(10000000), passphrase)
burnContractEvent := contractevents.GenerateEvent(contractevents.EventTypeBurn, zeroContractStrKey, "", "", randomAsset, big.NewInt(10000000), passphrase)
mintContractEvent := contractevents.GenerateEvent(contractevents.EventTypeMint, "", zeroContractStrKey, randomAccount, randomAsset, big.NewInt(10000000), passphrase)
clawbackContractEvent := contractevents.GenerateEvent(contractevents.EventTypeClawback, zeroContractStrKey, "", randomAccount, randomAsset, big.NewInt(10000000), passphrase)

tx = ingest.LedgerTransaction{
UnsafeMeta: xdr.TransactionMeta{
V: 3,
V3: &xdr.TransactionMetaV3{
SorobanMeta: &xdr.SorobanTransactionMeta{
Events: []xdr.ContractEvent{
transferContractEvent,
burnContractEvent,
mintContractEvent,
clawbackContractEvent,
},
},
},
},
}
wrapper := transactionOperationWrapper{
skipSorobanDetails: true,
transaction: tx,
operation: xdr.Operation{
SourceAccount: &source,
Body: xdr.OperationBody{
Type: xdr.OperationTypeInvokeHostFunction,
InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{
HostFunction: xdr.HostFunction{
Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract,
InvokeContract: &xdr.InvokeContractArgs{
ContractAddress: xdr.ScAddress{
Type: xdr.ScAddressTypeScAddressTypeContract,
ContractId: &xdr.Hash{0x1, 0x2},
},
FunctionName: "foo",
Args: xdr.ScVec{},
},
},
},
},
},
network: passphrase,
}

details, err := wrapper.Details()
s.Assert().NoError(err)
s.Assert().Len(details["asset_balance_changes"], 0, "for invokehostfn op, no asset balances should be in details when skip soroban is enabled")
})
}

func (s *OperationsProcessorTestSuiteLedger) assertInvokeHostFunctionParameter(parameters []map[string]string, paramPosition int, expectedType string, expectedVal xdr.ScVal) {
Expand Down
Loading

0 comments on commit 67ba6a4

Please sign in to comment.