Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ingest events into DB #192

Merged
merged 5 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type ReadWriter interface {

type WriteTx interface {
TransactionWriter() TransactionWriter
EventWriter() EventWriter
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Expand Down Expand Up @@ -226,6 +227,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
eventWriter: eventHandler{
log: rw.log,
db: txSession,
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
Expand All @@ -242,6 +249,7 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
txWriter transactionHandler
eventWriter eventHandler
ledgerRetentionWindow uint32
}

Expand All @@ -257,6 +265,10 @@ func (w writeTx) TransactionWriter() TransactionWriter {
return &w.txWriter
}

func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand Down
91 changes: 91 additions & 0 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package db

import (
sq "github.com/Masterminds/squirrel"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"io"
)

const eventTableName = "events"

type EventWriter interface {
InsertEvents(lcm xdr.LedgerCloseMeta) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the interface should not be defined in the same file as its implementation: https://go.dev/wiki/CodeReviewComments#interfaces

Would like to get a recommendation from @tamirms about this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, but that's how it's done everywhere else in the db package

}

type EventReader interface {
GetEvents(lcm xdr.LedgerCloseMeta) error
}

type eventHandler struct {
log *log.Entry
db db.SessionInterface
stmtCache *sq.StmtCache
passphrase string
ingestMetric, countMetric prometheus.Observer
}

func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error {
txCount := lcm.CountTransactions()

if eventHandler.stmtCache == nil {
return errors.New("EventWriter incorrectly initialized without stmtCache")
} else if txCount == 0 {
return nil
}

var txReader *ingest.LedgerTransactionReader
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm)
if err != nil {
return errors.Wrapf(err,
"failed to open transaction reader for ledger %d",
lcm.LedgerSequence())
}
defer func() {
closeErr := txReader.Close()
if err == nil {
err = closeErr
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't overwrite the error, just use errors.Join()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, note that err is shadowed down the line, so this may not be the err you want

}
}()

for {
var tx ingest.LedgerTransaction
tx, err = txReader.Read()
if err == io.EOF {
err = nil
break
}
if err != nil {
return err
}

if !tx.Result.Successful() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we skipping if the transaction is not successful? There can be diagnostic events for both successful and failed txns right?

@2opremio @tamirms

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is the existing behavior. Can update it if it is not true anymore.

Copy link
Contributor

@aditya1702 aditya1702 Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I saw it is in current getEvents too. I wanted to confirm this is true since I saw diagnostic events in both types of txns. @tamirms @2opremio

continue
}

txEvents, err := tx.GetDiagnosticEvents()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shadows the errabove, so the error handling in defer above won't do what you expect. You may want to use a named return parameter to make it cleaner.

if err != nil {
return err
}

query := sq.Insert(eventTableName).
Columns("id", "ledger_sequence", "application_order", "contract_id", "event_type")

for index, e := range txEvents {
id := events.Cursor{Ledger: lcm.LedgerSequence(), Tx: tx.Index, Op: 0, Event: uint32(index)}.String()
Copy link
Contributor

@2opremio 2opremio Jun 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@psheth9 I have just realized that the table already contains a ledger row, so the ledger info here is redundant.

Also:

  1. if the op is always 0 (and we don't plan that to change) that info shouldn't go into the DB.
  2. the tx index isn't the same as the application order field?

Thus, unless there is a good reason for the opposite, I would suggest storing the tx index and event index instead of the id field.

query = query.Values(id, lcm.LedgerSequence(), tx.Index, e.Event.ContractId[:], int(e.Event.Type))
}

_, err = query.RunWith(eventHandler.stmtCache).Exec()
if err != nil {
return err
}
}

return nil
}
167 changes: 167 additions & 0 deletions cmd/soroban-rpc/internal/db/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package db

import (
"context"
"github.com/sirupsen/logrus"
"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)

func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta {
return xdr.TransactionMeta{
V: 3,
Operations: &[]xdr.OperationMeta{},
V3: &xdr.TransactionMetaV3{
SorobanMeta: &xdr.SorobanTransactionMeta{
Events: events,
},
},
}
}

func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent {
return xdr.ContractEvent{
ContractId: &contractID,
Type: xdr.ContractEventTypeContract,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Topics: topic,
Data: body,
},
},
}
}

func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta {
var txProcessing []xdr.TransactionResultMeta
var phases []xdr.TransactionPhase

for _, item := range txMeta {
var operations []xdr.Operation
for range item.MustV3().SorobanMeta.Events {
operations = append(operations,
xdr.Operation{
Body: xdr.OperationBody{
Type: xdr.OperationTypeInvokeHostFunction,
InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{
HostFunction: xdr.HostFunction{
Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract,
InvokeContract: &xdr.InvokeContractArgs{
ContractAddress: xdr.ScAddress{
Type: xdr.ScAddressTypeScAddressTypeContract,
ContractId: &xdr.Hash{0x1, 0x2},
},
FunctionName: "foo",
Args: nil,
},
},
Auth: []xdr.SorobanAuthorizationEntry{},
},
},
})
}
envelope := xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()),
Operations: operations,
},
},
}
txHash, err := network.HashTransactionInEnvelope(envelope, network.FutureNetworkPassphrase)
if err != nil {
panic(err)
}

txProcessing = append(txProcessing, xdr.TransactionResultMeta{
TxApplyProcessing: item,
Result: xdr.TransactionResultPair{
TransactionHash: txHash,
},
})
components := []xdr.TxSetComponent{
{
Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee,
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: []xdr.TransactionEnvelope{
envelope,
},
},
},
}
phases = append(phases, xdr.TransactionPhase{
V: 0,
V0Components: &components,
})
}

return xdr.LedgerCloseMeta{
V: 1,
V1: &xdr.LedgerCloseMetaV1{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Hash: xdr.Hash{},
Header: xdr.LedgerHeader{
ScpValue: xdr.StellarValue{
CloseTime: xdr.TimePoint(closeTimestamp),
},
LedgerSeq: xdr.Uint32(sequence),
},
},
TxSet: xdr.GeneralizedTransactionSet{
V: 1,
V1TxSet: &xdr.TransactionSetV1{
PreviousLedgerHash: xdr.Hash{},
Phases: phases,
},
},
TxProcessing: txProcessing,
},
}
}

func TestInsertEvents(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()
log := log.DefaultLogger
log.SetLevel(logrus.TraceLevel)
now := time.Now().UTC()

writer := NewReadWriter(log, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)
contractID := xdr.Hash([32]byte{})
counter := xdr.ScSymbol("COUNTER")

var txMeta []xdr.TransactionMeta
for i := 0; i < 10; i++ {
txMeta = append(txMeta, transactionMetaWithEvents(
contractEvent(
contractID,
xdr.ScVec{xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
}},
xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
},
),
))
}
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)

eventW := write.EventWriter()
err = eventW.InsertEvents(ledgerCloseMeta)
assert.NoError(t, err)

//TODO: Call getEvents and validate events data.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please resolve this before merging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, actually ignore this TODO as getEvents is backed by memory in this diff, this will be resolved in this follow up PR #215 (WIP) @2opremio

}
16 changes: 16 additions & 0 deletions cmd/soroban-rpc/internal/db/migrations/03_events.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- +migrate Up

-- indexing table to find events in ledgers by contract_id
CREATE TABLE events (
id TEXT PRIMARY KEY,
ledger_sequence INTEGER NOT NULL,
application_order INTEGER NOT NULL,
contract_id BLOB NOT NULL,
event_type INTEGER NOT NULL
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
);

CREATE INDEX idx_ledger_sequence ON events(ledger_sequence);
CREATE INDEX idx_contract_id ON events(contract_id);

-- +migrate Down
drop table events cascade;
14 changes: 14 additions & 0 deletions cmd/soroban-rpc/internal/ingest/mock_db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@
mock.Mock
}

func (m MockTx) EventWriter() db.EventWriter {
args := m.Called()

Check failure on line 40 in cmd/soroban-rpc/internal/ingest/mock_db_test.go

View workflow job for this annotation

GitHub Actions / golangci

m.Called undefined (type MockTx has no field or method Called) (typecheck)
return args.Get(0).(db.EventWriter)
}

func (m MockTx) LedgerEntryWriter() db.LedgerEntryWriter {
args := m.Called()
return args.Get(0).(db.LedgerEntryWriter)
Expand Down Expand Up @@ -96,3 +101,12 @@
func (m MockTransactionWriter) RegisterMetrics(ingest, count prometheus.Observer) {
m.Called(ingest, count)
}

type MockEventWriter struct {
mock.Mock
}

func (m MockEventWriter) InsertEvents(ledger xdr.LedgerCloseMeta) error {
args := m.Called(ledger)

Check failure on line 110 in cmd/soroban-rpc/internal/ingest/mock_db_test.go

View workflow job for this annotation

GitHub Actions / golangci

m.Called undefined (type MockEventWriter has no field or method Called) (typecheck)
return args.Error(0)
}
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
With(prometheus.Labels{"type": "transactions"}).
Observe(time.Since(startTime).Seconds())

if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil {
return err
}

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,15 @@
mockLedgerEntryWriter := &MockLedgerEntryWriter{}
mockLedgerWriter := &MockLedgerWriter{}
mockTxWriter := &MockTransactionWriter{}
mockEventWriter := &MockEventWriter{}
ctx := context.Background()
mockDB.On("NewTx", ctx).Return(mockTx, nil).Once()
mockTx.On("Commit", sequence).Return(nil).Once()
mockTx.On("Rollback").Return(nil).Once()
mockTx.On("LedgerEntryWriter").Return(mockLedgerEntryWriter).Twice()
mockTx.On("LedgerWriter").Return(mockLedgerWriter).Once()
mockTx.On("TransactionWriter").Return(mockTxWriter).Once()
mockTx.On("EventWriter").Return(mockEventWriter).Once()

Check failure on line 92 in cmd/soroban-rpc/internal/ingest/service_test.go

View workflow job for this annotation

GitHub Actions / golangci

mockTx.On undefined (type *MockTx has no field or method On) (typecheck)

src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON")
firstTx := xdr.TransactionEnvelope{
Expand Down Expand Up @@ -254,6 +256,7 @@
Return(nil).Once()
mockLedgerWriter.On("InsertLedger", ledger).Return(nil).Once()
mockTxWriter.On("InsertTransactions", ledger).Return(nil).Once()
mockEventWriter.On("InsertEvents", ledger).Return(nil).Once()

Check failure on line 259 in cmd/soroban-rpc/internal/ingest/service_test.go

View workflow job for this annotation

GitHub Actions / golangci

mockEventWriter.On undefined (type *MockEventWriter has no field or method On) (typecheck)
assert.NoError(t, service.ingest(ctx, sequence))

mockDB.AssertExpectations(t)
Expand Down
Loading