Skip to content

Commit

Permalink
Ingest events into DB (#192)
Browse files Browse the repository at this point in the history
* Ingest events into DB

* Update tests

* Update tests

* Fix tests

* Ignore ingestion when events are empty
  • Loading branch information
psheth9 authored Jun 18, 2024
1 parent 4c9fdda commit 3121178
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 0 deletions.
12 changes: 12 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ReadWriter interface {

type WriteTx interface {
TransactionWriter() TransactionWriter
EventWriter() EventWriter
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Expand Down Expand Up @@ -226,6 +227,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
eventWriter: eventHandler{
log: rw.log,
db: txSession,
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
Expand All @@ -242,6 +249,7 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
txWriter transactionHandler
eventWriter eventHandler
ledgerRetentionWindow uint32
}

Expand All @@ -257,6 +265,10 @@ func (w writeTx) TransactionWriter() TransactionWriter {
return &w.txWriter
}

func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand Down
99 changes: 99 additions & 0 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package db

import (
sq "github.com/Masterminds/squirrel"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"io"
)

const eventTableName = "events"

type EventWriter interface {
InsertEvents(lcm xdr.LedgerCloseMeta) error
}

type EventReader interface {
GetEvents(lcm xdr.LedgerCloseMeta) error
}

type eventHandler struct {
log *log.Entry
db db.SessionInterface
stmtCache *sq.StmtCache
passphrase string
ingestMetric, countMetric prometheus.Observer
}

func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error {
txCount := lcm.CountTransactions()

if eventHandler.stmtCache == nil {
return errors.New("EventWriter incorrectly initialized without stmtCache")
} else if txCount == 0 {
return nil
}

var txReader *ingest.LedgerTransactionReader
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm)
if err != nil {
return errors.Wrapf(err,
"failed to open transaction reader for ledger %d",
lcm.LedgerSequence())
}
defer func() {
closeErr := txReader.Close()
if err == nil {
err = closeErr
}
}()

for {
var tx ingest.LedgerTransaction
tx, err = txReader.Read()
if err == io.EOF {
err = nil
break
}
if err != nil {
return err
}

if !tx.Result.Successful() {
continue
}

txEvents, err := tx.GetDiagnosticEvents()
if err != nil {
return err
}

if len(txEvents) == 0 {
continue
}

query := sq.Insert(eventTableName).
Columns("id", "ledger_sequence", "application_order", "contract_id", "event_type")

for index, e := range txEvents {
var contractId []byte
if e.Event.ContractId != nil {
contractId = e.Event.ContractId[:]
}
id := events.Cursor{Ledger: lcm.LedgerSequence(), Tx: tx.Index, Op: 0, Event: uint32(index)}.String()
query = query.Values(id, lcm.LedgerSequence(), tx.Index, contractId, int(e.Event.Type))
}

_, err = query.RunWith(eventHandler.stmtCache).Exec()
if err != nil {
return err
}
}

return nil
}
167 changes: 167 additions & 0 deletions cmd/soroban-rpc/internal/db/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package db

import (
"context"
"github.com/sirupsen/logrus"
"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta {
return xdr.TransactionMeta{
V: 3,
Operations: &[]xdr.OperationMeta{},
V3: &xdr.TransactionMetaV3{
SorobanMeta: &xdr.SorobanTransactionMeta{
Events: events,
},
},
}
}

func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent {
return xdr.ContractEvent{
ContractId: &contractID,
Type: xdr.ContractEventTypeContract,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Topics: topic,
Data: body,
},
},
}
}

func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta {
var txProcessing []xdr.TransactionResultMeta
var phases []xdr.TransactionPhase

for _, item := range txMeta {
var operations []xdr.Operation
for range item.MustV3().SorobanMeta.Events {
operations = append(operations,
xdr.Operation{
Body: xdr.OperationBody{
Type: xdr.OperationTypeInvokeHostFunction,
InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{
HostFunction: xdr.HostFunction{
Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract,
InvokeContract: &xdr.InvokeContractArgs{
ContractAddress: xdr.ScAddress{
Type: xdr.ScAddressTypeScAddressTypeContract,
ContractId: &xdr.Hash{0x1, 0x2},
},
FunctionName: "foo",
Args: nil,
},
},
Auth: []xdr.SorobanAuthorizationEntry{},
},
},
})
}
envelope := xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()),
Operations: operations,
},
},
}
txHash, err := network.HashTransactionInEnvelope(envelope, network.FutureNetworkPassphrase)
if err != nil {
panic(err)
}

txProcessing = append(txProcessing, xdr.TransactionResultMeta{
TxApplyProcessing: item,
Result: xdr.TransactionResultPair{
TransactionHash: txHash,
},
})
components := []xdr.TxSetComponent{
{
Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee,
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: []xdr.TransactionEnvelope{
envelope,
},
},
},
}
phases = append(phases, xdr.TransactionPhase{
V: 0,
V0Components: &components,
})
}

return xdr.LedgerCloseMeta{
V: 1,
V1: &xdr.LedgerCloseMetaV1{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Hash: xdr.Hash{},
Header: xdr.LedgerHeader{
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(closeTimestamp),
},
LedgerSeq: xdr.Uint32(sequence),
},
},
TxSet: xdr.GeneralizedTransactionSet{
V: 1,
V1TxSet: &xdr.TransactionSetV1{
PreviousLedgerHash: xdr.Hash{},
Phases: phases,
},
},
TxProcessing: txProcessing,
},
}
}

func TestInsertEvents(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()
log := log.DefaultLogger
log.SetLevel(logrus.TraceLevel)
now := time.Now().UTC()

writer := NewReadWriter(log, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)
contractID := xdr.Hash([32]byte{})
counter := xdr.ScSymbol("COUNTER")

var txMeta []xdr.TransactionMeta
for i := 0; i < 10; i++ {
txMeta = append(txMeta, transactionMetaWithEvents(
contractEvent(
contractID,
xdr.ScVec{xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
}},
xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
},
),
))
}
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)

eventW := write.EventWriter()
err = eventW.InsertEvents(ledgerCloseMeta)
assert.NoError(t, err)

//TODO: Call getEvents and validate events data.
}
16 changes: 16 additions & 0 deletions cmd/soroban-rpc/internal/db/migrations/03_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +migrate Up

-- indexing table to find events in ledgers by contract_id
CREATE TABLE events(
id TEXT PRIMARY KEY,
ledger_sequence INTEGER NOT NULL,
application_order INTEGER NOT NULL,
contract_id BLOB,
event_type INTEGER NOT NULL
);

CREATE INDEX idx_ledger_sequence ON events(ledger_sequence);
CREATE INDEX idx_contract_id ON events(contract_id);

-- +migrate Down
drop table events cascade;
14 changes: 14 additions & 0 deletions cmd/soroban-rpc/internal/ingest/mock_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ type MockTx struct {
mock.Mock
}

func (m MockTx) EventWriter() db.EventWriter {
args := m.Called()
return args.Get(0).(db.EventWriter)
}

func (m MockTx) LedgerEntryWriter() db.LedgerEntryWriter {
args := m.Called()
return args.Get(0).(db.LedgerEntryWriter)
Expand Down Expand Up @@ -96,3 +101,12 @@ func (m MockTransactionWriter) InsertTransactions(ledger xdr.LedgerCloseMeta) er
func (m MockTransactionWriter) RegisterMetrics(ingest, count prometheus.Observer) {
m.Called(ingest, count)
}

type MockEventWriter struct {
mock.Mock
}

func (m MockEventWriter) InsertEvents(ledger xdr.LedgerCloseMeta) error {
args := m.Called(ledger)
return args.Error(0)
}
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
With(prometheus.Labels{"type": "transactions"}).
Observe(time.Since(startTime).Seconds())

if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil {
return err
}

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@ func TestIngestion(t *testing.T) {
mockLedgerEntryWriter := &MockLedgerEntryWriter{}
mockLedgerWriter := &MockLedgerWriter{}
mockTxWriter := &MockTransactionWriter{}
mockEventWriter := &MockEventWriter{}
ctx := context.Background()
mockDB.On("NewTx", ctx).Return(mockTx, nil).Once()
mockTx.On("Commit", sequence).Return(nil).Once()
mockTx.On("Rollback").Return(nil).Once()
mockTx.On("LedgerEntryWriter").Return(mockLedgerEntryWriter).Twice()
mockTx.On("LedgerWriter").Return(mockLedgerWriter).Once()
mockTx.On("TransactionWriter").Return(mockTxWriter).Once()
mockTx.On("EventWriter").Return(mockEventWriter).Once()

src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON")
firstTx := xdr.TransactionEnvelope{
Expand Down Expand Up @@ -254,6 +256,7 @@ func TestIngestion(t *testing.T) {
Return(nil).Once()
mockLedgerWriter.On("InsertLedger", ledger).Return(nil).Once()
mockTxWriter.On("InsertTransactions", ledger).Return(nil).Once()
mockEventWriter.On("InsertEvents", ledger).Return(nil).Once()
assert.NoError(t, service.ingest(ctx, sequence))

mockDB.AssertExpectations(t)
Expand Down

0 comments on commit 3121178

Please sign in to comment.