From f0b9b8dfb24e2ca245d3da2f1bda460ddde1bb6c Mon Sep 17 00:00:00 2001 From: Evan Forbes <42654277+evan-forbes@users.noreply.github.com> Date: Wed, 16 Nov 2022 18:01:59 -0600 Subject: [PATCH] fix: patch data race (#869) * fix: protect global var with a sync.Once * fix: use a mutex instead of sync.Once to allow for arbitrary swapping of the env * fix: implement and use GetEnvironment * fix: lint by disabling lll * fix: invoke GetEnvironment() once per function scope * remove //nolint:lll * make proto-gen Co-authored-by: Rootul Patel --- rpc/core/abci.go | 4 ++-- rpc/core/blocks.go | 13 ++++++++++--- rpc/core/blocks_test.go | 6 ++++-- rpc/core/consensus.go | 10 +++++----- rpc/core/dev.go | 2 +- rpc/core/env.go | 34 ++++++++++++++++++++++++---------- rpc/core/events.go | 3 +++ rpc/core/evidence.go | 2 +- rpc/core/mempool.go | 9 ++++++--- rpc/core/net.go | 5 +++++ rpc/core/net_test.go | 2 ++ rpc/core/status.go | 2 ++ rpc/core/tx.go | 3 +++ 13 files changed, 68 insertions(+), 27 deletions(-) diff --git a/rpc/core/abci.go b/rpc/core/abci.go index 2cf00f93a8..34aad0f3de 100644 --- a/rpc/core/abci.go +++ b/rpc/core/abci.go @@ -17,7 +17,7 @@ func ABCIQuery( height int64, prove bool, ) (*ctypes.ResultABCIQuery, error) { - resQuery, err := env.ProxyAppQuery.QuerySync(abci.RequestQuery{ + resQuery, err := GetEnvironment().ProxyAppQuery.QuerySync(abci.RequestQuery{ Path: path, Data: data, Height: height, @@ -33,7 +33,7 @@ func ABCIQuery( // ABCIInfo gets some info about the application. // More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) { - resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo) + resInfo, err := GetEnvironment().ProxyAppQuery.InfoSync(proxy.RequestInfo) if err != nil { return nil, err } diff --git a/rpc/core/blocks.go b/rpc/core/blocks.go index 1ac0b9a162..e389d3f082 100644 --- a/rpc/core/blocks.go +++ b/rpc/core/blocks.go @@ -23,6 +23,7 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes. // maximum 20 block metas const limit int64 = 20 var err error + env := GetEnvironment() minHeight, maxHeight, err = filterMinMax( env.BlockStore.Base(), env.BlockStore.Height(), @@ -82,13 +83,13 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) { // If no height is provided, it will fetch the latest block. // More: https://docs.tendermint.com/master/rpc/#/Info/block func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) { - height, err := getHeight(env.BlockStore.Height(), heightPtr) + height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr) if err != nil { return nil, err } - block := env.BlockStore.LoadBlock(height) - blockMeta := env.BlockStore.LoadBlockMeta(height) + block := GetEnvironment().BlockStore.LoadBlock(height) + blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height) if blockMeta == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil } @@ -98,6 +99,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) // BlockByHash gets block by hash. // More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) { + env := GetEnvironment() block := env.BlockStore.LoadBlockByHash(hash) if block == nil { return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil @@ -111,6 +113,7 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error // If no height is provided, it will fetch the commit for the latest block. // More: https://docs.tendermint.com/master/rpc/#/Info/commit func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) { + env := GetEnvironment() height, err := getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err @@ -161,6 +164,7 @@ func generateHeightsList(beginBlock uint64, endBlock uint64) []int64 { // validateDataCommitmentRange runs basic checks on the asc sorted list of heights // that will be used subsequently in generating data commitments over the defined set of heights. func validateDataCommitmentRange(beginBlock uint64, endBlock uint64) error { + env := GetEnvironment() heightsRange := endBlock - beginBlock + 1 if heightsRange > uint64(consts.DataCommitmentBlocksLimit) { return fmt.Errorf("the query exceeds the limit of allowed blocks %d", consts.DataCommitmentBlocksLimit) @@ -209,6 +213,7 @@ func hashDataRoots(blocks []*ctypes.ResultBlock) []byte { // getBlock(h).Txs[5] // More: https://docs.tendermint.com/master/rpc/#/Info/block_results func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) { + env := GetEnvironment() height, err := getHeight(env.BlockStore.Height(), heightPtr) if err != nil { return nil, err @@ -268,6 +273,7 @@ func BlockSearch( // heightsByQuery returns a list of heights corresponding to the provided query. func heightsByQuery(ctx *rpctypes.Context, query string) ([]int64, error) { + env := GetEnvironment() // skip if block indexing is disabled if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok { return nil, errors.New("block indexing is disabled") @@ -300,6 +306,7 @@ func sortBlocks(results []int64, orderBy string) error { // fetchBlocks takes a list of block heights and fetches them. func fetchBlocks(results []int64, pageSize int, skipCount int) []*ctypes.ResultBlock { + env := GetEnvironment() apiResults := make([]*ctypes.ResultBlock, 0, pageSize) for i := skipCount; i < skipCount+pageSize; i++ { block := env.BlockStore.LoadBlock(results[i]) diff --git a/rpc/core/blocks_test.go b/rpc/core/blocks_test.go index a654b76300..19082f2cbc 100644 --- a/rpc/core/blocks_test.go +++ b/rpc/core/blocks_test.go @@ -85,11 +85,12 @@ func TestBlockResults(t *testing.T) { BeginBlock: &abci.ResponseBeginBlock{}, } - env = &Environment{} + env := &Environment{} env.StateStore = sm.NewStore(dbm.NewMemDB()) err := env.StateStore.SaveABCIResponses(100, results) require.NoError(t, err) env.BlockStore = mockBlockStore{height: 100} + SetEnvironment(env) testCases := []struct { height int64 @@ -121,7 +122,7 @@ func TestBlockResults(t *testing.T) { } func TestDataCommitmentResults(t *testing.T) { - env = &Environment{} + env := &Environment{} height := int64(2826) blocks := randomBlocks(height) @@ -149,6 +150,7 @@ func TestDataCommitmentResults(t *testing.T) { beginQueryBlock: tc.beginQuery, endQueryBlock: tc.endQuery, } + SetEnvironment(env) actualCommitment, err := DataCommitment(&rpctypes.Context{}, uint64(tc.beginQuery), uint64(tc.endQuery)) if tc.expectPass { diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 89b5d6a459..29797ac9c2 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -22,7 +22,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in return nil, err } - validators, err := env.StateStore.LoadValidators(height) + validators, err := GetEnvironment().StateStore.LoadValidators(height) if err != nil { return nil, err } @@ -50,7 +50,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in // More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) { // Get Peer consensus states. - peers := env.P2PPeers.Peers().List() + peers := GetEnvironment().P2PPeers.Peers().List() peerStates := make([]ctypes.PeerStateInfo, len(peers)) for i, peer := range peers { peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState) @@ -69,7 +69,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState } } // Get self round state. - roundState, err := env.ConsensusState.GetRoundStateJSON() + roundState, err := GetEnvironment().ConsensusState.GetRoundStateJSON() if err != nil { return nil, err } @@ -83,7 +83,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState // More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) { // Get self round state. - bz, err := env.ConsensusState.GetRoundStateSimpleJSON() + bz, err := GetEnvironment().ConsensusState.GetRoundStateSimpleJSON() return &ctypes.ResultConsensusState{RoundState: bz}, err } @@ -98,7 +98,7 @@ func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCon return nil, err } - consensusParams, err := env.StateStore.LoadConsensusParams(height) + consensusParams, err := GetEnvironment().StateStore.LoadConsensusParams(height) if err != nil { return nil, err } diff --git a/rpc/core/dev.go b/rpc/core/dev.go index b70f5f1e12..5383653525 100644 --- a/rpc/core/dev.go +++ b/rpc/core/dev.go @@ -7,6 +7,6 @@ import ( // UnsafeFlushMempool removes all transactions from the mempool. func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) { - env.Mempool.Flush() + GetEnvironment().Mempool.Flush() return &ctypes.ResultUnsafeFlushMempool{}, nil } diff --git a/rpc/core/env.go b/rpc/core/env.go index 11a51bfe70..e945f65691 100644 --- a/rpc/core/env.go +++ b/rpc/core/env.go @@ -3,6 +3,7 @@ package core import ( "encoding/base64" "fmt" + "sync" "time" cfg "github.com/tendermint/tendermint/config" @@ -35,13 +36,23 @@ const ( var ( // set by Node - env *Environment + mut = &sync.Mutex{} + globalEnv *Environment ) -// SetEnvironment sets up the given Environment. -// It will race if multiple Node call SetEnvironment. +// SetEnvironment sets the global environment to e. The globalEnv var that this +// function modifies is protected by a sync.Once so multiple calls within the +// same process will not be effective. func SetEnvironment(e *Environment) { - env = e + mut.Lock() + defer mut.Unlock() + globalEnv = e +} + +func GetEnvironment() *Environment { + mut.Lock() + defer mut.Unlock() + return globalEnv } //---------------------------------------------- @@ -69,7 +80,7 @@ type peers interface { Peers() p2p.IPeerSet } -//---------------------------------------------- +// ---------------------------------------------- // Environment contains objects and interfaces used by the RPC. It is expected // to be setup once during startup. type Environment struct { @@ -142,15 +153,15 @@ func validatePerPage(perPagePtr *int) int { // InitGenesisChunks configures the environment and should be called on service // startup. func InitGenesisChunks() error { - if env.genChunks != nil { + if GetEnvironment().genChunks != nil { return nil } - if env.GenDoc == nil { + if GetEnvironment().GenDoc == nil { return nil } - data, err := tmjson.Marshal(env.GenDoc) + data, err := tmjson.Marshal(GetEnvironment().GenDoc) if err != nil { return err } @@ -162,7 +173,9 @@ func InitGenesisChunks() error { end = len(data) } - env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) + mut.Lock() + defer mut.Unlock() + globalEnv.genChunks = append(globalEnv.genChunks, base64.StdEncoding.EncodeToString(data[i:end])) } return nil @@ -188,7 +201,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) { return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d", height, latestHeight) } - base := env.BlockStore.Base() + base := GetEnvironment().BlockStore.Base() if height < base { return 0, fmt.Errorf("height %d is not available, lowest height is %d", height, base) @@ -199,6 +212,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) { } func latestUncommittedHeight() int64 { + env := GetEnvironment() nodeIsSyncing := env.ConsensusReactor.WaitSync() if nodeIsSyncing { return env.BlockStore.Height() diff --git a/rpc/core/events.go b/rpc/core/events.go index 438979004a..91c7e62a44 100644 --- a/rpc/core/events.go +++ b/rpc/core/events.go @@ -22,6 +22,7 @@ const ( // More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) { addr := ctx.RemoteAddr() + env := GetEnvironment() if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) @@ -108,6 +109,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() + env := GetEnvironment() env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query) q, err := tmquery.New(query) if err != nil { @@ -124,6 +126,7 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe // More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) { addr := ctx.RemoteAddr() + env := GetEnvironment() env.Logger.Info("Unsubscribe from all", "remote", addr) err := env.EventBus.UnsubscribeAll(context.Background(), addr) if err != nil { diff --git a/rpc/core/evidence.go b/rpc/core/evidence.go index b8fd2e2e20..373e7c442d 100644 --- a/rpc/core/evidence.go +++ b/rpc/core/evidence.go @@ -20,7 +20,7 @@ func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.Result return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err) } - if err := env.EvidencePool.AddEvidence(ev); err != nil { + if err := GetEnvironment().EvidencePool.AddEvidence(ev); err != nil { return nil, fmt.Errorf("failed to add evidence: %w", err) } return &ctypes.ResultBroadcastEvidence{Hash: ev.Hash()}, nil diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index f9fa4b37e5..276fa4f53e 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -20,7 +20,7 @@ import ( // CheckTx nor DeliverTx results. // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { - err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{}) + err := GetEnvironment().Mempool.CheckTx(tx, nil, mempl.TxInfo{}) if err != nil { return nil, err @@ -33,7 +33,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh := make(chan *abci.Response, 1) - err := env.Mempool.CheckTx(tx, func(res *abci.Response) { + err := GetEnvironment().Mempool.CheckTx(tx, func(res *abci.Response) { select { case <-ctx.Context().Done(): case resCh <- res: @@ -63,6 +63,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas // More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { subscriber := ctx.RemoteAddr() + env := GetEnvironment() if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients { return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients) @@ -153,6 +154,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) { // reuse per_page validator limit := validatePerPage(limitPtr) + env := GetEnvironment() txs := env.Mempool.ReapMaxTxs(limit) return &ctypes.ResultUnconfirmedTxs{ @@ -165,6 +167,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi // NumUnconfirmedTxs gets number of unconfirmed transactions. // More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) { + env := GetEnvironment() return &ctypes.ResultUnconfirmedTxs{ Count: env.Mempool.Size(), Total: env.Mempool.Size(), @@ -175,7 +178,7 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err // be added to the mempool either. // More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) { - res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx}) + res, err := GetEnvironment().ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx}) if err != nil { return nil, err } diff --git a/rpc/core/net.go b/rpc/core/net.go index 2a0e2c92d4..fd80d63877 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -13,6 +13,7 @@ import ( // NetInfo returns network info. // More: https://docs.tendermint.com/master/rpc/#/Info/net_info func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) { + env := GetEnvironment() peersList := env.P2PPeers.Peers().List() peers := make([]ctypes.Peer, 0, len(peersList)) for _, peer := range peersList { @@ -43,6 +44,7 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS if len(seeds) == 0 { return &ctypes.ResultDialSeeds{}, errors.New("no seeds provided") } + env := GetEnvironment() env.Logger.Info("DialSeeds", "seeds", seeds) if err := env.P2PPeers.DialPeersAsync(seeds); err != nil { return &ctypes.ResultDialSeeds{}, err @@ -63,6 +65,7 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit return &ctypes.ResultDialPeers{}, err } + env := GetEnvironment() env.Logger.Info("DialPeers", "peers", peers, "persistent", persistent, "unconditional", unconditional, "private", private) @@ -94,6 +97,7 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit // Genesis returns genesis file. // More: https://docs.tendermint.com/master/rpc/#/Info/genesis func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { + env := GetEnvironment() if len(env.genChunks) > 1 { return nil, errors.New("genesis response is large, please use the genesis_chunked API instead") } @@ -102,6 +106,7 @@ func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) { } func GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) { + env := GetEnvironment() if env.genChunks == nil { return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized") } diff --git a/rpc/core/net_test.go b/rpc/core/net_test.go index c971776f3e..e44d5cf5b6 100644 --- a/rpc/core/net_test.go +++ b/rpc/core/net_test.go @@ -22,6 +22,7 @@ func TestUnsafeDialSeeds(t *testing.T) { t.Error(err) } }) + env := GetEnvironment() env.Logger = log.TestingLogger() env.P2PPeers = sw @@ -62,6 +63,7 @@ func TestUnsafeDialPeers(t *testing.T) { } }) + env := GetEnvironment() env.Logger = log.TestingLogger() env.P2PPeers = sw diff --git a/rpc/core/status.go b/rpc/core/status.go index 72f50f5468..37a07b0238 100644 --- a/rpc/core/status.go +++ b/rpc/core/status.go @@ -21,6 +21,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { earliestBlockTimeNano int64 ) + env := GetEnvironment() if earliestBlockMeta := env.BlockStore.LoadBaseMeta(); earliestBlockMeta != nil { earliestBlockHeight = earliestBlockMeta.Header.Height earliestAppHash = earliestBlockMeta.Header.AppHash @@ -75,6 +76,7 @@ func Status(ctx *rpctypes.Context) (*ctypes.ResultStatus, error) { } func validatorAtHeight(h int64) *types.Validator { + env := GetEnvironment() vals, err := env.StateStore.LoadValidators(h) if err != nil { return nil diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 9d6aebabd7..9016916db2 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -22,6 +22,7 @@ import ( // place. // More: https://docs.tendermint.com/v0.34/rpc/#/Info/tx func Tx(ctx *rpctypes.Context, hash []byte, prove bool) (*ctypes.ResultTx, error) { + env := GetEnvironment() // if index is disabled, return error if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, fmt.Errorf("transaction indexing is disabled") @@ -70,6 +71,7 @@ func TxSearch( orderBy string, ) (*ctypes.ResultTxSearch, error) { + env := GetEnvironment() // if index is disabled, return error if _, ok := env.TxIndexer.(*null.TxIndex); ok { return nil, errors.New("transaction indexing is disabled") @@ -149,6 +151,7 @@ func proveTx(height int64, index uint32) (types.TxProof, error) { pTxProof tmproto.TxProof txProof types.TxProof ) + env := GetEnvironment() rawBlock, err := loadRawBlock(env.BlockStore, height) if err != nil { return txProof, err