Skip to content

Commit

Permalink
add the basic decoded tx
Browse files Browse the repository at this point in the history
  • Loading branch information
facundomedica committed Dec 3, 2024
1 parent af5d2fd commit 6bf2f3e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 54 deletions.
18 changes: 14 additions & 4 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/cosmos/cosmos-sdk/client/flags"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
sdk "github.com/cosmos/cosmos-sdk/types"
)

const (
Expand Down Expand Up @@ -49,7 +50,7 @@ func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*stor
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener}},
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener.Listener, app.txDecoder}},
StopNodeOnErr: true,
}

Expand Down Expand Up @@ -199,7 +200,8 @@ func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error)
}

type listenerWrapper struct {
listener appdata.Listener
listener appdata.Listener
txDecoder sdk.TxDecoder
}

// NewListenerWrapper creates a new listenerWrapper.
Expand All @@ -210,7 +212,7 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
// this is to avoid putting all txs along with the block data
// clean up redundant data
reqWithoutTxs := req
reqWithoutTxs.Txs = nil

Expand All @@ -230,7 +232,15 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
BlockNumber: uint64(req.Height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
JSON: func() (json.RawMessage, error) {
sdkTx, err := p.txDecoder(tx)
if err != nil {
// if the transaction cannot be decoded, return the error as JSON
// as there are some txs that might not be decodeable by the txDecoder
return json.Marshal(err)
}
return json.Marshal(sdkTx)
}, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
Expand Down
108 changes: 58 additions & 50 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,69 +82,77 @@ func (i *indexerImpl) listener() appdata.Listener {
i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
OnTx: func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
OnTx: txListener(i),
OnEvent: eventListener(i),
}
}

func txListener(i *indexerImpl) func(data appdata.TxData) error {
return func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
}

var jsonData json.RawMessage
if td.JSON != nil {
var err error
jsonData, err = td.JSON()
if err != nil {
return err
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
}
}

func eventListener(i *indexerImpl) func(data appdata.EventData) error {
return func(data appdata.EventData) error {
for _, e := range data.Events {
var (
jsonData json.RawMessage
jsonAttributes json.RawMessage
)

var jsonData json.RawMessage
if td.JSON != nil {
if e.Data != nil {
var err error
jsonData, err = td.JSON()
jsonData, err = e.Data()
if err != nil {
return err
return fmt.Errorf("failed to get event data: %w", err)
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
},
OnEvent: func(data appdata.EventData) error {
for _, e := range data.Events {
var (
jsonData json.RawMessage
jsonAttributes json.RawMessage
)

if e.Data != nil {
var err error
jsonData, err = e.Data()
if err != nil {
return fmt.Errorf("failed to get event data: %w", err)
}
if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

jsonAttributes, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData, jsonAttributes)
jsonAttributes, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}
return nil
},

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData, jsonAttributes)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
}
}
return nil
}
}

0 comments on commit 6bf2f3e

Please sign in to comment.