Skip to content

Commit

Permalink
Merge pull request #549 from celestiaorg/evan/celestia-0.34.4-fork
Browse files Browse the repository at this point in the history
Celestia but based on tendermint v0.34.12
  • Loading branch information
evan-forbes authored Sep 28, 2021
2 parents bb978f5 + 1c9aed3 commit 8fd91e6
Show file tree
Hide file tree
Showing 62 changed files with 7,570 additions and 3,311 deletions.
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ install_abci:
@go install -mod=readonly ./abci/cmd/...
.PHONY: install_abci

###############################################################################
### Mocks ###
###############################################################################

mockery:
go generate -run="./scripts/mockery_generate.sh" ./...
.PHONY: mockery

###############################################################################
### Distribution ###
###############################################################################
Expand Down
3 changes: 3 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
echoRetryIntervalSeconds = 1
)

//go:generate ../../scripts/mockery_generate.sh Client

// Client defines an interface for an ABCI client.
// All `Async` methods return a `ReqRes` object.
// All `Sync` methods return the appropriate protobuf ResponseXxx struct and an error.
Expand Down Expand Up @@ -56,6 +58,7 @@ type Client interface {
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
PreprocessTxsSync(types.RequestPreprocessTxs) (*types.ResponsePreprocessTxs, error)
}

//----------------------------------------
Expand Down
16 changes: 16 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,15 @@ func (cli *grpcClient) ApplySnapshotChunkAsync(params types.RequestApplySnapshot
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ApplySnapshotChunk{ApplySnapshotChunk: res}})
}

func (cli *grpcClient) PreprocessTxsAsync(params types.RequestPreprocessTxs) *ReqRes {
req := types.ToRequestPreprocessTxs(params)
res, err := cli.client.PreprocessTxs(context.Background(), req.GetPreprocessTxs(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_PreprocessTxs{PreprocessTxs: res}})
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
Expand Down Expand Up @@ -417,3 +426,10 @@ func (cli *grpcClient) ApplySnapshotChunkSync(
reqres := cli.ApplySnapshotChunkAsync(params)
return cli.finishSyncCall(reqres).GetApplySnapshotChunk(), cli.Error()
}

func (cli *grpcClient) PreprocessTxsSync(
params types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.PreprocessTxsAsync(params)
return reqres.Response.GetPreprocessTxs(), cli.Error()
}
21 changes: 21 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,17 @@ func (app *localClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotCh
)
}

func (app *localClient) PreprocessTxsAsync(req types.RequestPreprocessTxs) (*ReqRes, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
return app.callback(
types.ToRequestPreprocessTxs(req),
types.ToResponsePreprocessTx(res),
), nil
}

//-------------------------------------------------------

func (app *localClient) FlushSync() error {
Expand Down Expand Up @@ -317,6 +328,16 @@ func (app *localClient) ApplySnapshotChunkSync(
return &res, nil
}

func (app *localClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.PreprocessTxs(req)
return &res, nil
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
Expand Down
25 changes: 24 additions & 1 deletion abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/tendermint/tendermint/libs/service"
tmsync "github.com/tendermint/tendermint/libs/sync"
"github.com/tendermint/tendermint/libs/timer"
"golang.org/x/net/context"
)

const (
Expand Down Expand Up @@ -279,6 +280,10 @@ func (cli *socketClient) ApplySnapshotChunkAsync(req types.RequestApplySnapshotC
return cli.queueRequest(types.ToRequestApplySnapshotChunk(req))
}

func (cli *socketClient) PreprocessTxsAsync(ctx context.Context, req types.RequestPreprocessTxs) *ReqRes {
return cli.queueRequest(types.ToRequestPreprocessTxs(req))
}

//----------------------------------------

func (cli *socketClient) FlushSync() error {
Expand Down Expand Up @@ -417,6 +422,16 @@ func (cli *socketClient) ApplySnapshotChunkSync(
return reqres.Response.GetApplySnapshotChunk(), cli.Error()
}

func (cli *socketClient) PreprocessTxsSync(
req types.RequestPreprocessTxs,
) (*types.ResponsePreprocessTxs, error) {
reqres := cli.queueRequest(types.ToRequestPreprocessTxs(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}
return reqres.Response.GetPreprocessTxs(), nil
}

//----------------------------------------

func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
Expand Down Expand Up @@ -492,6 +507,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_ListSnapshots)
case *types.Request_OfferSnapshot:
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PreprocessTxs:
_, ok = res.Value.(*types.Response_PreprocessTxs)
}
return ok
}
Expand Down
5 changes: 5 additions & 0 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,8 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo

return resQuery
}

func (app *Application) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
}
5 changes: 5 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ func (app *PersistentKVStoreApplication) ApplySnapshotChunk(
return types.ResponseApplySnapshotChunk{Result: types.ResponseApplySnapshotChunk_ABORT}
}

func (app *PersistentKVStoreApplication) PreprocessTxs(
req types.RequestPreprocessTxs) types.ResponsePreprocessTxs {
return types.ResponsePreprocessTxs{Txs: req.Txs}
}

//---------------------------------------------
// update validators

Expand Down
3 changes: 3 additions & 0 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_ApplySnapshotChunk:
res := s.app.ApplySnapshotChunk(*r.ApplySnapshotChunk)
responses <- types.ToResponseApplySnapshotChunk(res)
case *types.Request_PreprocessTxs:
res := s.app.PreprocessTxs(*r.PreprocessTxs)
responses <- types.ToResponsePreprocessTx(res)
default:
responses <- types.ToResponseException("Unknown request")
}
Expand Down
22 changes: 17 additions & 5 deletions abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
// except CheckTx/DeliverTx, which take `tx []byte`, and `Commit`, which takes nothing.
// nolint:lll // ignore for interface
type Application interface {
// Info/Query Connection
Info(RequestInfo) ResponseInfo // Return application info
Expand All @@ -18,11 +19,12 @@ type Application interface {
CheckTx(RequestCheckTx) ResponseCheckTx // Validate a tx for the mempool

// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Commit() ResponseCommit // Commit the state and return the application Merkle root hash
PreprocessTxs(RequestPreprocessTxs) ResponsePreprocessTxs // State machine preprocessing of txs

// State Sync Connection
ListSnapshots(RequestListSnapshots) ResponseListSnapshots // List available snapshots
Expand Down Expand Up @@ -95,6 +97,10 @@ func (BaseApplication) ApplySnapshotChunk(req RequestApplySnapshotChunk) Respons
return ResponseApplySnapshotChunk{}
}

func (BaseApplication) PreprocessTxs(req RequestPreprocessTxs) ResponsePreprocessTxs {
return ResponsePreprocessTxs{}
}

//-------------------------------------------------------

// GRPCApplication is a GRPC wrapper for Application
Expand Down Expand Up @@ -182,3 +188,9 @@ func (app *GRPCApplication) ApplySnapshotChunk(
res := app.app.ApplySnapshotChunk(*req)
return &res, nil
}

func (app *GRPCApplication) PreprocessTxs(
ctx context.Context, req *RequestPreprocessTxs) (*ResponsePreprocessTxs, error) {
res := app.app.PreprocessTxs(*req)
return &res, nil
}
12 changes: 12 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func ToRequestApplySnapshotChunk(req RequestApplySnapshotChunk) *Request {
}
}

func ToRequestPreprocessTxs(res RequestPreprocessTxs) *Request {
return &Request{
Value: &Request_PreprocessTxs{&res},
}
}

//----------------------------------------

func ToResponseException(errStr string) *Response {
Expand Down Expand Up @@ -256,3 +262,9 @@ func ToResponseApplySnapshotChunk(res ResponseApplySnapshotChunk) *Response {
Value: &Response_ApplySnapshotChunk{&res},
}
}

func ToResponsePreprocessTx(res ResponsePreprocessTxs) *Response {
return &Response{
Value: &Response_PreprocessTxs{&res},
}
}
Loading

0 comments on commit 8fd91e6

Please sign in to comment.