Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: patch data race #869

Merged
merged 8 commits into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rpc/core/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func ABCIQuery(
height int64,
prove bool,
) (*ctypes.ResultABCIQuery, error) {
resQuery, err := env.ProxyAppQuery.QuerySync(abci.RequestQuery{
resQuery, err := GetEnvironment().ProxyAppQuery.QuerySync(abci.RequestQuery{
Path: path,
Data: data,
Height: height,
Expand All @@ -33,7 +33,7 @@ func ABCIQuery(
// ABCIInfo gets some info about the application.
// More: https://docs.tendermint.com/master/rpc/#/ABCI/abci_info
func ABCIInfo(ctx *rpctypes.Context) (*ctypes.ResultABCIInfo, error) {
resInfo, err := env.ProxyAppQuery.InfoSync(proxy.RequestInfo)
resInfo, err := GetEnvironment().ProxyAppQuery.InfoSync(proxy.RequestInfo)
if err != nil {
return nil, err
}
Expand Down
48 changes: 24 additions & 24 deletions rpc/core/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,24 @@ func BlockchainInfo(ctx *rpctypes.Context, minHeight, maxHeight int64) (*ctypes.
const limit int64 = 20
var err error
minHeight, maxHeight, err = filterMinMax(
env.BlockStore.Base(),
env.BlockStore.Height(),
GetEnvironment().BlockStore.Base(),
GetEnvironment().BlockStore.Height(),
minHeight,
maxHeight,
limit)
if err != nil {
return nil, err
}
env.Logger.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight)
GetEnvironment().Logger.Debug("BlockchainInfoHandler", "maxHeight", maxHeight, "minHeight", minHeight)

blockMetas := []*types.BlockMeta{}
for height := maxHeight; height >= minHeight; height-- {
blockMeta := env.BlockStore.LoadBlockMeta(height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height)
blockMetas = append(blockMetas, blockMeta)
}

return &ctypes.ResultBlockchainInfo{
LastHeight: env.BlockStore.Height(),
LastHeight: GetEnvironment().BlockStore.Height(),
rootulp marked this conversation as resolved.
Show resolved Hide resolved
BlockMetas: blockMetas}, nil
}

Expand Down Expand Up @@ -82,13 +82,13 @@ func filterMinMax(base, height, min, max, limit int64) (int64, int64, error) {
// If no height is provided, it will fetch the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/block
func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}

block := env.BlockStore.LoadBlock(height)
blockMeta := env.BlockStore.LoadBlockMeta(height)
block := GetEnvironment().BlockStore.LoadBlock(height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height)
if blockMeta == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: block}, nil
}
Expand All @@ -98,39 +98,39 @@ func Block(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlock, error)
// BlockByHash gets block by hash.
// More: https://docs.tendermint.com/master/rpc/#/Info/block_by_hash
func BlockByHash(ctx *rpctypes.Context, hash []byte) (*ctypes.ResultBlock, error) {
block := env.BlockStore.LoadBlockByHash(hash)
block := GetEnvironment().BlockStore.LoadBlockByHash(hash)
if block == nil {
return &ctypes.ResultBlock{BlockID: types.BlockID{}, Block: nil}, nil
}
// If block is not nil, then blockMeta can't be nil.
blockMeta := env.BlockStore.LoadBlockMeta(block.Height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(block.Height)
return &ctypes.ResultBlock{BlockID: blockMeta.BlockID, Block: block}, nil
}

// Commit gets block commit at a given height.
// If no height is provided, it will fetch the commit for the latest block.
// More: https://docs.tendermint.com/master/rpc/#/Info/commit
func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}

blockMeta := env.BlockStore.LoadBlockMeta(height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(height)
if blockMeta == nil {
return nil, nil
}
header := blockMeta.Header

// If the next block has not been committed yet,
// use a non-canonical commit
if height == env.BlockStore.Height() {
commit := env.BlockStore.LoadSeenCommit(height)
if height == GetEnvironment().BlockStore.Height() {
commit := GetEnvironment().BlockStore.LoadSeenCommit(height)
return ctypes.NewResultCommit(&header, commit, false), nil
}

// Return the canonical commit (comes from the block at height+1)
commit := env.BlockStore.LoadBlockCommit(height)
commit := GetEnvironment().BlockStore.LoadBlockCommit(height)
return ctypes.NewResultCommit(&header, commit, true), nil
}

Expand Down Expand Up @@ -171,14 +171,14 @@ func validateDataCommitmentRange(beginBlock uint64, endBlock uint64) error {
if beginBlock > endBlock {
return fmt.Errorf("end block is smaller than begin block")
}
if endBlock > uint64(env.BlockStore.Height()) {
if endBlock > uint64(GetEnvironment().BlockStore.Height()) {
return fmt.Errorf(
"end block %d is higher than current chain height %d",
endBlock,
env.BlockStore.Height(),
GetEnvironment().BlockStore.Height(),
)
}
has, err := env.BlockIndexer.Has(int64(endBlock))
has, err := GetEnvironment().BlockIndexer.Has(int64(endBlock))
if err != nil {
return err
}
Expand Down Expand Up @@ -209,12 +209,12 @@ func hashDataRoots(blocks []*ctypes.ResultBlock) []byte {
// getBlock(h).Txs[5]
// More: https://docs.tendermint.com/master/rpc/#/Info/block_results
func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockResults, error) {
height, err := getHeight(env.BlockStore.Height(), heightPtr)
height, err := getHeight(GetEnvironment().BlockStore.Height(), heightPtr)
if err != nil {
return nil, err
}

results, err := env.StateStore.LoadABCIResponses(height)
results, err := GetEnvironment().StateStore.LoadABCIResponses(height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func BlockSearch(
// heightsByQuery returns a list of heights corresponding to the provided query.
func heightsByQuery(ctx *rpctypes.Context, query string) ([]int64, error) {
// skip if block indexing is disabled
if _, ok := env.BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
if _, ok := GetEnvironment().BlockIndexer.(*blockidxnull.BlockerIndexer); ok {
return nil, errors.New("block indexing is disabled")
}

Expand All @@ -278,7 +278,7 @@ func heightsByQuery(ctx *rpctypes.Context, query string) ([]int64, error) {
return nil, err
}

results, err := env.BlockIndexer.Search(ctx.Context(), q)
results, err := GetEnvironment().BlockIndexer.Search(ctx.Context(), q)
return results, err
}

Expand All @@ -302,9 +302,9 @@ func sortBlocks(results []int64, orderBy string) error {
func fetchBlocks(results []int64, pageSize int, skipCount int) []*ctypes.ResultBlock {
apiResults := make([]*ctypes.ResultBlock, 0, pageSize)
for i := skipCount; i < skipCount+pageSize; i++ {
block := env.BlockStore.LoadBlock(results[i])
block := GetEnvironment().BlockStore.LoadBlock(results[i])
if block != nil {
blockMeta := env.BlockStore.LoadBlockMeta(block.Height)
blockMeta := GetEnvironment().BlockStore.LoadBlockMeta(block.Height)
if blockMeta != nil {
apiResults = append(apiResults, &ctypes.ResultBlock{
Block: block,
Expand Down
6 changes: 4 additions & 2 deletions rpc/core/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,12 @@ func TestBlockResults(t *testing.T) {
BeginBlock: &abci.ResponseBeginBlock{},
}

env = &Environment{}
env := &Environment{}
env.StateStore = sm.NewStore(dbm.NewMemDB())
err := env.StateStore.SaveABCIResponses(100, results)
require.NoError(t, err)
env.BlockStore = mockBlockStore{height: 100}
SetEnvironment(env)

testCases := []struct {
height int64
Expand Down Expand Up @@ -121,7 +122,7 @@ func TestBlockResults(t *testing.T) {
}

func TestDataCommitmentResults(t *testing.T) {
env = &Environment{}
env := &Environment{}
height := int64(2826)

blocks := randomBlocks(height)
Expand Down Expand Up @@ -149,6 +150,7 @@ func TestDataCommitmentResults(t *testing.T) {
beginQueryBlock: tc.beginQuery,
endQueryBlock: tc.endQuery,
}
SetEnvironment(env)

actualCommitment, err := DataCommitment(&rpctypes.Context{}, uint64(tc.beginQuery), uint64(tc.endQuery))
if tc.expectPass {
Expand Down
10 changes: 5 additions & 5 deletions rpc/core/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
return nil, err
}

validators, err := env.StateStore.LoadValidators(height)
validators, err := GetEnvironment().StateStore.LoadValidators(height)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -50,7 +50,7 @@ func Validators(ctx *rpctypes.Context, heightPtr *int64, pagePtr, perPagePtr *in
// More: https://docs.tendermint.com/master/rpc/#/Info/dump_consensus_state
func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := env.P2PPeers.Peers().List()
peers := GetEnvironment().P2PPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
for i, peer := range peers {
peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState)
Expand All @@ -69,7 +69,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
}
}
// Get self round state.
roundState, err := env.ConsensusState.GetRoundStateJSON()
roundState, err := GetEnvironment().ConsensusState.GetRoundStateJSON()
if err != nil {
return nil, err
}
Expand All @@ -83,7 +83,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
// More: https://docs.tendermint.com/master/rpc/#/Info/consensus_state
func ConsensusState(ctx *rpctypes.Context) (*ctypes.ResultConsensusState, error) {
// Get self round state.
bz, err := env.ConsensusState.GetRoundStateSimpleJSON()
bz, err := GetEnvironment().ConsensusState.GetRoundStateSimpleJSON()
return &ctypes.ResultConsensusState{RoundState: bz}, err
}

Expand All @@ -98,7 +98,7 @@ func ConsensusParams(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCon
return nil, err
}

consensusParams, err := env.StateStore.LoadConsensusParams(height)
consensusParams, err := GetEnvironment().StateStore.LoadConsensusParams(height)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/core/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (

// UnsafeFlushMempool removes all transactions from the mempool.
func UnsafeFlushMempool(ctx *rpctypes.Context) (*ctypes.ResultUnsafeFlushMempool, error) {
env.Mempool.Flush()
GetEnvironment().Mempool.Flush()
return &ctypes.ResultUnsafeFlushMempool{}, nil
}
39 changes: 26 additions & 13 deletions rpc/core/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"encoding/base64"
"fmt"
"sync"
"time"

cfg "github.com/tendermint/tendermint/config"
Expand Down Expand Up @@ -35,13 +36,23 @@ const (

var (
// set by Node
env *Environment
mut = &sync.Mutex{}
rootulp marked this conversation as resolved.
Show resolved Hide resolved
globalEnv *Environment
)

// SetEnvironment sets up the given Environment.
// It will race if multiple Node call SetEnvironment.
// SetEnvironment sets the global environment to e. The globalEnv var that this
// function modifies is protected by a sync.Once so multiple calls within the
evan-forbes marked this conversation as resolved.
Show resolved Hide resolved
// same process will not be effective.
func SetEnvironment(e *Environment) {
env = e
mut.Lock()
rach-id marked this conversation as resolved.
Show resolved Hide resolved
defer mut.Unlock()
globalEnv = e
}

func GetEnvironment() *Environment {
mut.Lock()
defer mut.Unlock()
return globalEnv
}

//----------------------------------------------
Expand Down Expand Up @@ -69,7 +80,7 @@ type peers interface {
Peers() p2p.IPeerSet
}

//----------------------------------------------
// ----------------------------------------------
// Environment contains objects and interfaces used by the RPC. It is expected
// to be setup once during startup.
type Environment struct {
Expand Down Expand Up @@ -142,15 +153,15 @@ func validatePerPage(perPagePtr *int) int {
// InitGenesisChunks configures the environment and should be called on service
// startup.
func InitGenesisChunks() error {
if env.genChunks != nil {
if GetEnvironment().genChunks != nil {
return nil
}

if env.GenDoc == nil {
if GetEnvironment().GenDoc == nil {
return nil
}

data, err := tmjson.Marshal(env.GenDoc)
data, err := tmjson.Marshal(GetEnvironment().GenDoc)
if err != nil {
return err
}
Expand All @@ -162,7 +173,9 @@ func InitGenesisChunks() error {
end = len(data)
}

env.genChunks = append(env.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
mut.Lock()
defer mut.Unlock()
globalEnv.genChunks = append(globalEnv.genChunks, base64.StdEncoding.EncodeToString(data[i:end]))
Comment on lines -165 to +178
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh no

}

return nil
Expand All @@ -188,7 +201,7 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
return 0, fmt.Errorf("height %d must be less than or equal to the current blockchain height %d",
height, latestHeight)
}
base := env.BlockStore.Base()
base := GetEnvironment().BlockStore.Base()
if height < base {
return 0, fmt.Errorf("height %d is not available, lowest height is %d",
height, base)
Expand All @@ -199,9 +212,9 @@ func getHeight(latestHeight int64, heightPtr *int64) (int64, error) {
}

func latestUncommittedHeight() int64 {
nodeIsSyncing := env.ConsensusReactor.WaitSync()
nodeIsSyncing := GetEnvironment().ConsensusReactor.WaitSync()
if nodeIsSyncing {
return env.BlockStore.Height()
return GetEnvironment().BlockStore.Height()
}
return env.BlockStore.Height() + 1
return GetEnvironment().BlockStore.Height() + 1
}
Loading