Skip to content

Commit

Permalink
feat: parallel checktx (#3)
Browse files Browse the repository at this point in the history
* feat: concurrent checktx

* feat: concurrent rechecktx

* feat: checkTxAsyncReactor

* fix: Use Begin/EndRecheckTxSync for app_conn

* chore: revise abci.Client, Async() interfaces

* test: Fix TestCacheAfterUpdate

* test: Fix TestClientServer

* test: Fix tests related with Application

* test: Fix TestHangingAsyncCalls

* test: Fix clist_mempool_test.go

* fix: Add handler for BeginRecheckTx/EndRecheckTx

* test: Fix TestExtendVoteCalledWhenEnabled

* test: Fix state_test.go

* test: Fix TestCallbackInvokedWhenSetEarly

* chore: Fix lint

* chore: Fix lint

* refactor: Use *sync.WaitGroup directly

* chore: Use CheckTxSync from mempool.reactor.Receive()

* chore: Remove original rechecktx logics already commented out

* chore: Sync GetGlobalCallback call with other client types

* refactor: Sync with original cometbft about callback

* fix: Use own mutex for ResponseCallback

* chore: Comment Begin/EndRecheckTx lock

* chore: Revert ostracon#226

* test: Un-comment socket_client_test.gochore: Sync some codes with original cometbft

* chore: Sync some codes with cometbft original

* refactor: Sync with original cometbft

* test: Fix kvstore test

* chore: Remove useless codes

* refactor: Use ResponseCheckTx pointer directly

* refactor: Use original cometbft codes for rpc/BroadcastTxSync

* refactor: Use original cometbft codes for rpc/BroadcastTxCommit

* refactor: Sync with original cometbft for mempool/CheckTxSync

* test: Fix test

* refactor: Sync with original cometbft

* refactor: Simplify CheckTxAsync

* refactor: Sync with original cometbft for reqResCb

* chore: Fix comments
  • Loading branch information
dudong2 authored Jan 14, 2025
1 parent fdf90d1 commit 2209f5c
Show file tree
Hide file tree
Showing 41 changed files with 2,417 additions and 555 deletions.
14 changes: 14 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ type Client interface {
// for the v0 mempool. We should explore refactoring the
// mempool to remove this vestige behavior.
SetResponseCallback(Callback)

CheckTxSync(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error)
BeginRecheckTxSync(context.Context, *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) // Signals the beginning of rechecking
EndRecheckTxSync(context.Context, *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) // Signals the end of rechecking

CheckTxAsync(context.Context, *types.RequestCheckTx) (*ReqRes, error)
BeginRecheckTxAsync(context.Context, *types.RequestBeginRecheckTx) (*ReqRes, error)
EndRecheckTxAsync(context.Context, *types.RequestEndRecheckTx) (*ReqRes, error)
}

//----------------------------------------
Expand Down Expand Up @@ -114,6 +121,13 @@ func (r *ReqRes) InvokeCallback() {
r.callbackInvoked = true
}

// SetDone marks the ReqRes object as done.
func (r *ReqRes) SetInvoked() {
r.mtx.Lock()
r.callbackInvoked = true
r.mtx.Unlock()
}

// GetCallback returns the configured callback of the ReqRes object which may be
// nil. Note, it is not safe to concurrently call this in cases where it is
// marked done and SetCallback is called before calling GetCallback as that
Expand Down
73 changes: 60 additions & 13 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (cli *grpcClient) OnStart() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()

reqres.SetInvoked()
reqres.Done()

// Notify client listener if set
Expand Down Expand Up @@ -161,15 +162,6 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) {

//----------------------------------------

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
Expand All @@ -194,10 +186,6 @@ func (cli *grpcClient) Info(ctx context.Context, req *types.RequestInfo) (*types
return cli.client.Info(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
return cli.client.Query(ctx, types.ToRequestQuery(req).GetQuery(), grpc.WaitForReady(true))
}
Expand Down Expand Up @@ -245,3 +233,62 @@ func (cli *grpcClient) VerifyVoteExtension(ctx context.Context, req *types.Reque
func (cli *grpcClient) FinalizeBlock(ctx context.Context, req *types.RequestFinalizeBlock) (*types.ResponseFinalizeBlock, error) {
return cli.client.FinalizeBlock(ctx, types.ToRequestFinalizeBlock(req).GetFinalizeBlock(), grpc.WaitForReady(true))
}

func (cli *grpcClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
return cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
}

func (cli *grpcClient) BeginRecheckTxSync(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
reqres, _ := cli.BeginRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetBeginRecheckTx(), cli.Error()
}

func (cli *grpcClient) EndRecheckTxSync(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
reqres, _ := cli.EndRecheckTxAsync(ctx, params)
reqres.Wait()
return reqres.Response.GetEndRecheckTx(), cli.Error()
}

func (cli *grpcClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
res, err := cli.client.CheckTx(ctx, req, grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
return nil, err
}
return cli.finishAsyncCall(types.ToRequestCheckTx(req), &types.Response{Value: &types.Response_CheckTx{CheckTx: res}}), nil
}

func (cli *grpcClient) BeginRecheckTxAsync(ctx context.Context, params *types.RequestBeginRecheckTx) (*ReqRes, error) {
req := types.ToRequestBeginRecheckTx(params)
res, err := cli.client.BeginRecheckTx(ctx, req.GetBeginRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_BeginRecheckTx{BeginRecheckTx: res}}), nil
}

func (cli *grpcClient) EndRecheckTxAsync(ctx context.Context, params *types.RequestEndRecheckTx) (*ReqRes, error) {
req := types.ToRequestEndRecheckTx(params)
res, err := cli.client.EndRecheckTx(ctx, req.GetEndRecheckTx(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_EndRecheckTx{EndRecheckTx: res}}), nil
}

func (cli *grpcClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (cli *grpcClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (cli *grpcClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
101 changes: 80 additions & 21 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@ func (app *localClient) SetResponseCallback(cb Callback) {
app.mtx.Unlock()
}

func (app *localClient) CheckTxAsync(ctx context.Context, req *types.RequestCheckTx) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res, err := app.Application.CheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestCheckTx(req),
types.ToResponseCheckTx(res),
), nil
}

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
app.Callback(req, res)
rr := newLocalReqRes(req, res)
Expand Down Expand Up @@ -92,13 +78,6 @@ func (app *localClient) Info(ctx context.Context, req *types.RequestInfo) (*type
return app.Application.Info(ctx, req)
}

func (app *localClient) CheckTx(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

return app.Application.CheckTx(ctx, req)
}

func (app *localClient) Query(ctx context.Context, req *types.RequestQuery) (*types.ResponseQuery, error) {
app.mtx.Lock()
defer app.mtx.Unlock()
Expand Down Expand Up @@ -184,3 +163,83 @@ func (app *localClient) FinalizeBlock(ctx context.Context, req *types.RequestFin

return app.Application.FinalizeBlock(ctx, req)
}

func (app *localClient) CheckTxSync(ctx context.Context, req *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
// CONTRACT: Application should handle concurrent `CheckTx`
// In this abci client layer, we don't protect `CheckTx` with a mutex for concurrency
// app.mtx.Lock()
// defer app.mtx.Unlock()
return app.Application.CheckTxSyncForApp(ctx, req)
}

func (app *localClient) BeginRecheckTxSync(ctx context.Context, req *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.BeginRecheckTx(ctx, req)
}

func (app *localClient) EndRecheckTxSync(ctx context.Context, req *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
// NOTE: commented out for performance. delete all after commenting out all `app.mtx`
// app.mtx.Lock()
// defer app.mtx.Unlock()

return app.Application.EndRecheckTx(ctx, req)
}

func (app *localClient) CheckTxAsync(ctx context.Context, reqCheckTx *types.RequestCheckTx) (*ReqRes, error) {
req := types.ToRequestCheckTx(reqCheckTx)
reqRes := NewReqRes(req)

app.Application.CheckTxAsyncForApp(ctx, reqCheckTx, func(resCheckTx *types.ResponseCheckTx) {
res := types.ToResponseCheckTx(resCheckTx)
app.Callback(req, res)
reqRes.Response = res
reqRes.SetInvoked()
reqRes.Done()

// Notify reqRes listener if set
reqRes.InvokeCallback()
})

return reqRes, nil
}

func (app *localClient) BeginRecheckTxAsync(ctx context.Context, req *types.RequestBeginRecheckTx) (*ReqRes, error) {
res, err := app.Application.BeginRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestBeginRecheckTx(req),
types.ToResponseBeginRecheckTx(res),
), nil
}

func (app *localClient) EndRecheckTxAsync(ctx context.Context, req *types.RequestEndRecheckTx) (*ReqRes, error) {
res, err := app.Application.EndRecheckTx(ctx, req)
if err != nil {
return nil, err
}
return app.callback(
types.ToRequestEndRecheckTx(req),
types.ToResponseEndRecheckTx(res),
), nil
}

func (app *localClient) CheckTxSyncForApp(context.Context, *types.RequestCheckTx) (*types.ResponseCheckTx, error) {
panic("not implemented")
}

func (app *localClient) CheckTxAsyncForApp(context.Context, *types.RequestCheckTx, types.CheckTxCallback) {
panic("not implemented")
}

func (app *localClient) BeginRecheckTx(ctx context.Context, params *types.RequestBeginRecheckTx) (*types.ResponseBeginRecheckTx, error) {
panic("not implemented")
}

func (app *localClient) EndRecheckTx(ctx context.Context, params *types.RequestEndRecheckTx) (*types.ResponseEndRecheckTx, error) {
panic("not implemented")
}
Loading

0 comments on commit 2209f5c

Please sign in to comment.