Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: patch data race #869

Merged
merged 8 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion proto/tendermint/types/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions rpc/core/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func ABCIQuery(
height int64,
prove bool,
) (*ctypes.ResultABCIQuery, error) {
resQuery, err := env.ProxyAppQuery.QuerySync(abci.RequestQuery{
resQuery, err := GetEnvironment().ProxyAppQuery.QuerySync(abci.RequestQuery{
Path: path,
Data: data,
Height: height,
Expand All @@ -33,7 +33,7 @@ func ABCIQuery(
// ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo)
resInfo, err := GetEnvironment().ProxyAppQuery.InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.
// maximum 20 block metas
const limit int64 = 20
var err error
env := GetEnvironment()
minHeight, maxHeight, err = filterMinMax(
env.BlockStore.Base(),
env.BlockStore.Height(),
Expand Down Expand Up @@ -82,13 +83,13 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) {
// If no height is provided, it will fetch the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/block
func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}

block := env.BlockStore.LoadBlock(height)
blockMeta := env.BlockStore.LoadBlockMeta(height)
block := GetEnvironment().BlockStore.LoadBlock(height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height)
if blockMeta == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil
}
Expand All @@ -98,6 +99,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error)
// BlockByHash gets block by hash.
// More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash
func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) {
env := GetEnvironment()
block := env.BlockStore.LoadBlockByHash(hash)
if block == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil
Expand All @@ -111,6 +113,7 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error
// If no height is provided, it will fetch the commit for the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/commit
func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
env := GetEnvironment()
height, err := getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -161,6 +164,7 @@ func generateHeightsList(beginBlock uint64, endBlock uint64) []int64 {
// validateDataCommitmentRange runs basic checks on the asc sorted list of heights
// that will be used subsequently in generating data commitments over the defined set of heights.
func validateDataCommitmentRange(beginBlock uint64, endBlock uint64) error {
env := GetEnvironment()
heightsRange := endBlock - beginBlock + 1
if heightsRange > uint64(consts.DataCommitmentBlocksLimit) {
return fmt.Errorf("the query exceeds the limit of allowed blocks %d", consts.DataCommitmentBlocksLimit)
Expand Down Expand Up @@ -209,6 +213,7 @@ func hashDataRoots(blocks []*ctypes.ResultBlock) []byte {
// getBlock(h).Txs[5]
// More: https://docs.tendermint.com/master/rpc/#/Info/block_results
func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
env := GetEnvironment()
height, err := getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -268,6 +273,7 @@ func BlockSearch(

// heightsByQuery returns a list of heights corresponding to the provided query.
func heightsByQuery(ctx *rpctypes.Context, query string) ([]int64, error) {
env := GetEnvironment()
// skip if block indexing is disabled
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
return nil, errors.New("block indexing is disabled")
Expand Down Expand Up @@ -300,6 +306,7 @@ func sortBlocks(results []int64, orderBy string) error {

// fetchBlocks takes a list of block heights and fetches them.
func fetchBlocks(results []int64, pageSize int, skipCount int) []*ctypes.ResultBlock {
env := GetEnvironment()
apiResults := make([]*ctypes.ResultBlock, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
block := env.BlockStore.LoadBlock(results[i])
Expand Down
6 changes: 4 additions & 2 deletions rpc/core/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ func TestBlockResults(t *testing.T) {
BeginBlock: &abci.ResponseBeginBlock{},
}

env = &Environment{}
env := &Environment{}
env.StateStore = sm.NewStore(dbm.NewMemDB())
err := env.StateStore.SaveABCIResponses(100, results)
require.NoError(t, err)
env.BlockStore = mockBlockStore{height: 100}
SetEnvironment(env)

testCases := []struct {
height int64
Expand Down Expand Up @@ -121,7 +122,7 @@ func TestBlockResults(t *testing.T) {
}

func TestDataCommitmentResults(t *testing.T) {
env = &Environment{}
env := &Environment{}
height := int64(2826)

blocks := randomBlocks(height)
Expand Down Expand Up @@ -149,6 +150,7 @@ func TestDataCommitmentResults(t *testing.T) {
beginQueryBlock: tc.beginQuery,
endQueryBlock: tc.endQuery,
}
SetEnvironment(env)

actualCommitment, err := DataCommitment(&rpctypes.Context{}, uint64(tc.beginQuery), uint64(tc.endQuery))
if tc.expectPass {
Expand Down
10 changes: 5 additions & 5 deletions rpc/core/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
return nil, err
}

validators, err := env.StateStore.LoadValidators(height)
validators, err := GetEnvironment().StateStore.LoadValidators(height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -50,7 +50,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
// More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state
func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := env.P2PPeers.Peers().List()
peers := GetEnvironment().P2PPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
for i, peer := range peers {
peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState)
Expand All @@ -69,7 +69,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
}
}
// Get self round state.
roundState, err := env.ConsensusState.GetRoundStateJSON()
roundState, err := GetEnvironment().ConsensusState.GetRoundStateJSON()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state
func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// Get self round state.
bz, err := env.ConsensusState.GetRoundStateSimpleJSON()
bz, err := GetEnvironment().ConsensusState.GetRoundStateSimpleJSON()
return &ctypes.ResultConsensusState{RoundState: bz}, err
}

Expand All @@ -98,7 +98,7 @@ func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCon
return nil, err
}

consensusParams, err := env.StateStore.LoadConsensusParams(height)
consensusParams, err := GetEnvironment().StateStore.LoadConsensusParams(height)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (

// UnsafeFlushMempool removes all transactions from the mempool.
func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
env.Mempool.Flush()
GetEnvironment().Mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}
34 changes: 24 additions & 10 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"encoding/base64"
"fmt"
"sync"
"time"

cfg "github.com/tendermint/tendermint/config"
Expand Down Expand Up @@ -35,13 +36,23 @@ const (

var (
// set by Node
env *Environment
mut = &sync.Mutex{}
rootulp marked this conversation as resolved.
Show resolved Hide resolved
globalEnv *Environment
)

// SetEnvironment sets up the given Environment.
// It will race if multiple Node call SetEnvironment.
// SetEnvironment sets the global environment to e. The globalEnv var that this
// function modifies is protected by a sync.Once so multiple calls within the
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
// same process will not be effective.
func SetEnvironment(e *Environment) {
env = e
mut.Lock()
rach-id marked this conversation as resolved.
Show resolved Hide resolved
defer mut.Unlock()
globalEnv = e
}

func GetEnvironment() *Environment {
mut.Lock()
defer mut.Unlock()
return globalEnv
}

//----------------------------------------------
Expand Down Expand Up @@ -69,7 +80,7 @@ type peers interface {
Peers() p2p.IPeerSet
}

//----------------------------------------------
// ----------------------------------------------
// Environment contains objects and interfaces used by the RPC. It is expected
// to be setup once during startup.
type Environment struct {
Expand Down Expand Up @@ -142,15 +153,15 @@ func validatePerPage(perPagePtr *int) int {
// InitGenesisChunks configures the environment and should be called on service
// startup.
func InitGenesisChunks() error {
if env.genChunks != nil {
if GetEnvironment().genChunks != nil {
return nil
}

if env.GenDoc == nil {
if GetEnvironment().GenDoc == nil {
return nil
}

data, err := tmjson.Marshal(env.GenDoc)
data, err := tmjson.Marshal(GetEnvironment().GenDoc)
if err != nil {
return err
}
Expand All @@ -162,7 +173,9 @@ func InitGenesisChunks() error {
end = len(data)
}

env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
mut.Lock()
defer mut.Unlock()
globalEnv.genChunks = append(globalEnv.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
Comment on lines -165 to +178
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no

}

return nil
Expand All @@ -188,7 +201,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d",
height, latestHeight)
}
base := env.BlockStore.Base()
base := GetEnvironment().BlockStore.Base()
if height < base {
return 0, fmt.Errorf("height %d is not available, lowest height is %d",
height, base)
Expand All @@ -199,6 +212,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}

func latestUncommittedHeight() int64 {
env := GetEnvironment()
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
Expand Down
3 changes: 3 additions & 0 deletions rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
Expand Down Expand Up @@ -108,6 +109,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()
env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
Expand All @@ -124,6 +126,7 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()
env.Logger.Info("Unsubscribe from all", "remote", addr)
err := env.EventBus.UnsubscribeAll(context.Background(), addr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/evidence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.Result
return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err)
}

if err := env.EvidencePool.AddEvidence(ev); err != nil {
if err := GetEnvironment().EvidencePool.AddEvidence(ev); err != nil {
return nil, fmt.Errorf("failed to add evidence: %w", err)
}
return &ctypes.ResultBroadcastEvidence{Hash: ev.Hash()}, nil
Expand Down
9 changes: 6 additions & 3 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{})
err := GetEnvironment().Mempool.CheckTx(tx, nil, mempl.TxInfo{})

if err != nil {
return nil, err
Expand All @@ -33,7 +33,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(tx, func(res *abci.Response) {
err := GetEnvironment().Mempool.CheckTx(tx, func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
Expand Down Expand Up @@ -63,6 +63,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()
env := GetEnvironment()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
Expand Down Expand Up @@ -153,6 +154,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := validatePerPage(limitPtr)
env := GetEnvironment()

txs := env.Mempool.ReapMaxTxs(limit)
return &ctypes.ResultUnconfirmedTxs{
Expand All @@ -165,6 +167,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
env := GetEnvironment()
return &ctypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
Expand All @@ -175,7 +178,7 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err
// be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
res, err := GetEnvironment().ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}
Expand Down
Loading