Skip to content

Commit

Permalink
Add Prepare and Process Proposal ABCI++ methods (#631)
Browse files Browse the repository at this point in the history
* abci: PrepareProposal (#6544)

* regenerate mocks, proto, mod/sum, and clean up remaining preprocesstxs

* fix tests and revert to old go.mod

* mockery

* add processproposal proto/boilerplate/logic

* gofmt

* fix test

* move UNKNOWN response behaviour to reject

* fix test and add testing util code

* pass full block data when proposing or processing proposals

* linter

* add the process proposal method to the e2e app

* add missing kvstore abci method

* pass block data and results for bass app

* use correct kvstore process logic for kvstore app

* remove linting comment

Co-authored-by: Ismail Khoffi <[email protected]>

* formatting

Co-authored-by: John Adler <[email protected]>

* use go generate instead of make mockery

* add link

Co-authored-by: Marko <[email protected]>
Co-authored-by: mconcat <[email protected]>
Co-authored-by: Ismail Khoffi <[email protected]>
Co-authored-by: John Adler <[email protected]>
  • Loading branch information
5 people authored Mar 30, 2022
1 parent 721562a commit 7356306
Show file tree
Hide file tree
Showing 41 changed files with 2,752 additions and 984 deletions.
3 changes: 2 additions & 1 deletion abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type Client interface {
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
PreprocessTxsSync(types.RequestPreprocessTxs) (*types.ResponsePreprocessTxs, error)
PrepareProposalSync(types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
ProcessProposalSync(types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
}

//----------------------------------------
Expand Down
61 changes: 50 additions & 11 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"
"context"

"google.golang.org/grpc"

"github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -302,13 +303,43 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}})
}

func (cli *grpcClient) PreprocessTxsAsync(params types.RequestPreprocessTxs) *ReqRes {
req := types.ToRequestPreprocessTxs(params)
res, err := cli.client.PreprocessTxs(context.Background(), req.GetPreprocessTxs(), grpc.WaitForReady(true))
func (cli *grpcClient) PrepareProposalAsync(
params types.RequestPrepareProposal,
) *ReqRes {

req := types.ToRequestPrepareProposal(params)
res, err := cli.client.PrepareProposal(context.Background(), req.GetPrepareProposal(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_PreprocessTxs{PreprocessTxs: res}})
return cli.finishAsyncCall(
req,
&types.Response{
Value: &types.Response_PrepareProposal{
PrepareProposal: res,
},
},
)
}

func (cli *grpcClient) ProcessProposalAsync(
params types.RequestProcessProposal,
) *ReqRes {

req := types.ToRequestProcessProposal(params)
res, err := cli.client.ProcessProposal(context.Background(), req.GetProcessProposal(), grpc.WaitForReady(true))
if err != nil {
return nil
}

return cli.finishAsyncCall(
req,
&types.Response{
Value: &types.Response_ProcessProposal{
ProcessProposal: res,
},
},
)
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
Expand Down Expand Up @@ -427,9 +458,17 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}

func (cli *grpcClient) PreprocessTxsSync(
params types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.PreprocessTxsAsync(params)
return reqres.Response.GetPreprocessTxs(), cli.Error()
func (cli *grpcClient) PrepareProposalSync(
params types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

reqres := cli.PrepareProposalAsync(params)
return cli.finishSyncCall(reqres).GetPrepareProposal(), cli.Error()
}

func (cli *grpcClient) ProcessProposalSync(
params types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
reqres := cli.ProcessProposalAsync(params)
return cli.finishSyncCall(reqres).GetProcessProposal(), cli.Error()
}
44 changes: 35 additions & 9 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,30 @@ func (app *localClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotCh
)
}

func (app *localClient) PreprocessTxsAsync(req types.RequestPreprocessTxs) (*ReqRes, error) {
func (app *localClient) PrepareProposalAsync(
req types.RequestPrepareProposal,
) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
res := app.Application.PrepareProposal(req)
return app.callback(
types.ToRequestPreprocessTxs(req),
types.ToResponsePreprocessTx(res),
), nil
types.ToRequestPrepareProposal(req),
types.ToResponsePrepareProposal(res),
)
}

func (app *localClient) ProcessProposalAsync(
req types.RequestProcessProposal,
) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.ProcessProposal(req)
return app.callback(
types.ToRequestProcessProposal(req),
types.ToResponseProcessProposal(res),
)
}

//-------------------------------------------------------
Expand Down Expand Up @@ -334,13 +349,24 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}

func (app *localClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
func (app *localClient) PrepareProposalSync(
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PrepareProposal(req)
return &res, nil
}

func (app *localClient) ProcessProposalSync(
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
res := app.Application.ProcessProposal(req)
return &res, nil
}

Expand Down
35 changes: 29 additions & 6 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 26 additions & 14 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/libs/timer"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -280,11 +279,17 @@ func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotC
return cli.queueRequest(types.ToRequestApplySnapshotChunk(req))
}

func (cli *socketClient) PreprocessTxsAsync(ctx context.Context, req types.RequestPreprocessTxs) *ReqRes {
return cli.queueRequest(types.ToRequestPreprocessTxs(req))
func (cli *socketClient) PrepareProposalAsync(
req types.RequestPrepareProposal,
) *ReqRes {
return cli.queueRequest(types.ToRequestPrepareProposal(req))
}

//----------------------------------------
func (cli *socketClient) ProcessProposalAsync(
req types.RequestProcessProposal,
) *ReqRes {
return cli.queueRequest(types.ToRequestProcessProposal(req))
}

func (cli *socketClient) FlushSync() error {
reqRes := cli.queueRequest(types.ToRequestFlush())
Expand Down Expand Up @@ -422,14 +427,21 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), cli.Error()
}

func (cli *socketClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.queueRequest(types.ToRequestPreprocessTxs(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}
return reqres.Response.GetPreprocessTxs(), nil
func (cli *socketClient) PrepareProposalSync(
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

reqres := cli.queueRequest(types.ToRequestPrepareProposal(req))
return reqres.Response.GetPrepareProposal(), nil
}

func (cli *socketClient) ProcessProposalSync(
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {

reqres := cli.queueRequest(types.ToRequestProcessProposal(req))

return reqres.Response.GetProcessProposal(), nil
}

//----------------------------------------
Expand Down Expand Up @@ -507,8 +519,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PreprocessTxs:
_, ok = res.Value.(*types.Response_PreprocessTxs)
case *types.Request_PrepareProposal:
_, ok = res.Value.(*types.Response_PrepareProposal)
}
return ok
}
Expand Down
16 changes: 13 additions & 3 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo
return resQuery
}

func (app *Application) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
func (app *Application) PrepareProposal(req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{
BlockData: req.BlockData,
}
}

func (app *Application) ProcessProposal(req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.BlockData.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT}
}
16 changes: 13 additions & 3 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,19 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

func (app *PersistentKVStoreApplication) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
func (app *PersistentKVStoreApplication) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{BlockData: req.BlockData}
}

func (app *PersistentKVStoreApplication) ProcessProposal(
req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.BlockData.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT}
}

//---------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
Expand Down
Loading

0 comments on commit 7356306

Please sign in to comment.