Skip to content

Commit

Permalink
feat(indexer/postgres)!: add basic support for header, txs and events
Browse files Browse the repository at this point in the history
  • Loading branch information
facundomedica committed Nov 29, 2024
1 parent 9caec06 commit af5d2fd
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 16 deletions.
4 changes: 4 additions & 0 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,10 @@ func (app *BaseApp) Commit() (*abci.CommitResponse, error) {
app.logger.Error("Commit listening hook failed", "height", blockHeight, "err", err)
if app.streamingManager.StopNodeOnErr {
err = fmt.Errorf("Commit listening hook failed: %w", err)
if blockHeight == 1 {
// can't rollback to height 0, so just return the error
return nil, fmt.Errorf("failed to commit block 1, can't automatically rollback: %w", err)
}
rollbackErr := app.cms.RollbackToVersion(blockHeight - 1)
if rollbackErr != nil {
return nil, errors.Join(err, rollbackErr)
Expand Down
25 changes: 17 additions & 8 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package baseapp

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -144,9 +145,10 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore
return exposeStoreKeys
}

func eventToAppDataEvent(event abci.Event) (appdata.Event, error) {
func eventToAppDataEvent(event abci.Event, height int64) (appdata.Event, error) {
appdataEvent := appdata.Event{
Type: event.Type,
BlockNumber: uint64(height),
Type: event.Type,
Attributes: func() ([]appdata.EventAttribute, error) {
attrs := make([]appdata.EventAttribute, len(event.Attributes))
for j, attr := range event.Attributes {
Expand Down Expand Up @@ -208,20 +210,27 @@ func NewListenerWrapper(listener appdata.Listener) listenerWrapper {

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
// this is to avoid putting all txs along with the block data
reqWithoutTxs := req
reqWithoutTxs.Txs = nil

if err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
HeaderJSON: func() (json.RawMessage, error) {
return json.Marshal(reqWithoutTxs)
},
}); err != nil {
return err
}
}
if p.listener.OnTx != nil {
for i, tx := range req.Txs {
if err := p.listener.OnTx(appdata.TxData{
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
BlockNumber: uint64(req.Height),
TxIndex: int32(i),
Bytes: func() ([]byte, error) { return tx, nil },
JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009
}); err != nil {
return err
}
Expand All @@ -231,14 +240,14 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz
events := make([]appdata.Event, len(res.Events))
var err error
for i, event := range res.Events {
events[i], err = eventToAppDataEvent(event)
events[i], err = eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
}
for _, txResult := range res.TxResults {
for _, event := range txResult.Events {
appdataEvent, err := eventToAppDataEvent(event)
appdataEvent, err := eventToAppDataEvent(event, req.Height)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ require (
// TODO remove after all modules have their own go.mods
replace (
cosmossdk.io/api => ./api
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
cosmossdk.io/x/bank => ./x/bank
cosmossdk.io/x/staking => ./x/staking
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ cosmossdk.io/log v1.5.0 h1:dVdzPJW9kMrnAYyMf1duqacoidB9uZIl+7c6z0mnq0g=
cosmossdk.io/log v1.5.0/go.mod h1:Tr46PUJjiUthlwQ+hxYtUtPn4D/oCZXAkYevBeh5+FI=
cosmossdk.io/math v1.4.0 h1:XbgExXFnXmF/CccPPEto40gOO7FpWu9yWNAZPN3nkNQ=
cosmossdk.io/math v1.4.0/go.mod h1:O5PkD4apz2jZs4zqFdTr16e1dcaQCc5z6lkEnrrppuk=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b h1:svpFdulZRrYz+RTHu2u9CeKkMKrIHx5354vjiHerovo=
cosmossdk.io/schema v0.3.1-0.20241128094659-bd76b47e1d8b/go.mod h1:RDAhxIeNB4bYqAlF4NBJwRrgtnciMcyyg0DOKnhNZQQ=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 h1:/vQbFIOMbk2FiG/kXiLl8BRyzTWDw7gX/Hz7Dd5eDMs=
Expand Down
15 changes: 9 additions & 6 deletions indexer/postgres/base_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,20 @@ CREATE TABLE IF NOT EXISTS tx
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
index_in_block BIGINT NOT NULL,
data JSONB NOT NULL
data JSONB NULL,
bytes BYTEA NULL
);
CREATE TABLE IF NOT EXISTS event
(
id BIGSERIAL PRIMARY KEY,
block_number BIGINT NOT NULL REFERENCES block (number),
tx_id BIGINT NULL REFERENCES tx (id),
msg_index BIGINT NULL,
event_index BIGINT NULL,
type TEXT NOT NULL,
data JSONB NOT NULL
block_stage INTEGER NOT NULL,
tx_index BIGINT NOT NULL,
msg_index BIGINT NOT NULL,
event_index BIGINT NOT NULL,
type TEXT NULL,
data JSONB NULL,
attributes JSONB NULL
);
`
65 changes: 65 additions & 0 deletions indexer/postgres/listener.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package postgres

import (
"encoding/json"
"fmt"

"cosmossdk.io/schema/appdata"
Expand Down Expand Up @@ -81,5 +82,69 @@ func (i *indexerImpl) listener() appdata.Listener {
i.tx, err = i.db.BeginTx(i.ctx, nil)
return nil, err
},
OnTx: func(td appdata.TxData) error {
var bz []byte
if td.Bytes != nil {
var err error
bz, err = td.Bytes()
if err != nil {
return err
}
}

var jsonData json.RawMessage
if td.JSON != nil {
var err error
jsonData, err = td.JSON()
if err != nil {
return err
}
}

_, err := i.tx.Exec("INSERT INTO tx (block_number, index_in_block, data, bytes) VALUES ($1, $2, $3, $4)",
td.BlockNumber, td.TxIndex, jsonData, bz)

return err
},
OnEvent: func(data appdata.EventData) error {
for _, e := range data.Events {
var (
jsonData json.RawMessage
jsonAttributes json.RawMessage
)

if e.Data != nil {
var err error
jsonData, err = e.Data()
if err != nil {
return fmt.Errorf("failed to get event data: %w", err)
}
}

if e.Attributes != nil {
attrs, err := e.Attributes()
if err != nil {
return fmt.Errorf("failed to get event attributes: %w", err)
}

attrsMap := map[string]interface{}{}
for _, attr := range attrs {
attrsMap[attr.Key] = attr.Value
}

jsonAttributes, err = json.Marshal(attrsMap)
if err != nil {
return fmt.Errorf("failed to marshal event attributes: %w", err)
}
}

_, err := i.tx.Exec("INSERT INTO event (block_number, block_stage, tx_index, msg_index, event_index, type, data, attributes) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
e.BlockNumber, e.BlockStage, e.TxIndex, e.MsgIndex, e.EventIndex, e.Type, jsonData, jsonAttributes)
if err != nil {
return fmt.Errorf("failed to index event: %w", err)
}
}
return nil
},
}
}
6 changes: 6 additions & 0 deletions schema/appdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ type StartBlockData struct {

// TxData represents the raw transaction data that is passed to a listener.
type TxData struct {
// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the index of the transaction in the block.
TxIndex int32

Expand All @@ -53,6 +56,9 @@ type Event struct {
// If the block stage is unknown, it should be set to UnknownBlockStage.
BlockStage BlockStage

// BlockNumber is the block number to which this event is associated.
BlockNumber uint64

// TxIndex is the 1-based index of the transaction in the block to which this event is associated.
// If TxIndex is zero, it means that we do not know the transaction index.
// Otherwise, the index should start with 1.
Expand Down
2 changes: 2 additions & 0 deletions simapp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,8 @@ replace (
cosmossdk.io/api => ../api
cosmossdk.io/client/v2 => ../client/v2
cosmossdk.io/collections => ../collections
cosmossdk.io/schema => ../schema
cosmossdk.io/indexer/postgres => ../indexer/postgres
cosmossdk.io/store => ../store
cosmossdk.io/tools/confix => ../tools/confix
cosmossdk.io/x/accounts => ../x/accounts
Expand Down

0 comments on commit af5d2fd

Please sign in to comment.