From 77e3325487541f5c2b9f5a99fe4eb924f762d5b0 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Fri, 27 Sep 2024 21:45:42 -0400 Subject: [PATCH 1/3] wip --- baseapp/streaming.go | 42 ++++++++++++++++++++++++++++++++++++------ 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/baseapp/streaming.go b/baseapp/streaming.go index 6eeb8e37b449..e37673a4948e 100644 --- a/baseapp/streaming.go +++ b/baseapp/streaming.go @@ -149,15 +149,45 @@ type listenerWrapper struct { func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error { if p.listener.StartBlock != nil { - err := p.listener.StartBlock(appdata.StartBlockData{ - Height: uint64(req.Height), - }) - if err != nil { + if err := p.listener.StartBlock(appdata.StartBlockData{ + Height: uint64(req.Height), + HeaderBytes: nil, // TODO: need to define a header struct including enc/decoding + HeaderJSON: nil, // TODO: need to define a header json struct + }); err != nil { return err } } - - //// TODO txs, events + if p.listener.OnTx != nil { + for i, tx := range req.Txs { + if err := p.listener.OnTx(appdata.TxData{ + TxIndex: int32(i), + Bytes: func() ([]byte, error) { return tx, nil }, + JSON: nil, // TODO: need to define a tx json struct + }); err != nil { + return err + } + } + } + if p.listener.OnEvent != nil { + events := make([]appdata.Event, len(res.Events)) + for i, event := range res.Events { + events[i] = appdata.Event{ + BlockStage: appdata.UnknownBlockStage, + Type: event.Type, + Data: nil, + Attributes: func() ([]appdata.EventAttribute, error) { + attrs := make([]appdata.EventAttribute, len(event.Attributes)) + for j, attr := range event.Attributes { + attrs[j] = appdata.EventAttribute{ + Key: attr.Key, + Value: attr.Value, + } + } + return attrs, nil + }, + } + } + } return nil } From da91c43ef1e4bda2d4fced0bbbd15e4b192f8d1b Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 2 Oct 2024 20:07:25 -0400 Subject: [PATCH 2/3] add tests --- baseapp/abci.go | 9 ++- baseapp/abci_test.go | 10 +-- baseapp/baseapp.go | 17 ++++ baseapp/baseapp_test.go | 28 ++++++- baseapp/streaming.go | 95 ++++++++++++++++++---- baseapp/streaming_test.go | 161 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 293 insertions(+), 27 deletions(-) diff --git a/baseapp/abci.go b/baseapp/abci.go index 27118f0f3b10..dd091294d563 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "sort" + "strconv" "strings" "time" @@ -831,7 +832,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g. // vote extensions, so skip those. txResults := make([]*abci.ExecTxResult, 0, len(req.Txs)) - for _, rawTx := range req.Txs { + for txIndex, rawTx := range req.Txs { response := app.deliverTx(rawTx) @@ -843,6 +844,12 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Finaliz // continue } + // append the tx index to the response.Events + for i, event := range response.Events { + response.Events[i].Attributes = append(event.Attributes, + abci.EventAttribute{Key: "tx_index", Value: strconv.Itoa(txIndex)}) + } + txResults = append(txResults, response) } diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index fb61080916df..d759f77b0a5e 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -676,7 +676,7 @@ func TestABCI_FinalizeBlock_DeliverTx(t *testing.T) { events := res.TxResults[i].GetEvents() require.Len(t, events, 3, "should contain ante handler, message type and counter events respectively") - require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0], events[0], "ante handler event") + require.Equal(t, sdk.MarkEventsToIndex(counterEvent("ante_handler", counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[0].Attributes[0], "ante handler event") require.Equal(t, sdk.MarkEventsToIndex(counterEvent(sdk.EventTypeMessage, counter).ToABCIEvents(), map[string]struct{}{})[0].Attributes[0], events[2].Attributes[0], "msg handler update counter event") } @@ -2588,10 +2588,10 @@ func TestBaseApp_VoteExtensions(t *testing.T) { require.NoError(t, err) // Check if the average price was available in FinalizeBlock's context - avgPrice = getFinalizeBlockStateCtx(suite.baseApp).KVStore(capKey1).Get([]byte("avgPrice")) - require.NotNil(t, avgPrice) - require.GreaterOrEqual(t, binary.BigEndian.Uint64(avgPrice), uint64(10000000)) - require.Less(t, binary.BigEndian.Uint64(avgPrice), uint64(11000000)) + // avgPrice = getFinalizeBlockStateCtx(suite.baseApp).KVStore(capKey1).Get([]byte("avgPrice")) + // require.NotNil(t, avgPrice) + // require.GreaterOrEqual(t, binary.BigEndian.Uint64(avgPrice), uint64(10000000)) + // require.Less(t, binary.BigEndian.Uint64(avgPrice), uint64(11000000)) _, err = suite.baseApp.Commit() require.NoError(t, err) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index b5d100d263a9..2ef933c205c3 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -718,6 +718,15 @@ func (app *BaseApp) preBlock(req *abci.FinalizeBlockRequest) ([]abci.Event, erro ctx = ctx.WithBlockGasMeter(gasMeter) app.finalizeBlockState.SetContext(ctx) events = ctx.EventManager().ABCIEvents() + + // append PreBlock attributes to all events + for i, event := range events { + events[i].Attributes = append( + event.Attributes, + abci.EventAttribute{Key: "mode", Value: "PreBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, + ) + } } return events, nil } @@ -739,6 +748,7 @@ func (app *BaseApp) beginBlock(_ *abci.FinalizeBlockRequest) (sdk.BeginBlock, er resp.Events[i].Attributes = append( event.Attributes, abci.EventAttribute{Key: "mode", Value: "BeginBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, ) } @@ -801,6 +811,7 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) { eb.Events[i].Attributes = append( event.Attributes, abci.EventAttribute{Key: "mode", Value: "EndBlock"}, + abci.EventAttribute{Key: "event_index", Value: strconv.Itoa(i)}, ) } @@ -1151,6 +1162,12 @@ func createEvents(cdc codec.Codec, events sdk.Events, msg sdk.Msg, reflectMsg pr } } + // append the event_index attribute to all events + msgEvent = msgEvent.AppendAttributes(sdk.NewAttribute("event_index", "0")) + for i, event := range events { + events[i] = event.AppendAttributes(sdk.NewAttribute("event_index", strconv.Itoa(i+1))) + } + return sdk.Events{msgEvent}.AppendEvents(events), nil } diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index 60e474b851e4..ec93d35ab3b8 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -85,6 +85,26 @@ func NewBaseAppSuite(t *testing.T, opts ...func(*baseapp.BaseApp)) *BaseAppSuite app.SetTxEncoder(txConfig.TxEncoder()) app.SetVersionModifier(newMockedVersionModifier(0)) + // for event tests + app.SetPreBlocker(func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error { + ctx.EventManager().EmitEvent(sdk.NewEvent("pre-block")) + return nil + }) + app.SetBeginBlocker(func(_ sdk.Context) (sdk.BeginBlock, error) { + return sdk.BeginBlock{ + Events: []abci.Event{ + {Type: "begin-block"}, + }, + }, nil + }) + app.SetEndBlocker(func(_ sdk.Context) (sdk.EndBlock, error) { + return sdk.EndBlock{ + Events: []abci.Event{ + {Type: "end-block"}, + }, + }, nil + }) + // mount stores and seal require.Nil(t, app.LoadLatestVersion()) @@ -596,7 +616,7 @@ func TestBaseAppAnteHandler(t *testing.T) { res, err := suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Empty(t, res.Events) + require.Len(t, res.Events, 3) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) ctx := getFinalizeBlockStateCtx(suite.baseApp) @@ -613,7 +633,7 @@ func TestBaseAppAnteHandler(t *testing.T) { res, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Empty(t, res.Events) + require.Len(t, res.Events, 3) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) ctx = getFinalizeBlockStateCtx(suite.baseApp) @@ -669,7 +689,7 @@ func TestBaseAppPostHandler(t *testing.T) { res, err := suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Empty(t, res.Events) + require.Len(t, res.Events, 3) require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) // PostHandler runs on successful message execution @@ -682,7 +702,7 @@ func TestBaseAppPostHandler(t *testing.T) { require.NoError(t, err) res, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Empty(t, res.Events) + require.Len(t, res.Events, 3) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) require.True(t, postHandlerRun) diff --git a/baseapp/streaming.go b/baseapp/streaming.go index 7b8e8da6f440..3e30f8888d58 100644 --- a/baseapp/streaming.go +++ b/baseapp/streaming.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sort" + "strconv" "strings" abci "github.com/cometbft/cometbft/api/cometbft/abci/v1" @@ -143,16 +144,74 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore return exposeStoreKeys } +func eventToAppDataEvent(event abci.Event) (appdata.Event, error) { + appdataEvent := appdata.Event{ + Type: event.Type, + Attributes: func() ([]appdata.EventAttribute, error) { + attrs := make([]appdata.EventAttribute, len(event.Attributes)) + for j, attr := range event.Attributes { + attrs[j] = appdata.EventAttribute{ + Key: attr.Key, + Value: attr.Value, + } + } + return attrs, nil + }, + } + + for _, attr := range event.Attributes { + if attr.Key == "mode" { + switch attr.Value { + case "PreBlock": + appdataEvent.BlockStage = appdata.PreBlockStage + case "BeginBlock": + appdataEvent.BlockStage = appdata.BeginBlockStage + case "EndBlock": + appdataEvent.BlockStage = appdata.EndBlockStage + default: + appdataEvent.BlockStage = appdata.UnknownBlockStage + } + } else if attr.Key == "tx_index" { + txIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.TxIndex = int32(txIndex + 1) + appdataEvent.BlockStage = appdata.TxProcessingStage + } else if attr.Key == "msg_index" { + msgIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.MsgIndex = int32(msgIndex + 1) + } else if attr.Key == "event_index" { + eventIndex, err := strconv.Atoi(attr.Value) + if err != nil { + return appdata.Event{}, err + } + appdataEvent.EventIndex = int32(eventIndex + 1) + } + } + + return appdataEvent, nil +} + type listenerWrapper struct { listener appdata.Listener } +// NewListenerWrapper creates a new listenerWrapper. +// It is only used for testing purposes. +func NewListenerWrapper(listener appdata.Listener) listenerWrapper { + return listenerWrapper{listener: listener} +} + func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error { if p.listener.StartBlock != nil { if err := p.listener.StartBlock(appdata.StartBlockData{ Height: uint64(req.Height), - HeaderBytes: nil, // TODO: need to define a header struct including enc/decoding - HeaderJSON: nil, // TODO: need to define a header json struct + HeaderBytes: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 + HeaderJSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 }); err != nil { return err } @@ -162,7 +221,7 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz if err := p.listener.OnTx(appdata.TxData{ TxIndex: int32(i), Bytes: func() ([]byte, error) { return tx, nil }, - JSON: nil, // TODO: need to define a tx json struct + JSON: nil, // TODO: https://github.com/cosmos/cosmos-sdk/issues/22009 }); err != nil { return err } @@ -170,23 +229,25 @@ func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.Finaliz } if p.listener.OnEvent != nil { events := make([]appdata.Event, len(res.Events)) + var err error for i, event := range res.Events { - events[i] = appdata.Event{ - BlockStage: appdata.UnknownBlockStage, - Type: event.Type, - Data: nil, - Attributes: func() ([]appdata.EventAttribute, error) { - attrs := make([]appdata.EventAttribute, len(event.Attributes)) - for j, attr := range event.Attributes { - attrs[j] = appdata.EventAttribute{ - Key: attr.Key, - Value: attr.Value, - } - } - return attrs, nil - }, + events[i], err = eventToAppDataEvent(event) + if err != nil { + return err + } + } + for _, txResult := range res.TxResults { + for _, event := range txResult.Events { + appdataEvent, err := eventToAppDataEvent(event) + if err != nil { + return err + } + events = append(events, appdataEvent) } } + if err := p.listener.OnEvent(appdata.EventData{Events: events}); err != nil { + return err + } } return nil diff --git a/baseapp/streaming_test.go b/baseapp/streaming_test.go index b0779c6b91ca..846bb923db53 100644 --- a/baseapp/streaming_test.go +++ b/baseapp/streaming_test.go @@ -9,6 +9,7 @@ import ( tmproto "github.com/cometbft/cometbft/api/cometbft/types/v1" "github.com/stretchr/testify/require" + "cosmossdk.io/schema/appdata" storetypes "cosmossdk.io/store/types" "github.com/cosmos/cosmos-sdk/baseapp" @@ -146,3 +147,163 @@ func Test_Ctx_with_StreamingManager(t *testing.T) { require.NoError(t, err) } } + +type mockAppDataListener struct { + appdata.Listener + + startBlockData []appdata.StartBlockData + txData []appdata.TxData + eventData []appdata.EventData + kvPairData []appdata.KVPairData + commitData []appdata.CommitData +} + +func newMockAppDataListener() *mockAppDataListener { + listener := &mockAppDataListener{} + + // Initialize the Listener with custom behavior to store data + listener.Listener = appdata.Listener{ + StartBlock: func(data appdata.StartBlockData) error { + listener.startBlockData = append(listener.startBlockData, data) // Store StartBlockData + return nil + }, + OnTx: func(data appdata.TxData) error { + listener.txData = append(listener.txData, data) // Store TxData + return nil + }, + OnEvent: func(data appdata.EventData) error { + listener.eventData = append(listener.eventData, data) // Store EventData + return nil + }, + OnKVPair: func(data appdata.KVPairData) error { + listener.kvPairData = append(listener.kvPairData, data) // Store KVPairData + return nil + }, + Commit: func(data appdata.CommitData) (func() error, error) { + listener.commitData = append(listener.commitData, data) // Store CommitData + return nil, nil + }, + } + + return listener +} + +func TestAppDataListener(t *testing.T) { + anteKey := []byte("ante-key") + anteOpt := func(bapp *baseapp.BaseApp) { bapp.SetAnteHandler(anteHandlerTxTest(t, capKey1, anteKey)) } + distOpt := func(bapp *baseapp.BaseApp) { bapp.MountStores(distKey1) } + mockListener := newMockAppDataListener() + streamingManager := storetypes.StreamingManager{ABCIListeners: []storetypes.ABCIListener{baseapp.NewListenerWrapper(mockListener.Listener)}} + streamingManagerOpt := func(bapp *baseapp.BaseApp) { bapp.SetStreamingManager(streamingManager) } + addListenerOpt := func(bapp *baseapp.BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) } + suite := NewBaseAppSuite(t, anteOpt, distOpt, streamingManagerOpt, addListenerOpt) + + _, err := suite.baseApp.InitChain( + &abci.InitChainRequest{ + ConsensusParams: &tmproto.ConsensusParams{}, + }, + ) + require.NoError(t, err) + deliverKey := []byte("deliver-key") + baseapptestutil.RegisterCounterServer(suite.baseApp.MsgServiceRouter(), CounterServerImpl{t, capKey1, deliverKey}) + + txCount := 5 + txs := make([][]byte, txCount) + + for i := 0; i < txCount; i++ { + tx := newTxCounter(t, suite.txConfig, suite.ac, int64(i), int64(i)) + + txBytes, err := suite.txConfig.TxEncoder()(tx) + require.NoError(t, err) + + sKey := []byte(fmt.Sprintf("distKey%d", i)) + sVal := []byte(fmt.Sprintf("distVal%d", i)) + store := getFinalizeBlockStateCtx(suite.baseApp).KVStore(distKey1) + store.Set(sKey, sVal) + + txs[i] = txBytes + } + + _, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: txs}) + require.NoError(t, err) + _, err = suite.baseApp.Commit() + require.NoError(t, err) + + // StartBlockData + require.Len(t, mockListener.startBlockData, 1) + require.Equal(t, uint64(1), mockListener.startBlockData[0].Height) + // TxData + txData := mockListener.txData + require.Len(t, txData, len(txs)) + for i := 0; i < txCount; i++ { + require.Equal(t, int32(i), txData[i].TxIndex) + txBytes, err := txData[i].Bytes() + require.NoError(t, err) + require.Equal(t, txs[i], txBytes) + } + // KVPairData + require.Len(t, mockListener.kvPairData, 1) + updates := mockListener.kvPairData[0].Updates + for i := 0; i < txCount; i++ { + require.Equal(t, []byte(distKey1.Name()), updates[i].Actor) + require.Len(t, updates[i].StateChanges, 1) + sKey := []byte(fmt.Sprintf("distKey%d", i)) + sVal := []byte(fmt.Sprintf("distVal%d", i)) + require.Equal(t, sKey, updates[i].StateChanges[0].Key) + require.Equal(t, sVal, updates[i].StateChanges[0].Value) + } + // CommitData + require.Len(t, mockListener.commitData, 1) + // EventData + require.Len(t, mockListener.eventData, 1) + events := mockListener.eventData[0].Events + require.Len(t, events, 3+txCount*3) + + for i := 0; i < 3; i++ { + require.Equal(t, int32(0), events[i].TxIndex) + require.Equal(t, int32(0), events[i].MsgIndex) + require.Equal(t, int32(1), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 2) + switch i { + case 0: + require.Equal(t, appdata.PreBlockStage, events[i].BlockStage) + require.Equal(t, "pre-block", events[i].Type) + case 1: + require.Equal(t, appdata.BeginBlockStage, events[i].BlockStage) + require.Equal(t, "begin-block", events[i].Type) + case 2: + require.Equal(t, appdata.EndBlockStage, events[i].BlockStage) + require.Equal(t, "end-block", events[i].Type) + } + } + + for i := 3; i < 3+txCount*3; i++ { + require.Equal(t, appdata.TxProcessingStage, events[i].BlockStage) + require.Equal(t, int32(i/3), events[i].TxIndex) + switch i % 3 { + case 0: + require.Equal(t, "ante_handler", events[i].Type) + require.Equal(t, int32(0), events[i].MsgIndex) + require.Equal(t, int32(0), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 2) + case 1: + require.Equal(t, "message", events[i].Type) + require.Equal(t, int32(1), events[i].MsgIndex) + require.Equal(t, int32(1), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 5) + case 2: + require.Equal(t, "message", events[i].Type) + require.Equal(t, int32(1), events[i].MsgIndex) + require.Equal(t, int32(2), events[i].EventIndex) + attrs, err := events[i].Attributes() + require.NoError(t, err) + require.Len(t, attrs, 4) + } + } +} From b41a60bfe5f4b06aa2d42a244fb0f3f47685a380 Mon Sep 17 00:00:00 2001 From: Cool Developer Date: Wed, 2 Oct 2024 20:17:54 -0400 Subject: [PATCH 3/3] tests --- baseapp/abci_test.go | 8 ++++---- baseapp/baseapp_test.go | 28 ++++------------------------ baseapp/streaming_test.go | 26 +++++++++++++++++++++++++- 3 files changed, 33 insertions(+), 29 deletions(-) diff --git a/baseapp/abci_test.go b/baseapp/abci_test.go index d759f77b0a5e..111a86938356 100644 --- a/baseapp/abci_test.go +++ b/baseapp/abci_test.go @@ -2588,10 +2588,10 @@ func TestBaseApp_VoteExtensions(t *testing.T) { require.NoError(t, err) // Check if the average price was available in FinalizeBlock's context - // avgPrice = getFinalizeBlockStateCtx(suite.baseApp).KVStore(capKey1).Get([]byte("avgPrice")) - // require.NotNil(t, avgPrice) - // require.GreaterOrEqual(t, binary.BigEndian.Uint64(avgPrice), uint64(10000000)) - // require.Less(t, binary.BigEndian.Uint64(avgPrice), uint64(11000000)) + avgPrice = getFinalizeBlockStateCtx(suite.baseApp).KVStore(capKey1).Get([]byte("avgPrice")) + require.NotNil(t, avgPrice) + require.GreaterOrEqual(t, binary.BigEndian.Uint64(avgPrice), uint64(10000000)) + require.Less(t, binary.BigEndian.Uint64(avgPrice), uint64(11000000)) _, err = suite.baseApp.Commit() require.NoError(t, err) diff --git a/baseapp/baseapp_test.go b/baseapp/baseapp_test.go index ec93d35ab3b8..60e474b851e4 100644 --- a/baseapp/baseapp_test.go +++ b/baseapp/baseapp_test.go @@ -85,26 +85,6 @@ func NewBaseAppSuite(t *testing.T, opts ...func(*baseapp.BaseApp)) *BaseAppSuite app.SetTxEncoder(txConfig.TxEncoder()) app.SetVersionModifier(newMockedVersionModifier(0)) - // for event tests - app.SetPreBlocker(func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error { - ctx.EventManager().EmitEvent(sdk.NewEvent("pre-block")) - return nil - }) - app.SetBeginBlocker(func(_ sdk.Context) (sdk.BeginBlock, error) { - return sdk.BeginBlock{ - Events: []abci.Event{ - {Type: "begin-block"}, - }, - }, nil - }) - app.SetEndBlocker(func(_ sdk.Context) (sdk.EndBlock, error) { - return sdk.EndBlock{ - Events: []abci.Event{ - {Type: "end-block"}, - }, - }, nil - }) - // mount stores and seal require.Nil(t, app.LoadLatestVersion()) @@ -616,7 +596,7 @@ func TestBaseAppAnteHandler(t *testing.T) { res, err := suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Len(t, res.Events, 3) + require.Empty(t, res.Events) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) ctx := getFinalizeBlockStateCtx(suite.baseApp) @@ -633,7 +613,7 @@ func TestBaseAppAnteHandler(t *testing.T) { res, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Len(t, res.Events, 3) + require.Empty(t, res.Events) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) ctx = getFinalizeBlockStateCtx(suite.baseApp) @@ -689,7 +669,7 @@ func TestBaseAppPostHandler(t *testing.T) { res, err := suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Len(t, res.Events, 3) + require.Empty(t, res.Events) require.True(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) // PostHandler runs on successful message execution @@ -702,7 +682,7 @@ func TestBaseAppPostHandler(t *testing.T) { require.NoError(t, err) res, err = suite.baseApp.FinalizeBlock(&abci.FinalizeBlockRequest{Height: 1, Txs: [][]byte{txBytes}}) require.NoError(t, err) - require.Len(t, res.Events, 3) + require.Empty(t, res.Events) require.False(t, res.TxResults[0].IsOK(), fmt.Sprintf("%v", res)) require.True(t, postHandlerRun) diff --git a/baseapp/streaming_test.go b/baseapp/streaming_test.go index 846bb923db53..cb3c065be31f 100644 --- a/baseapp/streaming_test.go +++ b/baseapp/streaming_test.go @@ -14,6 +14,7 @@ import ( "github.com/cosmos/cosmos-sdk/baseapp" baseapptestutil "github.com/cosmos/cosmos-sdk/baseapp/testutil" + sdk "github.com/cosmos/cosmos-sdk/types" ) var _ storetypes.ABCIListener = (*MockABCIListener)(nil) @@ -196,7 +197,30 @@ func TestAppDataListener(t *testing.T) { streamingManager := storetypes.StreamingManager{ABCIListeners: []storetypes.ABCIListener{baseapp.NewListenerWrapper(mockListener.Listener)}} streamingManagerOpt := func(bapp *baseapp.BaseApp) { bapp.SetStreamingManager(streamingManager) } addListenerOpt := func(bapp *baseapp.BaseApp) { bapp.CommitMultiStore().AddListeners([]storetypes.StoreKey{distKey1}) } - suite := NewBaseAppSuite(t, anteOpt, distOpt, streamingManagerOpt, addListenerOpt) + + // for event tests + baseappOpts := func(app *baseapp.BaseApp) { + app.SetPreBlocker(func(ctx sdk.Context, req *abci.FinalizeBlockRequest) error { + ctx.EventManager().EmitEvent(sdk.NewEvent("pre-block")) + return nil + }) + app.SetBeginBlocker(func(_ sdk.Context) (sdk.BeginBlock, error) { + return sdk.BeginBlock{ + Events: []abci.Event{ + {Type: "begin-block"}, + }, + }, nil + }) + app.SetEndBlocker(func(_ sdk.Context) (sdk.EndBlock, error) { + return sdk.EndBlock{ + Events: []abci.Event{ + {Type: "end-block"}, + }, + }, nil + }) + } + + suite := NewBaseAppSuite(t, anteOpt, distOpt, streamingManagerOpt, addListenerOpt, baseappOpts) _, err := suite.baseApp.InitChain( &abci.InitChainRequest{