From 80726f7ebd586787734e7df69c70d31aabcd8362 Mon Sep 17 00:00:00 2001 From: cool-developer <51834436+cool-develope@users.noreply.github.com> Date: Thu, 3 Oct 2024 09:10:22 -0400 Subject: [PATCH] feat(baseapp): integrate the `appdata.Listener` in baseapp (#21965) --- baseapp/abci.go | 9 +- baseapp/abci_test.go | 2 +- baseapp/baseapp.go | 17 ++++ baseapp/streaming.go | 103 +++++++++++++++++++-- baseapp/streaming_test.go | 185 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 308 insertions(+), 8 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index 27118f0f3b10..dd091294d563 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sort" + "strconv" "strings" "time" @@ -831,7 +832,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. txResults := make([]*abci.ExecTxResult, 0, len(req.Txs)) - for _, rawTx := range req.Txs { + for txIndex, rawTx := range req.Txs { response := app.deliverTx(rawTx) @@ -843,6 +844,12 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // continue } + // append the tx index to the response.Events + for i, event := range response.Events { + response.Events[i].Attributes = append(event.Attributes, + abci.EventAttribute{Key: "tx_index", Value: strconv.Itoa(txIndex)}) + } + txResults = append(txResults, response) } diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index fb61080916df..111a86938356 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -676,7 +676,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) { events := res.TxResults[i].GetEvents() require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively") - require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event") + require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[0].Attributes[0], "ante handler event") require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[2].Attributes[0], "msg handler update counter event") } diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index b5d100d263a9..2ef933c205c3 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -718,6 +718,15 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro ctx = ctx.WithBlockGasMeter(gasMeter) app.finalizeBlockState.SetContext(ctx) events = ctx.EventManager().ABCIEvents() + + // append PreBlock attributes to all events + for i, event := range events { + events[i].Attributes = append( + event.Attributes, + abci.EventAttribute{Key: "mode", Value: "PreBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, + ) + } } return events, nil } @@ -739,6 +748,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er resp.Events[i].Attributes = append( event.Attributes, abci.EventAttribute{Key: "mode", Value: "BeginBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, ) } @@ -801,6 +811,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { eb.Events[i].Attributes = append( event.Attributes, abci.EventAttribute{Key: "mode", Value: "EndBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, ) } @@ -1151,6 +1162,12 @@ func createEvents(cdc codec.Codec, events sdk.Events, msg sdk.Msg, reflectMsg pr } } + // append the event_index attribute to all events + msgEvent = msgEvent.AppendAttributes(sdk.NewAttribute("event_index", "0")) + for i, event := range events { + events[i] = event.AppendAttributes(sdk.NewAttribute("event_index", strconv.Itoa(i+1))) + } + return sdk.Events{msgEvent}.AppendEvents(events), nil } diff --git a/baseapp/streaming.go b/baseapp/streaming.go index b6f40cb87fff..3e30f8888d58 100644 --- a/baseapp/streaming.go +++ b/baseapp/streaming.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strconv" "strings" abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" @@ -143,21 +144,111 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore return exposeStoreKeys } +func eventToAppDataEvent(event abci.Event) (appdata.Event, error) { + appdataEvent := appdata.Event{ + Type: event.Type, + Attributes: func() ([]appdata.EventAttribute, error) { + attrs := make([]appdata.EventAttribute, len(event.Attributes)) + for j, attr := range event.Attributes { + attrs[j] = appdata.EventAttribute{ + Key: attr.Key, + Value: attr.Value, + } + } + return attrs, nil + }, + } + + for _, attr := range event.Attributes { + if attr.Key == "mode" { + switch attr.Value { + case "PreBlock": + appdataEvent.BlockStage = appdata.PreBlockStage + case "BeginBlock": + appdataEvent.BlockStage = appdata.BeginBlockStage + case "EndBlock": + appdataEvent.BlockStage = appdata.EndBlockStage + default: + appdataEvent.BlockStage = appdata.UnknownBlockStage + } + } else if attr.Key == "tx_index" { + txIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.TxIndex = int32(txIndex + 1) + appdataEvent.BlockStage = appdata.TxProcessingStage + } else if attr.Key == "msg_index" { + msgIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.MsgIndex = int32(msgIndex + 1) + } else if attr.Key == "event_index" { + eventIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.EventIndex = int32(eventIndex + 1) + } + } + + return appdataEvent, nil +} + type listenerWrapper struct { listener appdata.Listener } +// NewListenerWrapper creates a new listenerWrapper. +// It is only used for testing purposes. +func NewListenerWrapper(listener appdata.Listener) listenerWrapper { + return listenerWrapper{listener: listener} +} + func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error { if p.listener.StartBlock != nil { - err := p.listener.StartBlock(appdata.StartBlockData{ - Height: uint64(req.Height), - }) - if err != nil { + if err := p.listener.StartBlock(appdata.StartBlockData{ + Height: uint64(req.Height), + HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + }); err != nil { + return err + } + } + if p.listener.OnTx != nil { + for i, tx := range req.Txs { + if err := p.listener.OnTx(appdata.TxData{ + TxIndex: int32(i), + Bytes: func() ([]byte, error) { return tx, nil }, + JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + }); err != nil { + return err + } + } + } + if p.listener.OnEvent != nil { + events := make([]appdata.Event, len(res.Events)) + var err error + for i, event := range res.Events { + events[i], err = eventToAppDataEvent(event) + if err != nil { + return err + } + } + for _, txResult := range res.TxResults { + for _, event := range txResult.Events { + appdataEvent, err := eventToAppDataEvent(event) + if err != nil { + return err + } + events = append(events, appdataEvent) + } + } + if err := p.listener.OnEvent(appdata.EventData{Events: events}); err != nil { return err } } - - //// TODO txs, events return nil } diff --git a/baseapp/streaming_test.go b/baseapp/streaming_test.go index b0779c6b91ca..cb3c065be31f 100644 --- a/baseapp/streaming_test.go +++ b/baseapp/streaming_test.go @@ -9,10 +9,12 @@ import ( tmproto "github.com/cometbft/cometbft/api/cometbft/types/v1" "github.com/stretchr/testify/require" + "cosmossdk.io/schema/appdata" storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/baseapp" baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil" + sdk "github.com/cosmos/cosmos-sdk/types" ) var _ storetypes.ABCIListener = (*MockABCIListener)(nil) @@ -146,3 +148,186 @@ func Test_Ctx_with_StreamingManager(t *testing.T) { require.NoError(t, err) } } + +type mockAppDataListener struct { + appdata.Listener + + startBlockData []appdata.StartBlockData + txData []appdata.TxData + eventData []appdata.EventData + kvPairData []appdata.KVPairData + commitData []appdata.CommitData +} + +func newMockAppDataListener() *mockAppDataListener { + listener := &mockAppDataListener{} + + // Initialize the Listener with custom behavior to store data + listener.Listener = appdata.Listener{ + StartBlock: func(data appdata.StartBlockData) error { + listener.startBlockData = append(listener.startBlockData, data) // Store StartBlockData + return nil + }, + OnTx: func(data appdata.TxData) error { + listener.txData = append(listener.txData, data) // Store TxData + return nil + }, + OnEvent: func(data appdata.EventData) error { + listener.eventData = append(listener.eventData, data) // Store EventData + return nil + }, + OnKVPair: func(data appdata.KVPairData) error { + listener.kvPairData = append(listener.kvPairData, data) // Store KVPairData + return nil + }, + Commit: func(data appdata.CommitData) (func() error, error) { + listener.commitData = append(listener.commitData, data) // Store CommitData + return nil, nil + }, + } + + return listener +} + +func TestAppDataListener(t *testing.T) { + anteKey := []byte("ante-key") + anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) } + distOpt := func(bapp *baseapp.BaseApp) { bapp.MountStores(distKey1) } + mockListener := newMockAppDataListener() + streamingManager := storetypes.StreamingManager{ABCIListeners: []storetypes.ABCIListener{baseapp.NewListenerWrapper(mockListener.Listener)}} + streamingManagerOpt := func(bapp *baseapp.BaseApp) { bapp.SetStreamingManager(streamingManager) } + addListenerOpt := func(bapp *baseapp.BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) } + + // for event tests + baseappOpts := func(app *baseapp.BaseApp) { + app.SetPreBlocker(func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error { + ctx.EventManager().EmitEvent(sdk.NewEvent("pre-block")) + return nil + }) + app.SetBeginBlocker(func(_ sdk.Context) (sdk.BeginBlock, error) { + return sdk.BeginBlock{ + Events: []abci.Event{ + {Type: "begin-block"}, + }, + }, nil + }) + app.SetEndBlocker(func(_ sdk.Context) (sdk.EndBlock, error) { + return sdk.EndBlock{ + Events: []abci.Event{ + {Type: "end-block"}, + }, + }, nil + }) + } + + suite := NewBaseAppSuite(t, anteOpt, distOpt, streamingManagerOpt, addListenerOpt, baseappOpts) + + _, err := suite.baseApp.InitChain( + &abci.InitChainRequest{ + ConsensusParams: &tmproto.ConsensusParams{}, + }, + ) + require.NoError(t, err) + deliverKey := []byte("deliver-key") + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + + txCount := 5 + txs := make([][]byte, txCount) + + for i := 0; i < txCount; i++ { + tx := newTxCounter(t, suite.txConfig, suite.ac, int64(i), int64(i)) + + txBytes, err := suite.txConfig.TxEncoder()(tx) + require.NoError(t, err) + + sKey := []byte(fmt.Sprintf("distKey%d", i)) + sVal := []byte(fmt.Sprintf("distVal%d", i)) + store := getFinalizeBlockStateCtx(suite.baseApp).KVStore(distKey1) + store.Set(sKey, sVal) + + txs[i] = txBytes + } + + _, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: txs}) + require.NoError(t, err) + _, err = suite.baseApp.Commit() + require.NoError(t, err) + + // StartBlockData + require.Len(t, mockListener.startBlockData, 1) + require.Equal(t, uint64(1), mockListener.startBlockData[0].Height) + // TxData + txData := mockListener.txData + require.Len(t, txData, len(txs)) + for i := 0; i < txCount; i++ { + require.Equal(t, int32(i), txData[i].TxIndex) + txBytes, err := txData[i].Bytes() + require.NoError(t, err) + require.Equal(t, txs[i], txBytes) + } + // KVPairData + require.Len(t, mockListener.kvPairData, 1) + updates := mockListener.kvPairData[0].Updates + for i := 0; i < txCount; i++ { + require.Equal(t, []byte(distKey1.Name()), updates[i].Actor) + require.Len(t, updates[i].StateChanges, 1) + sKey := []byte(fmt.Sprintf("distKey%d", i)) + sVal := []byte(fmt.Sprintf("distVal%d", i)) + require.Equal(t, sKey, updates[i].StateChanges[0].Key) + require.Equal(t, sVal, updates[i].StateChanges[0].Value) + } + // CommitData + require.Len(t, mockListener.commitData, 1) + // EventData + require.Len(t, mockListener.eventData, 1) + events := mockListener.eventData[0].Events + require.Len(t, events, 3+txCount*3) + + for i := 0; i < 3; i++ { + require.Equal(t, int32(0), events[i].TxIndex) + require.Equal(t, int32(0), events[i].MsgIndex) + require.Equal(t, int32(1), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 2) + switch i { + case 0: + require.Equal(t, appdata.PreBlockStage, events[i].BlockStage) + require.Equal(t, "pre-block", events[i].Type) + case 1: + require.Equal(t, appdata.BeginBlockStage, events[i].BlockStage) + require.Equal(t, "begin-block", events[i].Type) + case 2: + require.Equal(t, appdata.EndBlockStage, events[i].BlockStage) + require.Equal(t, "end-block", events[i].Type) + } + } + + for i := 3; i < 3+txCount*3; i++ { + require.Equal(t, appdata.TxProcessingStage, events[i].BlockStage) + require.Equal(t, int32(i/3), events[i].TxIndex) + switch i % 3 { + case 0: + require.Equal(t, "ante_handler", events[i].Type) + require.Equal(t, int32(0), events[i].MsgIndex) + require.Equal(t, int32(0), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 2) + case 1: + require.Equal(t, "message", events[i].Type) + require.Equal(t, int32(1), events[i].MsgIndex) + require.Equal(t, int32(1), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 5) + case 2: + require.Equal(t, "message", events[i].Type) + require.Equal(t, int32(1), events[i].MsgIndex) + require.Equal(t, int32(2), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 4) + } + } +}