Skip to content

Commit

Permalink
fix: patch data race (#869)
Browse files Browse the repository at this point in the history
* fix: protect global var with a sync.Once

* fix: use a mutex instead of sync.Once to allow for arbitrary swapping of the env

* fix: implement and use GetEnvironment

* fix: lint by disabling lll

* fix: invoke GetEnvironment() once per function scope

* remove //nolint:lll

* make proto-gen

Co-authored-by: Rootul Patel <[email protected]>
  • Loading branch information
evan-forbes and rootulp authored Nov 17, 2022
1 parent 507b620 commit f0b9b8d
Show file tree
Hide file tree
Showing 13 changed files with 68 additions and 27 deletions.
4 changes: 2 additions & 2 deletions rpc/core/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func ABCIQuery(
height int64,
prove bool,
) (*ctypes.ResultABCIQuery, error) {
resQuery, err := env.ProxyAppQuery.QuerySync(abci.RequestQuery{
resQuery, err := GetEnvironment().ProxyAppQuery.QuerySync(abci.RequestQuery{
Path: path,
Data: data,
Height: height,
Expand All @@ -33,7 +33,7 @@ func ABCIQuery(
// ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo)
resInfo, err := GetEnvironment().ProxyAppQuery.InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err
}
Expand Down
13 changes: 10 additions & 3 deletions rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.
// maximum 20 block metas
const limit int64 = 20
var err error
env := GetEnvironment()
minHeight, maxHeight, err = filterMinMax(
env.BlockStore.Base(),
env.BlockStore.Height(),
Expand Down Expand Up @@ -82,13 +83,13 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) {
// If no height is provided, it will fetch the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/block
func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}

block := env.BlockStore.LoadBlock(height)
blockMeta := env.BlockStore.LoadBlockMeta(height)
block := GetEnvironment().BlockStore.LoadBlock(height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height)
if blockMeta == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil
}
Expand All @@ -98,6 +99,7 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error)
// BlockByHash gets block by hash.
// More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash
func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) {
env := GetEnvironment()
block := env.BlockStore.LoadBlockByHash(hash)
if block == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil
Expand All @@ -111,6 +113,7 @@ func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error
// If no height is provided, it will fetch the commit for the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/commit
func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
env := GetEnvironment()
height, err := getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -161,6 +164,7 @@ func generateHeightsList(beginBlock uint64, endBlock uint64) []int64 {
// validateDataCommitmentRange runs basic checks on the asc sorted list of heights
// that will be used subsequently in generating data commitments over the defined set of heights.
func validateDataCommitmentRange(beginBlock uint64, endBlock uint64) error {
env := GetEnvironment()
heightsRange := endBlock - beginBlock + 1
if heightsRange > uint64(consts.DataCommitmentBlocksLimit) {
return fmt.Errorf("the query exceeds the limit of allowed blocks %d", consts.DataCommitmentBlocksLimit)
Expand Down Expand Up @@ -209,6 +213,7 @@ func hashDataRoots(blocks []*ctypes.ResultBlock) []byte {
// getBlock(h).Txs[5]
// More: https://docs.tendermint.com/master/rpc/#/Info/block_results
func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
env := GetEnvironment()
height, err := getHeight(env.BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
Expand Down Expand Up @@ -268,6 +273,7 @@ func BlockSearch(

// heightsByQuery returns a list of heights corresponding to the provided query.
func heightsByQuery(ctx *rpctypes.Context, query string) ([]int64, error) {
env := GetEnvironment()
// skip if block indexing is disabled
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
return nil, errors.New("block indexing is disabled")
Expand Down Expand Up @@ -300,6 +306,7 @@ func sortBlocks(results []int64, orderBy string) error {

// fetchBlocks takes a list of block heights and fetches them.
func fetchBlocks(results []int64, pageSize int, skipCount int) []*ctypes.ResultBlock {
env := GetEnvironment()
apiResults := make([]*ctypes.ResultBlock, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
block := env.BlockStore.LoadBlock(results[i])
Expand Down
6 changes: 4 additions & 2 deletions rpc/core/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ func TestBlockResults(t *testing.T) {
BeginBlock: &abci.ResponseBeginBlock{},
}

env = &Environment{}
env := &Environment{}
env.StateStore = sm.NewStore(dbm.NewMemDB())
err := env.StateStore.SaveABCIResponses(100, results)
require.NoError(t, err)
env.BlockStore = mockBlockStore{height: 100}
SetEnvironment(env)

testCases := []struct {
height int64
Expand Down Expand Up @@ -121,7 +122,7 @@ func TestBlockResults(t *testing.T) {
}

func TestDataCommitmentResults(t *testing.T) {
env = &Environment{}
env := &Environment{}
height := int64(2826)

blocks := randomBlocks(height)
Expand Down Expand Up @@ -149,6 +150,7 @@ func TestDataCommitmentResults(t *testing.T) {
beginQueryBlock: tc.beginQuery,
endQueryBlock: tc.endQuery,
}
SetEnvironment(env)

actualCommitment, err := DataCommitment(&rpctypes.Context{}, uint64(tc.beginQuery), uint64(tc.endQuery))
if tc.expectPass {
Expand Down
10 changes: 5 additions & 5 deletions rpc/core/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
return nil, err
}

validators, err := env.StateStore.LoadValidators(height)
validators, err := GetEnvironment().StateStore.LoadValidators(height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -50,7 +50,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
// More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state
func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := env.P2PPeers.Peers().List()
peers := GetEnvironment().P2PPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
for i, peer := range peers {
peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState)
Expand All @@ -69,7 +69,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
}
}
// Get self round state.
roundState, err := env.ConsensusState.GetRoundStateJSON()
roundState, err := GetEnvironment().ConsensusState.GetRoundStateJSON()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state
func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// Get self round state.
bz, err := env.ConsensusState.GetRoundStateSimpleJSON()
bz, err := GetEnvironment().ConsensusState.GetRoundStateSimpleJSON()
return &ctypes.ResultConsensusState{RoundState: bz}, err
}

Expand All @@ -98,7 +98,7 @@ func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCon
return nil, err
}

consensusParams, err := env.StateStore.LoadConsensusParams(height)
consensusParams, err := GetEnvironment().StateStore.LoadConsensusParams(height)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (

// UnsafeFlushMempool removes all transactions from the mempool.
func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
env.Mempool.Flush()
GetEnvironment().Mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}
34 changes: 24 additions & 10 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"encoding/base64"
"fmt"
"sync"
"time"

cfg "github.com/tendermint/tendermint/config"
Expand Down Expand Up @@ -35,13 +36,23 @@ const (

var (
// set by Node
env *Environment
mut = &sync.Mutex{}
globalEnv *Environment
)

// SetEnvironment sets up the given Environment.
// It will race if multiple Node call SetEnvironment.
// SetEnvironment sets the global environment to e. The globalEnv var that this
// function modifies is protected by a sync.Once so multiple calls within the
// same process will not be effective.
func SetEnvironment(e *Environment) {
env = e
mut.Lock()
defer mut.Unlock()
globalEnv = e
}

func GetEnvironment() *Environment {
mut.Lock()
defer mut.Unlock()
return globalEnv
}

//----------------------------------------------
Expand Down Expand Up @@ -69,7 +80,7 @@ type peers interface {
Peers() p2p.IPeerSet
}

//----------------------------------------------
// ----------------------------------------------
// Environment contains objects and interfaces used by the RPC. It is expected
// to be setup once during startup.
type Environment struct {
Expand Down Expand Up @@ -142,15 +153,15 @@ func validatePerPage(perPagePtr *int) int {
// InitGenesisChunks configures the environment and should be called on service
// startup.
func InitGenesisChunks() error {
if env.genChunks != nil {
if GetEnvironment().genChunks != nil {
return nil
}

if env.GenDoc == nil {
if GetEnvironment().GenDoc == nil {
return nil
}

data, err := tmjson.Marshal(env.GenDoc)
data, err := tmjson.Marshal(GetEnvironment().GenDoc)
if err != nil {
return err
}
Expand All @@ -162,7 +173,9 @@ func InitGenesisChunks() error {
end = len(data)
}

env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
mut.Lock()
defer mut.Unlock()
globalEnv.genChunks = append(globalEnv.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
}

return nil
Expand All @@ -188,7 +201,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d",
height, latestHeight)
}
base := env.BlockStore.Base()
base := GetEnvironment().BlockStore.Base()
if height < base {
return 0, fmt.Errorf("height %d is not available, lowest height is %d",
height, base)
Expand All @@ -199,6 +212,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}

func latestUncommittedHeight() int64 {
env := GetEnvironment()
nodeIsSyncing := env.ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
Expand Down
3 changes: 3 additions & 0 deletions rpc/core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const (
// More: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe
func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
Expand Down Expand Up @@ -108,6 +109,7 @@ func Subscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultSubscribe, er
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe
func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()
env.Logger.Info("Unsubscribe from query", "remote", addr, "query", query)
q, err := tmquery.New(query)
if err != nil {
Expand All @@ -124,6 +126,7 @@ func Unsubscribe(ctx *rpctypes.Context, query string) (*ctypes.ResultUnsubscribe
// More: https://docs.tendermint.com/master/rpc/#/Websocket/unsubscribe_all
func UnsubscribeAll(ctx *rpctypes.Context) (*ctypes.ResultUnsubscribe, error) {
addr := ctx.RemoteAddr()
env := GetEnvironment()
env.Logger.Info("Unsubscribe from all", "remote", addr)
err := env.EventBus.UnsubscribeAll(context.Background(), addr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/evidence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func BroadcastEvidence(ctx *rpctypes.Context, ev types.Evidence) (*ctypes.Result
return nil, fmt.Errorf("evidence.ValidateBasic failed: %w", err)
}

if err := env.EvidencePool.AddEvidence(ev); err != nil {
if err := GetEnvironment().EvidencePool.AddEvidence(ev); err != nil {
return nil, fmt.Errorf("failed to add evidence: %w", err)
}
return &ctypes.ResultBroadcastEvidence{Hash: ev.Hash()}, nil
Expand Down
9 changes: 6 additions & 3 deletions rpc/core/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// CheckTx nor DeliverTx results.
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_async
func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
err := env.Mempool.CheckTx(tx, nil, mempl.TxInfo{})
err := GetEnvironment().Mempool.CheckTx(tx, nil, mempl.TxInfo{})

if err != nil {
return nil, err
Expand All @@ -33,7 +33,7 @@ func BroadcastTxAsync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadca
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_sync
func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
resCh := make(chan *abci.Response, 1)
err := env.Mempool.CheckTx(tx, func(res *abci.Response) {
err := GetEnvironment().Mempool.CheckTx(tx, func(res *abci.Response) {
select {
case <-ctx.Context().Done():
case resCh <- res:
Expand Down Expand Up @@ -63,6 +63,7 @@ func BroadcastTxSync(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcas
// More: https://docs.tendermint.com/master/rpc/#/Tx/broadcast_tx_commit
func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) {
subscriber := ctx.RemoteAddr()
env := GetEnvironment()

if env.EventBus.NumClients() >= env.Config.MaxSubscriptionClients {
return nil, fmt.Errorf("max_subscription_clients %d reached", env.Config.MaxSubscriptionClients)
Expand Down Expand Up @@ -153,6 +154,7 @@ func BroadcastTxCommit(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultBroadc
func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfirmedTxs, error) {
// reuse per_page validator
limit := validatePerPage(limitPtr)
env := GetEnvironment()

txs := env.Mempool.ReapMaxTxs(limit)
return &ctypes.ResultUnconfirmedTxs{
Expand All @@ -165,6 +167,7 @@ func UnconfirmedTxs(ctx *rpctypes.Context, limitPtr *int) (*ctypes.ResultUnconfi
// NumUnconfirmedTxs gets number of unconfirmed transactions.
// More: https://docs.tendermint.com/master/rpc/#/Info/num_unconfirmed_txs
func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, error) {
env := GetEnvironment()
return &ctypes.ResultUnconfirmedTxs{
Count: env.Mempool.Size(),
Total: env.Mempool.Size(),
Expand All @@ -175,7 +178,7 @@ func NumUnconfirmedTxs(ctx *rpctypes.Context) (*ctypes.ResultUnconfirmedTxs, err
// be added to the mempool either.
// More: https://docs.tendermint.com/master/rpc/#/Tx/check_tx
func CheckTx(ctx *rpctypes.Context, tx types.Tx) (*ctypes.ResultCheckTx, error) {
res, err := env.ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
res, err := GetEnvironment().ProxyAppMempool.CheckTxSync(abci.RequestCheckTx{Tx: tx})
if err != nil {
return nil, err
}
Expand Down
5 changes: 5 additions & 0 deletions rpc/core/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
// NetInfo returns network info.
// More: https://docs.tendermint.com/master/rpc/#/Info/net_info
func NetInfo(ctx *rpctypes.Context) (*ctypes.ResultNetInfo, error) {
env := GetEnvironment()
peersList := env.P2PPeers.Peers().List()
peers := make([]ctypes.Peer, 0, len(peersList))
for _, peer := range peersList {
Expand Down Expand Up @@ -43,6 +44,7 @@ func UnsafeDialSeeds(ctx *rpctypes.Context, seeds []string) (*ctypes.ResultDialS
if len(seeds) == 0 {
return &ctypes.ResultDialSeeds{}, errors.New("no seeds provided")
}
env := GetEnvironment()
env.Logger.Info("DialSeeds", "seeds", seeds)
if err := env.P2PPeers.DialPeersAsync(seeds); err != nil {
return &ctypes.ResultDialSeeds{}, err
Expand All @@ -63,6 +65,7 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit
return &ctypes.ResultDialPeers{}, err
}

env := GetEnvironment()
env.Logger.Info("DialPeers", "peers", peers, "persistent",
persistent, "unconditional", unconditional, "private", private)

Expand Down Expand Up @@ -94,6 +97,7 @@ func UnsafeDialPeers(ctx *rpctypes.Context, peers []string, persistent, uncondit
// Genesis returns genesis file.
// More: https://docs.tendermint.com/master/rpc/#/Info/genesis
func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
env := GetEnvironment()
if len(env.genChunks) > 1 {
return nil, errors.New("genesis response is large, please use the genesis_chunked API instead")
}
Expand All @@ -102,6 +106,7 @@ func Genesis(ctx *rpctypes.Context) (*ctypes.ResultGenesis, error) {
}

func GenesisChunked(ctx *rpctypes.Context, chunk uint) (*ctypes.ResultGenesisChunk, error) {
env := GetEnvironment()
if env.genChunks == nil {
return nil, fmt.Errorf("service configuration error, genesis chunks are not initialized")
}
Expand Down
Loading

0 comments on commit f0b9b8d

Please sign in to comment.