diff --git a/cmd/soroban-rpc/internal/db/db.go b/cmd/soroban-rpc/internal/db/db.go index 227e2115..e922ea35 100644 --- a/cmd/soroban-rpc/internal/db/db.go +++ b/cmd/soroban-rpc/internal/db/db.go @@ -37,6 +37,7 @@ type ReadWriter interface { type WriteTx interface { TransactionWriter() TransactionWriter + EventWriter() EventWriter LedgerEntryWriter() LedgerEntryWriter LedgerWriter() LedgerWriter @@ -226,6 +227,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) { stmtCache: stmtCache, passphrase: rw.passphrase, }, + eventWriter: eventHandler{ + log: rw.log, + db: txSession, + stmtCache: stmtCache, + passphrase: rw.passphrase, + }, } writer.txWriter.RegisterMetrics( rw.metrics.TxIngestDuration, @@ -242,6 +249,7 @@ type writeTx struct { ledgerEntryWriter ledgerEntryWriter ledgerWriter ledgerWriter txWriter transactionHandler + eventWriter eventHandler ledgerRetentionWindow uint32 } @@ -257,6 +265,10 @@ func (w writeTx) TransactionWriter() TransactionWriter { return &w.txWriter } +func (w writeTx) EventWriter() EventWriter { + return &w.eventWriter +} + func (w writeTx) Commit(ledgerSeq uint32) error { if err := w.ledgerEntryWriter.flush(); err != nil { return err diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go new file mode 100644 index 00000000..df249003 --- /dev/null +++ b/cmd/soroban-rpc/internal/db/event.go @@ -0,0 +1,99 @@ +package db + +import ( + sq "github.com/Masterminds/squirrel" + "github.com/prometheus/client_golang/prometheus" + "github.com/stellar/go/ingest" + "github.com/stellar/go/support/db" + "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" + "io" +) + +const eventTableName = "events" + +type EventWriter interface { + InsertEvents(lcm xdr.LedgerCloseMeta) error +} + +type EventReader interface { + GetEvents(lcm xdr.LedgerCloseMeta) error +} + +type eventHandler struct { + log *log.Entry + db db.SessionInterface + stmtCache *sq.StmtCache + passphrase string + ingestMetric, countMetric prometheus.Observer +} + +func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error { + txCount := lcm.CountTransactions() + + if eventHandler.stmtCache == nil { + return errors.New("EventWriter incorrectly initialized without stmtCache") + } else if txCount == 0 { + return nil + } + + var txReader *ingest.LedgerTransactionReader + txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm) + if err != nil { + return errors.Wrapf(err, + "failed to open transaction reader for ledger %d", + lcm.LedgerSequence()) + } + defer func() { + closeErr := txReader.Close() + if err == nil { + err = closeErr + } + }() + + for { + var tx ingest.LedgerTransaction + tx, err = txReader.Read() + if err == io.EOF { + err = nil + break + } + if err != nil { + return err + } + + if !tx.Result.Successful() { + continue + } + + txEvents, err := tx.GetDiagnosticEvents() + if err != nil { + return err + } + + if len(txEvents) == 0 { + continue + } + + query := sq.Insert(eventTableName). + Columns("id", "ledger_sequence", "application_order", "contract_id", "event_type") + + for index, e := range txEvents { + var contractId []byte + if e.Event.ContractId != nil { + contractId = e.Event.ContractId[:] + } + id := events.Cursor{Ledger: lcm.LedgerSequence(), Tx: tx.Index, Op: 0, Event: uint32(index)}.String() + query = query.Values(id, lcm.LedgerSequence(), tx.Index, contractId, int(e.Event.Type)) + } + + _, err = query.RunWith(eventHandler.stmtCache).Exec() + if err != nil { + return err + } + } + + return nil +} diff --git a/cmd/soroban-rpc/internal/db/event_test.go b/cmd/soroban-rpc/internal/db/event_test.go new file mode 100644 index 00000000..eb272ddb --- /dev/null +++ b/cmd/soroban-rpc/internal/db/event_test.go @@ -0,0 +1,167 @@ +package db + +import ( + "context" + "github.com/sirupsen/logrus" + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta { + return xdr.TransactionMeta{ + V: 3, + Operations: &[]xdr.OperationMeta{}, + V3: &xdr.TransactionMetaV3{ + SorobanMeta: &xdr.SorobanTransactionMeta{ + Events: events, + }, + }, + } +} + +func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent { + return xdr.ContractEvent{ + ContractId: &contractID, + Type: xdr.ContractEventTypeContract, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Topics: topic, + Data: body, + }, + }, + } +} + +func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta { + var txProcessing []xdr.TransactionResultMeta + var phases []xdr.TransactionPhase + + for _, item := range txMeta { + var operations []xdr.Operation + for range item.MustV3().SorobanMeta.Events { + operations = append(operations, + xdr.Operation{ + Body: xdr.OperationBody{ + Type: xdr.OperationTypeInvokeHostFunction, + InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{ + HostFunction: xdr.HostFunction{ + Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract, + InvokeContract: &xdr.InvokeContractArgs{ + ContractAddress: xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeContract, + ContractId: &xdr.Hash{0x1, 0x2}, + }, + FunctionName: "foo", + Args: nil, + }, + }, + Auth: []xdr.SorobanAuthorizationEntry{}, + }, + }, + }) + } + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Operations: operations, + }, + }, + } + txHash, err := network.HashTransactionInEnvelope(envelope, network.FutureNetworkPassphrase) + if err != nil { + panic(err) + } + + txProcessing = append(txProcessing, xdr.TransactionResultMeta{ + TxApplyProcessing: item, + Result: xdr.TransactionResultPair{ + TransactionHash: txHash, + }, + }) + components := []xdr.TxSetComponent{ + { + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + envelope, + }, + }, + }, + } + phases = append(phases, xdr.TransactionPhase{ + V: 0, + V0Components: &components, + }) + } + + return xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: xdr.Hash{}, + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: xdr.TimePoint(closeTimestamp), + }, + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + PreviousLedgerHash: xdr.Hash{}, + Phases: phases, + }, + }, + TxProcessing: txProcessing, + }, + } +} + +func TestInsertEvents(t *testing.T) { + db := NewTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + now := time.Now().UTC() + + writer := NewReadWriter(log, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + contractID := xdr.Hash([32]byte{}) + counter := xdr.ScSymbol("COUNTER") + + var txMeta []xdr.TransactionMeta + for i := 0; i < 10; i++ { + txMeta = append(txMeta, transactionMetaWithEvents( + contractEvent( + contractID, + xdr.ScVec{xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }}, + xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, + ), + )) + } + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + + eventW := write.EventWriter() + err = eventW.InsertEvents(ledgerCloseMeta) + assert.NoError(t, err) + + //TODO: Call getEvents and validate events data. +} diff --git a/cmd/soroban-rpc/internal/db/migrations/03_events.sql b/cmd/soroban-rpc/internal/db/migrations/03_events.sql new file mode 100644 index 00000000..d8040575 --- /dev/null +++ b/cmd/soroban-rpc/internal/db/migrations/03_events.sql @@ -0,0 +1,16 @@ +-- +migrate Up + +-- indexing table to find events in ledgers by contract_id +CREATE TABLE events( + id TEXT PRIMARY KEY, + ledger_sequence INTEGER NOT NULL, + application_order INTEGER NOT NULL, + contract_id BLOB, + event_type INTEGER NOT NULL +); + +CREATE INDEX idx_ledger_sequence ON events(ledger_sequence); +CREATE INDEX idx_contract_id ON events(contract_id); + +-- +migrate Down +drop table events cascade; diff --git a/cmd/soroban-rpc/internal/ingest/mock_db_test.go b/cmd/soroban-rpc/internal/ingest/mock_db_test.go index 6e57658d..7c1ecd12 100644 --- a/cmd/soroban-rpc/internal/ingest/mock_db_test.go +++ b/cmd/soroban-rpc/internal/ingest/mock_db_test.go @@ -36,6 +36,11 @@ type MockTx struct { mock.Mock } +func (m MockTx) EventWriter() db.EventWriter { + args := m.Called() + return args.Get(0).(db.EventWriter) +} + func (m MockTx) LedgerEntryWriter() db.LedgerEntryWriter { args := m.Called() return args.Get(0).(db.LedgerEntryWriter) @@ -96,3 +101,12 @@ func (m MockTransactionWriter) InsertTransactions(ledger xdr.LedgerCloseMeta) er func (m MockTransactionWriter) RegisterMetrics(ingest, count prometheus.Observer) { m.Called(ingest, count) } + +type MockEventWriter struct { + mock.Mock +} + +func (m MockEventWriter) InsertEvents(ledger xdr.LedgerCloseMeta) error { + args := m.Called(ledger) + return args.Error(0) +} diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 54ccbef4..16285100 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -337,6 +337,10 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge With(prometheus.Labels{"type": "transactions"}). Observe(time.Since(startTime).Seconds()) + if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil { + return err + } + if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { return err } diff --git a/cmd/soroban-rpc/internal/ingest/service_test.go b/cmd/soroban-rpc/internal/ingest/service_test.go index 78f42494..10090c16 100644 --- a/cmd/soroban-rpc/internal/ingest/service_test.go +++ b/cmd/soroban-rpc/internal/ingest/service_test.go @@ -81,6 +81,7 @@ func TestIngestion(t *testing.T) { mockLedgerEntryWriter := &MockLedgerEntryWriter{} mockLedgerWriter := &MockLedgerWriter{} mockTxWriter := &MockTransactionWriter{} + mockEventWriter := &MockEventWriter{} ctx := context.Background() mockDB.On("NewTx", ctx).Return(mockTx, nil).Once() mockTx.On("Commit", sequence).Return(nil).Once() @@ -88,6 +89,7 @@ func TestIngestion(t *testing.T) { mockTx.On("LedgerEntryWriter").Return(mockLedgerEntryWriter).Twice() mockTx.On("LedgerWriter").Return(mockLedgerWriter).Once() mockTx.On("TransactionWriter").Return(mockTxWriter).Once() + mockTx.On("EventWriter").Return(mockEventWriter).Once() src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON") firstTx := xdr.TransactionEnvelope{ @@ -254,6 +256,7 @@ func TestIngestion(t *testing.T) { Return(nil).Once() mockLedgerWriter.On("InsertLedger", ledger).Return(nil).Once() mockTxWriter.On("InsertTransactions", ledger).Return(nil).Once() + mockEventWriter.On("InsertEvents", ledger).Return(nil).Once() assert.NoError(t, service.ingest(ctx, sequence)) mockDB.AssertExpectations(t)