Skip to content

Commit

Permalink
refactor: move Events to from ResponseFinalizeBlock to ResponseProces…
Browse files Browse the repository at this point in the history
…sProposal
  • Loading branch information
lklimek committed Mar 21, 2024
1 parent da62f56 commit 6247e04
Show file tree
Hide file tree
Showing 28 changed files with 426 additions and 508 deletions.
7 changes: 3 additions & 4 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,14 @@ func (app *Application) ProcessProposal(_ context.Context, req *abci.RequestProc
Status: abci.ResponseProcessProposal_REJECT,
}, err
}

resp := &abci.ResponseProcessProposal{
Status: abci.ResponseProcessProposal_ACCEPT,
AppHash: roundState.GetAppHash(),
TxResults: txResults,
ConsensusParamUpdates: app.getConsensusParamsUpdate(req.Height),
ValidatorSetUpdate: app.getValidatorSetUpdate(req.Height),
Events: []abci.Event{app.eventValUpdate(req.Height)},
}

if app.cfg.ProcessProposalDelayMS != 0 {
Expand Down Expand Up @@ -425,10 +427,7 @@ func (app *Application) FinalizeBlock(_ context.Context, req *abci.RequestFinali
return nil, err
}
}
events := []abci.Event{app.eventValUpdate(req.Height)}
resp := &abci.ResponseFinalizeBlock{
Events: events,
}
resp := &abci.ResponseFinalizeBlock{}
if app.RetainBlocks > 0 && app.LastCommittedState.GetHeight() >= app.RetainBlocks {
resp.RetainHeight = app.LastCommittedState.GetHeight() - app.RetainBlocks + 1
}
Expand Down
8 changes: 4 additions & 4 deletions abci/example/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ func testKVStore(ctx context.Context, t *testing.T, app types.Application, tx []
require.NoError(t, err)
require.Len(t, respProcess.TxResults, 1)
require.False(t, respProcess.TxResults[0].IsErr(), respProcess.TxResults[0].Log)
require.Len(t, respProcess.Events, 1)

// Duplicate ProcessProposal calls should return error
_, err = app.ProcessProposal(ctx, reqProcess)
require.ErrorContains(t, err, "duplicate ProcessProposal call")

reqFin := &types.RequestFinalizeBlock{Height: height}
reqFin.Block, reqFin.BlockID = makeBlock(t, height, [][]byte{tx}, respPrep.AppHash)
respFin, err := app.FinalizeBlock(ctx, reqFin)
_, err = app.FinalizeBlock(ctx, reqFin)
require.NoError(t, err)
require.Equal(t, 1, len(respFin.Events))

// repeating tx raises an error
_, err = app.FinalizeBlock(ctx, reqFin)
Expand Down Expand Up @@ -290,12 +290,12 @@ func makeApplyBlock(
require.NoError(t, err)
require.NotZero(t, respProcessProposal)
require.Equal(t, types.ResponseProcessProposal_ACCEPT, respProcessProposal.Status)
require.Len(t, respProcessProposal.Events, 1)

rfb := &types.RequestFinalizeBlock{Hash: hash, Height: height}
rfb.Block, rfb.BlockID = makeBlock(t, height, txs, respProcessProposal.AppHash)
resFinalizeBlock, err := kvstore.FinalizeBlock(ctx, rfb)
require.NoError(t, err)
require.Len(t, resFinalizeBlock.Events, 1)

return respProcessProposal, resFinalizeBlock
}
Expand Down Expand Up @@ -413,13 +413,13 @@ func testClient(ctx context.Context, t *testing.T, app abciclient.Client, height
require.NotZero(t, rpp)
require.Equal(t, 1, len(rpp.TxResults))
require.False(t, rpp.TxResults[0].IsErr())
require.Len(t, rpp.Events, 1)

rfb := &types.RequestFinalizeBlock{Height: height}
rfb.Block, rfb.BlockID = makeBlock(t, height, [][]byte{tx}, rpp.AppHash)
ar, err := app.FinalizeBlock(ctx, rfb)
require.NoError(t, err)
require.Zero(t, ar.RetainHeight)
require.Len(t, ar.Events, 1)

info, err := app.Info(ctx, &types.RequestInfo{})
require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions abci/example/kvstore/verify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestVerifyBlockCommit(t *testing.T) {
assert.Len(t, respPrep.TxRecords, 1)
require.Equal(t, 1, len(respPrep.TxResults))
require.False(t, respPrep.TxResults[0].IsErr(), respPrep.TxResults[0].Log)

pbBlock, err := block.ToProto()
require.NoError(t, err)
blockID := block.BlockID(nil)
Expand All @@ -70,9 +71,8 @@ func TestVerifyBlockCommit(t *testing.T) {
Block: pbBlock,
BlockID: &pbBlockID,
}
respFb, err := kvstore.FinalizeBlock(ctx, reqFb)
_, err = kvstore.FinalizeBlock(ctx, reqFb)
require.NoError(t, err)
require.Equal(t, 1, len(respFb.Events))
}

type blockExecutor struct {
Expand Down
591 changes: 296 additions & 295 deletions abci/types/types.pb.go

Large diffs are not rendered by default.

4 changes: 1 addition & 3 deletions cmd/tenderdash/commands/reindex_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,7 @@ func eventReIndex(cmd *cobra.Command, args eventReIndexArgs) error {
NumTxs: int64(len(b.Txs)),
ResultProcessProposal: *r.ProcessProposal,
}
if r.FinalizeBlock != nil {
e.ResultFinalizeBlock = *r.FinalizeBlock
}

var batch *indexer.Batch
if e.NumTxs > 0 {
batch = indexer.NewBatch(e.NumTxs)
Expand Down
7 changes: 2 additions & 5 deletions internal/consensus/replay_stubs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,7 @@ func (mock *mockProxyApp) ProcessProposal(_ context.Context, _req *abci.RequestP
}

func (mock *mockProxyApp) FinalizeBlock(_ context.Context, _req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
r := mock.abciResponses.FinalizeBlock
mock.txCount++
if r == nil {
return &abci.ResponseFinalizeBlock{}, nil
}
return r, nil

return &abci.ResponseFinalizeBlock{}, nil
}
21 changes: 9 additions & 12 deletions internal/consensus/replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,19 +234,18 @@ func (r *BlockReplayer) replayBlocks(
var (
block *types.Block
commit *types.Commit
fbResp *abci.ResponseFinalizeBlock
ucState sm.CurrentRoundState
)
for i := firstBlock; i <= finalBlock; i++ {
block = r.store.LoadBlock(i)
commit = r.store.LoadSeenCommitAt(i)
ucState, fbResp, err = r.replayBlock(ctx, block, commit, state, i)
ucState, err = r.replayBlock(ctx, block, commit, state, i)
if err != nil {
return nil, err
}
}
if !mutateState {
err = r.publishEvents(block, ucState, fbResp)
err = r.publishEvents(block, ucState)
if err != nil {
return nil, err
}
Expand All @@ -273,27 +272,26 @@ func (r *BlockReplayer) replayBlock(
commit *types.Commit,
state sm.State,
height int64,
) (sm.CurrentRoundState, *abci.ResponseFinalizeBlock, error) {
) (sm.CurrentRoundState, error) {
r.logger.Info("Replay: applying block", "height", height)
// Extra check to ensure the app was not changed in a way it shouldn't have.
ucState, err := r.blockExec.ProcessProposal(ctx, block, commit.Round, state, false)
if err != nil {
return sm.CurrentRoundState{}, nil, fmt.Errorf("blockReplayer process proposal: %w", err)
return sm.CurrentRoundState{}, fmt.Errorf("blockReplayer process proposal: %w", err)
}

// We emit events for the index services at the final block due to the sync issue when
// the node shutdown during the block committing status.
// For all other cases, we disable emitting events by providing blockExec=nil in ExecReplayedCommitBlock
fbResp, err := sm.ExecReplayedCommitBlock(ctx, r.appClient, block, commit, r.logger)
_, err = sm.ExecReplayedCommitBlock(ctx, r.appClient, block, commit, r.logger)
if err != nil {
return sm.CurrentRoundState{}, nil, err
return sm.CurrentRoundState{}, err
}
// Extra check to ensure the app was not changed in a way it shouldn't have.
if err := checkAppHashEqualsOneFromBlock(ucState.AppHash, block); err != nil {
return sm.CurrentRoundState{}, nil, err
return sm.CurrentRoundState{}, err
}
r.nBlocks++
return ucState, fbResp, nil
return ucState, nil
}

// syncStateAt loads block's data for a height H to sync it with the application.
Expand Down Expand Up @@ -374,10 +372,9 @@ func (r *BlockReplayer) execInitChain(ctx context.Context, rs *replayState, stat
func (r *BlockReplayer) publishEvents(
block *types.Block,
ucState sm.CurrentRoundState,
fbResp *abci.ResponseFinalizeBlock,
) error {
blockID := block.BlockID(nil)
es := sm.NewFullEventSet(block, blockID, ucState, fbResp, ucState.NextValidators)
es := sm.NewFullEventSet(block, blockID, ucState, ucState.NextValidators)
err := es.Publish(r.publisher)
if err != nil {
r.logger.Error("failed publishing event", "err", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/eventbus/event_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (b *EventBus) Publish(eventValue string, eventData types.EventData) error {
}

func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
events := data.ResultFinalizeBlock.Events
events := data.ResultProcessProposal.Events

// add Tendermint-reserved new block event
events = append(events, types.EventNewBlock)
Expand All @@ -93,7 +93,7 @@ func (b *EventBus) PublishEventNewBlock(data types.EventDataNewBlock) error {
func (b *EventBus) PublishEventNewBlockHeader(data types.EventDataNewBlockHeader) error {
// no explicit deadline for publishing events

events := data.ResultFinalizeBlock.Events
events := data.ResultProcessProposal.Events

// add Tendermint-reserved new block header event
events = append(events, types.EventNewBlockHeader)
Expand Down
27 changes: 12 additions & 15 deletions internal/eventbus/event_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
bps, err := block.MakePartSet(types.BlockPartSizeBytes)
require.NoError(t, err)
blockID := types.BlockID{Hash: block.Hash(), PartSetHeader: bps.Header()}
resultFinalizeBlock := abci.ResponseFinalizeBlock{
respProcessProposal := abci.ResponseProcessProposal{
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{
{Key: "baz", Value: "1"},
Expand All @@ -112,13 +112,13 @@ func TestEventBusPublishEventNewBlock(t *testing.T) {
edt := msg.Data().(types.EventDataNewBlock)
assert.Equal(t, block, edt.Block)
assert.Equal(t, blockID, edt.BlockID)
assert.Equal(t, resultFinalizeBlock, edt.ResultFinalizeBlock)
assert.Equal(t, respProcessProposal, edt.ResultProcessProposal)
}()

err = eventBus.PublishEventNewBlock(types.EventDataNewBlock{
Block: block,
BlockID: blockID,
ResultFinalizeBlock: resultFinalizeBlock,
Block: block,
BlockID: blockID,
ResultProcessProposal: respProcessProposal,
})
assert.NoError(t, err)

Expand Down Expand Up @@ -256,21 +256,20 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {

block := types.MakeBlock(0, []types.Tx{}, nil, []types.Evidence{})
block.SetDashParams(0, nil, 1, nil)
resultFinalizeBlock := abci.ResponseFinalizeBlock{
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{
{Key: "baz", Value: "1"},
{Key: "foz", Value: "2"},
}},
},
}

resultProcessProposal := abci.ResponseProcessProposal{
Status: abci.ResponseProcessProposal_ACCEPT,
AppHash: make([]byte, crypto.DefaultAppHashSize),
TxResults: []*abci.ExecTxResult{
{Code: abci.CodeTypeOK, Data: []byte("baz=1")},
{Code: abci.CodeTypeOK, Data: []byte("foz=2")},
},
Events: []abci.Event{
{Type: "testType", Attributes: []abci.EventAttribute{
{Key: "baz", Value: "1"},
{Key: "foz", Value: "2"},
}},
},
}

// PublishEventNewBlockHeader adds the tm.event compositeKey, so the query below should work
Expand All @@ -289,13 +288,11 @@ func TestEventBusPublishEventNewBlockHeader(t *testing.T) {

edt := msg.Data().(types.EventDataNewBlockHeader)
assert.Equal(t, block.Header, edt.Header)
assert.Equal(t, resultFinalizeBlock, edt.ResultFinalizeBlock)
}()

err = eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{
Header: block.Header,
ResultProcessProposal: resultProcessProposal,
ResultFinalizeBlock: resultFinalizeBlock,
})
assert.NoError(t, err)

Expand Down
1 change: 0 additions & 1 deletion internal/inspect/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ func TestBlockResults(t *testing.T) {
testGasUsed := int64(100)
stateStoreMock := &statemocks.Store{}
stateStoreMock.On("LoadABCIResponses", testHeight).Return(&state.ABCIResponses{
FinalizeBlock: &abcitypes.ResponseFinalizeBlock{},
ProcessProposal: &abcitypes.ResponseProcessProposal{
TxResults: []*abcitypes.ExecTxResult{
{
Expand Down
2 changes: 1 addition & 1 deletion internal/rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (env *Environment) BlockResults(_ctx context.Context, req *coretypes.Reques
Height: height,
TxsResults: results.ProcessProposal.TxResults,
TotalGasUsed: totalGasUsed,
FinalizeBlockEvents: results.FinalizeBlock.Events,
BlockEvents: results.ProcessProposal.Events,
ValidatorSetUpdate: results.ProcessProposal.ValidatorSetUpdate,
ConsensusParamUpdates: consensusParamsPtrFromProtoPtr(results.ProcessProposal.ConsensusParamUpdates),
}, nil
Expand Down
3 changes: 1 addition & 2 deletions internal/rpc/core/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func TestBlockchainInfo(t *testing.T) {

func TestBlockResults(t *testing.T) {
results := state.ABCIResponses{
FinalizeBlock: &abci.ResponseFinalizeBlock{},
ProcessProposal: &abci.ResponseProcessProposal{
TxResults: []*abci.ExecTxResult{
{Code: 0, Data: []byte{0x01}, Log: "ok", GasUsed: 10},
Expand Down Expand Up @@ -102,7 +101,7 @@ func TestBlockResults(t *testing.T) {
Height: 100,
TxsResults: results.ProcessProposal.TxResults,
TotalGasUsed: 15,
FinalizeBlockEvents: results.FinalizeBlock.Events,
BlockEvents: results.ProcessProposal.Events,
ValidatorSetUpdate: results.ProcessProposal.ValidatorSetUpdate,
ConsensusParamUpdates: consensusParamsPtrFromProtoPtr(results.ProcessProposal.ConsensusParamUpdates),
}},
Expand Down
17 changes: 7 additions & 10 deletions internal/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ func NewFullEventSet(
block *types.Block,
blockID types.BlockID,
uncommittedState CurrentRoundState,
fbResp *abci.ResponseFinalizeBlock,
validatorsSet *types.ValidatorSet,
) EventSet {
rpp := uncommittedState.Params.ToProcessProposal()
responseProcessProposal := uncommittedState.Params.ToProcessProposal()
es := EventSet{}
es.
WithNewBlock(block, blockID, *fbResp).
WthNewBlockHeader(block, *rpp, *fbResp).
WithNewBlock(block, blockID, *responseProcessProposal).
WthNewBlockHeader(block, *responseProcessProposal).
WithNewEvidences(block).
WithTxs(block, uncommittedState.TxResults).
WithValidatorSetUpdate(validatorsSet)
Expand All @@ -41,12 +40,12 @@ func NewFullEventSet(
func (e *EventSet) WithNewBlock(
block *types.Block,
blockID types.BlockID,
fbResp abci.ResponseFinalizeBlock,
responseProcessProposal abci.ResponseProcessProposal,
) *EventSet {
e.NewBlock = &types.EventDataNewBlock{
Block: block,
BlockID: blockID,
ResultFinalizeBlock: fbResp,
Block: block,
BlockID: blockID,
ResultProcessProposal: responseProcessProposal,
}
return e
}
Expand All @@ -55,13 +54,11 @@ func (e *EventSet) WithNewBlock(
func (e *EventSet) WthNewBlockHeader(
block *types.Block,
ppResp abci.ResponseProcessProposal,
fbResp abci.ResponseFinalizeBlock,
) *EventSet {
e.NewBlockHeader = &types.EventDataNewBlockHeader{
Header: block.Header,
NumTxs: int64(len(block.Txs)),
ResultProcessProposal: ppResp,
ResultFinalizeBlock: fbResp,
}
return e
}
Expand Down
Loading

0 comments on commit 6247e04

Please sign in to comment.