-
Notifications
You must be signed in to change notification settings - Fork 26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ingest events into DB #192
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package db | ||
|
||
import ( | ||
sq "github.com/Masterminds/squirrel" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stellar/go/ingest" | ||
"github.com/stellar/go/support/db" | ||
"github.com/stellar/go/support/errors" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/xdr" | ||
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" | ||
"io" | ||
) | ||
|
||
const eventTableName = "events" | ||
|
||
type EventWriter interface { | ||
InsertEvents(lcm xdr.LedgerCloseMeta) error | ||
} | ||
|
||
type EventReader interface { | ||
GetEvents(lcm xdr.LedgerCloseMeta) error | ||
} | ||
|
||
type eventHandler struct { | ||
log *log.Entry | ||
db db.SessionInterface | ||
stmtCache *sq.StmtCache | ||
passphrase string | ||
ingestMetric, countMetric prometheus.Observer | ||
} | ||
|
||
func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error { | ||
txCount := lcm.CountTransactions() | ||
|
||
if eventHandler.stmtCache == nil { | ||
return errors.New("EventWriter incorrectly initialized without stmtCache") | ||
} else if txCount == 0 { | ||
return nil | ||
} | ||
|
||
var txReader *ingest.LedgerTransactionReader | ||
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm) | ||
if err != nil { | ||
return errors.Wrapf(err, | ||
"failed to open transaction reader for ledger %d", | ||
lcm.LedgerSequence()) | ||
} | ||
defer func() { | ||
closeErr := txReader.Close() | ||
if err == nil { | ||
err = closeErr | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't overwrite the error, just use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, note that err is shadowed down the line, so this may not be the |
||
} | ||
}() | ||
|
||
for { | ||
var tx ingest.LedgerTransaction | ||
tx, err = txReader.Read() | ||
if err == io.EOF { | ||
err = nil | ||
break | ||
} | ||
if err != nil { | ||
return err | ||
} | ||
|
||
if !tx.Result.Successful() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah this is the existing behavior. Can update it if it is not true anymore. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
continue | ||
} | ||
|
||
txEvents, err := tx.GetDiagnosticEvents() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This shadows the |
||
if err != nil { | ||
return err | ||
} | ||
|
||
query := sq.Insert(eventTableName). | ||
Columns("id", "ledger_sequence", "application_order", "contract_id", "event_type") | ||
|
||
for index, e := range txEvents { | ||
id := events.Cursor{Ledger: lcm.LedgerSequence(), Tx: tx.Index, Op: 0, Event: uint32(index)}.String() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @psheth9 I have just realized that the table already contains a ledger row, so the ledger info here is redundant. Also:
Thus, unless there is a good reason for the opposite, I would suggest storing the tx index and event index instead of the id field. |
||
query = query.Values(id, lcm.LedgerSequence(), tx.Index, e.Event.ContractId[:], int(e.Event.Type)) | ||
} | ||
|
||
_, err = query.RunWith(eventHandler.stmtCache).Exec() | ||
if err != nil { | ||
return err | ||
} | ||
} | ||
|
||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,167 @@ | ||
package db | ||
|
||
import ( | ||
"context" | ||
"github.com/sirupsen/logrus" | ||
"github.com/stellar/go/keypair" | ||
"github.com/stellar/go/network" | ||
"github.com/stellar/go/support/log" | ||
"github.com/stellar/go/xdr" | ||
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta { | ||
return xdr.TransactionMeta{ | ||
V: 3, | ||
Operations: &[]xdr.OperationMeta{}, | ||
V3: &xdr.TransactionMetaV3{ | ||
SorobanMeta: &xdr.SorobanTransactionMeta{ | ||
Events: events, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent { | ||
return xdr.ContractEvent{ | ||
ContractId: &contractID, | ||
Type: xdr.ContractEventTypeContract, | ||
Body: xdr.ContractEventBody{ | ||
V: 0, | ||
V0: &xdr.ContractEventV0{ | ||
Topics: topic, | ||
Data: body, | ||
}, | ||
}, | ||
} | ||
} | ||
|
||
func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta { | ||
var txProcessing []xdr.TransactionResultMeta | ||
var phases []xdr.TransactionPhase | ||
|
||
for _, item := range txMeta { | ||
var operations []xdr.Operation | ||
for range item.MustV3().SorobanMeta.Events { | ||
operations = append(operations, | ||
xdr.Operation{ | ||
Body: xdr.OperationBody{ | ||
Type: xdr.OperationTypeInvokeHostFunction, | ||
InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{ | ||
HostFunction: xdr.HostFunction{ | ||
Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract, | ||
InvokeContract: &xdr.InvokeContractArgs{ | ||
ContractAddress: xdr.ScAddress{ | ||
Type: xdr.ScAddressTypeScAddressTypeContract, | ||
ContractId: &xdr.Hash{0x1, 0x2}, | ||
}, | ||
FunctionName: "foo", | ||
Args: nil, | ||
}, | ||
}, | ||
Auth: []xdr.SorobanAuthorizationEntry{}, | ||
}, | ||
}, | ||
}) | ||
} | ||
envelope := xdr.TransactionEnvelope{ | ||
Type: xdr.EnvelopeTypeEnvelopeTypeTx, | ||
V1: &xdr.TransactionV1Envelope{ | ||
Tx: xdr.Transaction{ | ||
SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), | ||
Operations: operations, | ||
}, | ||
}, | ||
} | ||
txHash, err := network.HashTransactionInEnvelope(envelope, network.FutureNetworkPassphrase) | ||
if err != nil { | ||
panic(err) | ||
} | ||
|
||
txProcessing = append(txProcessing, xdr.TransactionResultMeta{ | ||
TxApplyProcessing: item, | ||
Result: xdr.TransactionResultPair{ | ||
TransactionHash: txHash, | ||
}, | ||
}) | ||
components := []xdr.TxSetComponent{ | ||
{ | ||
Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, | ||
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ | ||
Txs: []xdr.TransactionEnvelope{ | ||
envelope, | ||
}, | ||
}, | ||
}, | ||
} | ||
phases = append(phases, xdr.TransactionPhase{ | ||
V: 0, | ||
V0Components: &components, | ||
}) | ||
} | ||
|
||
return xdr.LedgerCloseMeta{ | ||
V: 1, | ||
V1: &xdr.LedgerCloseMetaV1{ | ||
LedgerHeader: xdr.LedgerHeaderHistoryEntry{ | ||
Hash: xdr.Hash{}, | ||
Header: xdr.LedgerHeader{ | ||
ScpValue: xdr.StellarValue{ | ||
CloseTime: xdr.TimePoint(closeTimestamp), | ||
}, | ||
LedgerSeq: xdr.Uint32(sequence), | ||
}, | ||
}, | ||
TxSet: xdr.GeneralizedTransactionSet{ | ||
V: 1, | ||
V1TxSet: &xdr.TransactionSetV1{ | ||
PreviousLedgerHash: xdr.Hash{}, | ||
Phases: phases, | ||
}, | ||
}, | ||
TxProcessing: txProcessing, | ||
}, | ||
} | ||
} | ||
|
||
func TestInsertEvents(t *testing.T) { | ||
db := NewTestDB(t) | ||
ctx := context.TODO() | ||
log := log.DefaultLogger | ||
log.SetLevel(logrus.TraceLevel) | ||
now := time.Now().UTC() | ||
|
||
writer := NewReadWriter(log, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) | ||
write, err := writer.NewTx(ctx) | ||
require.NoError(t, err) | ||
contractID := xdr.Hash([32]byte{}) | ||
counter := xdr.ScSymbol("COUNTER") | ||
|
||
var txMeta []xdr.TransactionMeta | ||
for i := 0; i < 10; i++ { | ||
txMeta = append(txMeta, transactionMetaWithEvents( | ||
contractEvent( | ||
contractID, | ||
xdr.ScVec{xdr.ScVal{ | ||
Type: xdr.ScValTypeScvSymbol, | ||
Sym: &counter, | ||
}}, | ||
xdr.ScVal{ | ||
Type: xdr.ScValTypeScvSymbol, | ||
Sym: &counter, | ||
}, | ||
), | ||
)) | ||
} | ||
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) | ||
|
||
eventW := write.EventWriter() | ||
err = eventW.InsertEvents(ledgerCloseMeta) | ||
assert.NoError(t, err) | ||
|
||
//TODO: Call getEvents and validate events data. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you please resolve this before merging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
-- +migrate Up | ||
|
||
-- indexing table to find events in ledgers by contract_id | ||
CREATE TABLE events ( | ||
id TEXT PRIMARY KEY, | ||
ledger_sequence INTEGER NOT NULL, | ||
application_order INTEGER NOT NULL, | ||
contract_id BLOB NOT NULL, | ||
event_type INTEGER NOT NULL | ||
psheth9 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
); | ||
|
||
CREATE INDEX idx_ledger_sequence ON events(ledger_sequence); | ||
CREATE INDEX idx_contract_id ON events(contract_id); | ||
|
||
-- +migrate Down | ||
drop table events cascade; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the interface should not be defined in the same file as its implementation: https://go.dev/wiki/CodeReviewComments#interfaces
Would like to get a recommendation from @tamirms about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, but that's how it's done everywhere else in the db package