Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Prepare and Process Proposal ABCI++ methods #631

Merged
merged 19 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ type Client interface {
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
PreprocessTxsSync(types.RequestPreprocessTxs) (*types.ResponsePreprocessTxs, error)
PrepareProposalSync(types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error)
ProcessProposalSync(types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
}

//----------------------------------------
Expand Down
61 changes: 50 additions & 11 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"sync"
"time"

"golang.org/x/net/context"
"context"

"google.golang.org/grpc"

"github.com/tendermint/tendermint/abci/types"
Expand Down Expand Up @@ -302,13 +303,43 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}})
}

func (cli *grpcClient) PreprocessTxsAsync(params types.RequestPreprocessTxs) *ReqRes {
req := types.ToRequestPreprocessTxs(params)
res, err := cli.client.PreprocessTxs(context.Background(), req.GetPreprocessTxs(), grpc.WaitForReady(true))
func (cli *grpcClient) PrepareProposalAsync(
params types.RequestPrepareProposal,
) *ReqRes {

req := types.ToRequestPrepareProposal(params)
res, err := cli.client.PrepareProposal(context.Background(), req.GetPrepareProposal(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_PreprocessTxs{PreprocessTxs: res}})
return cli.finishAsyncCall(
req,
&types.Response{
Value: &types.Response_PrepareProposal{
PrepareProposal: res,
},
},
)
}

func (cli *grpcClient) ProcessProposalAsync(
params types.RequestProcessProposal,
) *ReqRes {

req := types.ToRequestProcessProposal(params)
res, err := cli.client.ProcessProposal(context.Background(), req.GetProcessProposal(), grpc.WaitForReady(true))
if err != nil {
return nil
}

return cli.finishAsyncCall(
req,
&types.Response{
Value: &types.Response_ProcessProposal{
ProcessProposal: res,
},
},
)
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
Expand Down Expand Up @@ -427,9 +458,17 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}

func (cli *grpcClient) PreprocessTxsSync(
params types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.PreprocessTxsAsync(params)
return reqres.Response.GetPreprocessTxs(), cli.Error()
func (cli *grpcClient) PrepareProposalSync(
params types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

reqres := cli.PrepareProposalAsync(params)
return cli.finishSyncCall(reqres).GetPrepareProposal(), cli.Error()
}

func (cli *grpcClient) ProcessProposalSync(
params types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
reqres := cli.ProcessProposalAsync(params)
return cli.finishSyncCall(reqres).GetProcessProposal(), cli.Error()
}
44 changes: 35 additions & 9 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,30 @@ func (app *localClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotCh
)
}

func (app *localClient) PreprocessTxsAsync(req types.RequestPreprocessTxs) (*ReqRes, error) {
func (app *localClient) PrepareProposalAsync(
req types.RequestPrepareProposal,
) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
res := app.Application.PrepareProposal(req)
return app.callback(
types.ToRequestPreprocessTxs(req),
types.ToResponsePreprocessTx(res),
), nil
types.ToRequestPrepareProposal(req),
types.ToResponsePrepareProposal(res),
)
}

func (app *localClient) ProcessProposalAsync(
req types.RequestProcessProposal,
) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.ProcessProposal(req)
return app.callback(
types.ToRequestProcessProposal(req),
types.ToResponseProcessProposal(res),
)
}

//-------------------------------------------------------
Expand Down Expand Up @@ -334,13 +349,24 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}

func (app *localClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
func (app *localClient) PrepareProposalSync(
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PrepareProposal(req)
return &res, nil
}

func (app *localClient) ProcessProposalSync(
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
res := app.Application.ProcessProposal(req)
return &res, nil
}

Expand Down
35 changes: 29 additions & 6 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 26 additions & 14 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/libs/timer"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -280,11 +279,17 @@ func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotC
return cli.queueRequest(types.ToRequestApplySnapshotChunk(req))
}

func (cli *socketClient) PreprocessTxsAsync(ctx context.Context, req types.RequestPreprocessTxs) *ReqRes {
return cli.queueRequest(types.ToRequestPreprocessTxs(req))
func (cli *socketClient) PrepareProposalAsync(
req types.RequestPrepareProposal,
) *ReqRes {
return cli.queueRequest(types.ToRequestPrepareProposal(req))
}

//----------------------------------------
func (cli *socketClient) ProcessProposalAsync(
req types.RequestProcessProposal,
) *ReqRes {
return cli.queueRequest(types.ToRequestProcessProposal(req))
}

func (cli *socketClient) FlushSync() error {
reqRes := cli.queueRequest(types.ToRequestFlush())
Expand Down Expand Up @@ -422,14 +427,21 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), cli.Error()
}

func (cli *socketClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.queueRequest(types.ToRequestPreprocessTxs(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}
return reqres.Response.GetPreprocessTxs(), nil
func (cli *socketClient) PrepareProposalSync(
req types.RequestPrepareProposal,
) (*types.ResponsePrepareProposal, error) {

reqres := cli.queueRequest(types.ToRequestPrepareProposal(req))
return reqres.Response.GetPrepareProposal(), nil
}

func (cli *socketClient) ProcessProposalSync(
req types.RequestProcessProposal,
) (*types.ResponseProcessProposal, error) {

reqres := cli.queueRequest(types.ToRequestProcessProposal(req))

return reqres.Response.GetProcessProposal(), nil
}

//----------------------------------------
Expand Down Expand Up @@ -507,8 +519,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PreprocessTxs:
_, ok = res.Value.(*types.Response_PreprocessTxs)
case *types.Request_PrepareProposal:
_, ok = res.Value.(*types.Response_PrepareProposal)
}
return ok
}
Expand Down
16 changes: 13 additions & 3 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,17 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo
return resQuery
}

func (app *Application) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
func (app *Application) PrepareProposal(req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{
BlockData: req.BlockData,
}
}

func (app *Application) ProcessProposal(req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.BlockData.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT}
}
16 changes: 13 additions & 3 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,19 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

func (app *PersistentKVStoreApplication) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
func (app *PersistentKVStoreApplication) PrepareProposal(
req types.RequestPrepareProposal) types.ResponsePrepareProposal {
return types.ResponsePrepareProposal{BlockData: req.BlockData}
}

func (app *PersistentKVStoreApplication) ProcessProposal(
req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.BlockData.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Result: types.ResponseProcessProposal_ACCEPT}
}

//---------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,15 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_OfferSnapshot:
res := s.app.OfferSnapshot(*r.OfferSnapshot)
responses <- types.ToResponseOfferSnapshot(res)
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
Expand Down
Loading