Skip to content

Commit

Permalink
Limit the number of pending heavy getBlocksByRange queries
Browse files Browse the repository at this point in the history
  • Loading branch information
jieyilong committed Apr 21, 2023
1 parent 7a9432a commit 34bebdc
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 4 deletions.
6 changes: 5 additions & 1 deletion common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ const (
// CfgRPCMaxConnections limits concurrent connections accepted by RPC server.
CfgRPCMaxConnections = "rpc.maxConnections"
// CfgRPCTimeoutSecs set a timeout for RPC.
CfgRPCTimeoutSecs = "rpc.timeoutSecs"
CfgRPCTimeoutSecs = "rpc.timeoutSecs"
CfgRPCGetBlocksHeavyQueryThreshold = "rpc.getBlocksHeavyQueryThreshold"
CfgRPCMaxHeavyGetBlocksQueryCount = "rpc.maxHeavyGetBlocksQueryCount"

// CfgLogLevels sets the log level.
CfgLogLevels = "log.levels"
Expand Down Expand Up @@ -221,6 +223,8 @@ func init() {
viper.SetDefault(CfgRPCPort, "16900")
viper.SetDefault(CfgRPCMaxConnections, 200)
viper.SetDefault(CfgRPCTimeoutSecs, 60)
viper.SetDefault(CfgRPCGetBlocksHeavyQueryThreshold, 500)
viper.SetDefault(CfgRPCMaxHeavyGetBlocksQueryCount, 30)

viper.SetDefault(CfgLogLevels, "*:debug")
viper.SetDefault(CfgLogPrintSelfID, false)
Expand Down
36 changes: 33 additions & 3 deletions rpc/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/thetatoken/theta/ledger/types"

sbc "github.com/thetatoken/thetasubchain/blockchain"
scom "github.com/thetatoken/thetasubchain/common"
"github.com/thetatoken/thetasubchain/core"
score "github.com/thetatoken/thetasubchain/core"
"github.com/thetatoken/thetasubchain/ledger/state"
Expand Down Expand Up @@ -408,12 +409,41 @@ func (t *ThetaRPCService) GetBlocksByRange(args *GetBlocksByRangeArgs, result *G
}

if args.Start > args.End {
return errors.New("Starting block must be less than ending block")
return errors.New("starting block must be less than ending block")
}

maxBlockRange := common.JSONUint64(5000)
if args.End-args.Start > maxBlockRange {
return errors.New("Can't retrieve more than 100 blocks at a time")
blockStart := args.Start
blockEnd := args.End
queryBlockRange := blockEnd - blockStart
if queryBlockRange > maxBlockRange {
return fmt.Errorf("can't retrieve more than %v blocks at a time", maxBlockRange)
}

heavyQueryThreshold := viper.GetUint64(scom.CfgRPCGetBlocksHeavyQueryThreshold)
isHeavyQuery := uint64(queryBlockRange) > heavyQueryThreshold
if isHeavyQuery {
t.pendingHeavyGetBlocksCounterLock.Lock()
hasTooManyPendingHeavyQueries := t.pendingHeavyGetBlocksCounter > viper.GetUint64(scom.CfgRPCMaxHeavyGetBlocksQueryCount)
t.pendingHeavyGetBlocksCounterLock.Unlock()

if hasTooManyPendingHeavyQueries {
warningMsg := fmt.Sprintf("too many pending heavy getBlocksByRange queries, rejecting getBlocksByRange query from block %v to %v", blockStart, blockEnd)
logger.Warnf(warningMsg)
return fmt.Errorf(warningMsg)
}

t.pendingHeavyGetBlocksCounterLock.Lock()
t.pendingHeavyGetBlocksCounter += 1
t.pendingHeavyGetBlocksCounterLock.Unlock()

defer func() { // the heavy query has now been processed
t.pendingHeavyGetBlocksCounterLock.Lock()
if t.pendingHeavyGetBlocksCounter > 0 {
t.pendingHeavyGetBlocksCounter -= 1
}
t.pendingHeavyGetBlocksCounterLock.Unlock()
}()
}

blocks := t.chain.FindBlocksByHeight(uint64(args.End))
Expand Down
29 changes: 29 additions & 0 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/thetatoken/theta/common/timer"
"github.com/thetatoken/theta/common/util"
"github.com/thetatoken/theta/dispatcher"
"github.com/thetatoken/theta/rpc/lib/rpc-codec/jsonrpc2"
Expand All @@ -39,6 +40,10 @@ type ThetaRPCService struct {
chain *sbc.Chain
consensus *sconsensus.ConsensusEngine

pendingHeavyGetBlocksCounter uint64
pendingHeavyGetBlocksCounterLock *sync.Mutex
pendingHeavyGetBlocksCounterResetTimer *timer.RepeatTimer

// Life cycle
wg *sync.WaitGroup
ctx context.Context
Expand All @@ -62,8 +67,13 @@ func NewThetaRPCServer(mempool *smp.Mempool, ledger *sld.Ledger, dispatcher *dis
t := &ThetaRPCServer{
ThetaRPCService: &ThetaRPCService{
wg: &sync.WaitGroup{},

pendingHeavyGetBlocksCounter: 0,
pendingHeavyGetBlocksCounterLock: &sync.Mutex{},
pendingHeavyGetBlocksCounterResetTimer: timer.NewRepeatTimer("pendingHeavyGetBlocksCounterReset", 30*time.Minute),
},
}
t.pendingHeavyGetBlocksCounterResetTimer.Reset()

t.mempool = mempool
t.ledger = ledger
Expand Down Expand Up @@ -101,6 +111,9 @@ func (t *ThetaRPCServer) Start(ctx context.Context) {
t.wg.Add(1)
go t.mainLoop()

t.wg.Add(1)
go t.heavyQueryCounterLoop()

t.wg.Add(1)
go t.txCallback()
}
Expand All @@ -115,6 +128,22 @@ func (t *ThetaRPCServer) mainLoop() {
t.server.Shutdown(t.ctx)
}

func (t *ThetaRPCServer) heavyQueryCounterLoop() {
defer t.wg.Done()

for {
select {
case <-t.pendingHeavyGetBlocksCounterResetTimer.Ch:
t.pendingHeavyGetBlocksCounterLock.Lock()
t.pendingHeavyGetBlocksCounter = 0 // reset the counter to zero at a fixed time interval, otherwise the counter could get stuck if some pending queries never return
t.pendingHeavyGetBlocksCounterLock.Unlock()
case <-t.ctx.Done():
t.stopped = true
return
}
}
}

func (t *ThetaRPCServer) serve() {
address := viper.GetString(scom.CfgRPCAddress)
port := viper.GetString(scom.CfgRPCPort)
Expand Down

0 comments on commit 34bebdc

Please sign in to comment.