From 34bebdc367cbf40b886ba35d71f30b1132306307 Mon Sep 17 00:00:00 2001
From: "Jieyi Long (THETA Network)"
Date: Fri, 21 Apr 2023 13:43:00 +0800
Subject: [PATCH] Limit the number of pending heavy getBlocksByRange queries

---
 common/config.go |  6 +++++-
 rpc/query.go     | 36 +++++++++++++++++++++++++++++++++---
 rpc/server.go    | 29 +++++++++++++++++++++++++++++
 3 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/common/config.go b/common/config.go
index 96d6398..9232bf5 100644
--- a/common/config.go
+++ b/common/config.go
@@ -119,7 +119,9 @@ const (
 	// CfgRPCMaxConnections limits concurrent connections accepted by RPC server.
 	CfgRPCMaxConnections = "rpc.maxConnections"
 	// CfgRPCTimeoutSecs set a timeout for RPC.
-	CfgRPCTimeoutSecs = "rpc.timeoutSecs"
+	CfgRPCTimeoutSecs                  = "rpc.timeoutSecs"
+	CfgRPCGetBlocksHeavyQueryThreshold = "rpc.getBlocksHeavyQueryThreshold"
+	CfgRPCMaxHeavyGetBlocksQueryCount  = "rpc.maxHeavyGetBlocksQueryCount"
 
 	// CfgLogLevels sets the log level.
 	CfgLogLevels = "log.levels"
@@ -221,6 +223,8 @@ func init() {
 	viper.SetDefault(CfgRPCPort, "16900")
 	viper.SetDefault(CfgRPCMaxConnections, 200)
 	viper.SetDefault(CfgRPCTimeoutSecs, 60)
+	viper.SetDefault(CfgRPCGetBlocksHeavyQueryThreshold, 500)
+	viper.SetDefault(CfgRPCMaxHeavyGetBlocksQueryCount, 30)
 
 	viper.SetDefault(CfgLogLevels, "*:debug")
 	viper.SetDefault(CfgLogPrintSelfID, false)
diff --git a/rpc/query.go b/rpc/query.go
index 1419d46..19a8ebb 100644
--- a/rpc/query.go
+++ b/rpc/query.go
@@ -15,6 +15,7 @@ import (
 	"github.com/thetatoken/theta/ledger/types"
 
 	sbc "github.com/thetatoken/thetasubchain/blockchain"
+	scom "github.com/thetatoken/thetasubchain/common"
 	"github.com/thetatoken/thetasubchain/core"
 	score "github.com/thetatoken/thetasubchain/core"
 	"github.com/thetatoken/thetasubchain/ledger/state"
@@ -408,12 +409,41 @@ func (t *ThetaRPCService) GetBlocksByRange(args *GetBlocksByRangeArgs, result *G
 	}
 
 	if args.Start > args.End {
-		return errors.New("Starting block must be less than ending block")
+		return errors.New("starting block must not be greater than ending block")
 	}
 
 	maxBlockRange := common.JSONUint64(5000)
-	if args.End-args.Start > maxBlockRange {
-		return errors.New("Can't retrieve more than 100 blocks at a time")
+	blockStart := args.Start
+	blockEnd := args.End
+	queryBlockRange := blockEnd - blockStart
+	if queryBlockRange > maxBlockRange {
+		return fmt.Errorf("can't retrieve more than %v blocks at a time", maxBlockRange)
+	}
+
+	heavyQueryThreshold := viper.GetUint64(scom.CfgRPCGetBlocksHeavyQueryThreshold)
+	isHeavyQuery := uint64(queryBlockRange) > heavyQueryThreshold
+	if isHeavyQuery {
+		t.pendingHeavyGetBlocksCounterLock.Lock()
+		hasTooManyPendingHeavyQueries := t.pendingHeavyGetBlocksCounter > viper.GetUint64(scom.CfgRPCMaxHeavyGetBlocksQueryCount)
+		t.pendingHeavyGetBlocksCounterLock.Unlock()
+
+		if hasTooManyPendingHeavyQueries {
+			warningMsg := fmt.Sprintf("too many pending heavy getBlocksByRange queries, rejecting getBlocksByRange query from block %v to %v", blockStart, blockEnd)
+			logger.Warn(warningMsg)
+			return errors.New(warningMsg)
+		}
+
+		t.pendingHeavyGetBlocksCounterLock.Lock()
+		t.pendingHeavyGetBlocksCounter += 1
+		t.pendingHeavyGetBlocksCounterLock.Unlock()
+
+		defer func() { // the heavy query has now been processed, decrement the pending counter
+			t.pendingHeavyGetBlocksCounterLock.Lock()
+			if t.pendingHeavyGetBlocksCounter > 0 {
+				t.pendingHeavyGetBlocksCounter -= 1
+			}
+			t.pendingHeavyGetBlocksCounterLock.Unlock()
+		}()
 	}
 
 	blocks := t.chain.FindBlocksByHeight(uint64(args.End))
diff --git a/rpc/server.go b/rpc/server.go
index 97961d4..1980b91 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -17,6 +17,7 @@ import (
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/viper"
 
+	"github.com/thetatoken/theta/common/timer"
 	"github.com/thetatoken/theta/common/util"
 	"github.com/thetatoken/theta/dispatcher"
 	"github.com/thetatoken/theta/rpc/lib/rpc-codec/jsonrpc2"
@@ -39,6 +40,10 @@ type ThetaRPCService struct {
 	chain     *sbc.Chain
 	consensus *sconsensus.ConsensusEngine
 
+	pendingHeavyGetBlocksCounter           uint64
+	pendingHeavyGetBlocksCounterLock       *sync.Mutex
+	pendingHeavyGetBlocksCounterResetTimer *timer.RepeatTimer
+
 	// Life cycle
 	wg  *sync.WaitGroup
 	ctx context.Context
@@ -62,8 +67,13 @@ func NewThetaRPCServer(mempool *smp.Mempool, ledger *sld.Ledger, dispatcher *dis
 	t := &ThetaRPCServer{
 		ThetaRPCService: &ThetaRPCService{
 			wg: &sync.WaitGroup{},
+
+			pendingHeavyGetBlocksCounter:           0,
+			pendingHeavyGetBlocksCounterLock:       &sync.Mutex{},
+			pendingHeavyGetBlocksCounterResetTimer: timer.NewRepeatTimer("pendingHeavyGetBlocksCounterReset", 30*time.Minute),
 		},
 	}
+	t.pendingHeavyGetBlocksCounterResetTimer.Reset()
 
 	t.mempool = mempool
 	t.ledger = ledger
@@ -101,6 +111,9 @@ func (t *ThetaRPCServer) Start(ctx context.Context) {
 	t.wg.Add(1)
 	go t.mainLoop()
 
+	t.wg.Add(1)
+	go t.heavyQueryCounterLoop()
+
 	t.wg.Add(1)
 	go t.txCallback()
 }
@@ -115,6 +128,22 @@ func (t *ThetaRPCServer) mainLoop() {
 	t.server.Shutdown(t.ctx)
 }
 
+func (t *ThetaRPCServer) heavyQueryCounterLoop() {
+	defer t.wg.Done()
+
+	for {
+		select {
+		case <-t.pendingHeavyGetBlocksCounterResetTimer.Ch:
+			t.pendingHeavyGetBlocksCounterLock.Lock()
+			t.pendingHeavyGetBlocksCounter = 0 // reset the counter at a fixed interval, otherwise it could get stuck if some pending queries never return
+			t.pendingHeavyGetBlocksCounterLock.Unlock()
+		case <-t.ctx.Done():
+			t.stopped = true
+			return
+		}
+	}
+}
+
 func (t *ThetaRPCServer) serve() {
 	address := viper.GetString(scom.CfgRPCAddress)
 	port := viper.GetString(scom.CfgRPCPort)
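
Editor's note: the sketch below is a minimal, standalone illustration of the gating pattern this patch implements: a mutex-guarded counter admits at most a fixed number of concurrent heavy queries, and a background loop periodically resets the counter so that a query that never returns cannot wedge the gate forever. Every name in it (heavyQueryGate, enter, resetLoop, and so on) is invented for illustration and does not appear in the Theta codebase. The sketch also deliberately differs from the patch in one respect: it checks and increments the counter under a single lock acquisition, which closes the small window in the patch where several concurrent requests could all pass the check before any of them increments.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// heavyQueryGate caps the number of concurrently pending "heavy" queries.
// All names here are hypothetical; this is not the Theta implementation.
type heavyQueryGate struct {
	mu      sync.Mutex
	pending uint64
	max     uint64
}

// enter reserves a slot for a heavy query. The caller must invoke the
// returned release function (typically via defer) when the query finishes.
func (g *heavyQueryGate) enter() (release func(), err error) {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.pending >= g.max {
		return nil, errors.New("too many pending heavy queries")
	}
	g.pending++ // check and increment under one lock so the cap cannot be overshot
	return func() {
		g.mu.Lock()
		if g.pending > 0 { // tolerate a reset that already zeroed the counter
			g.pending--
		}
		g.mu.Unlock()
	}, nil
}

// resetLoop zeroes the counter at a fixed interval, mirroring the patch's
// heavyQueryCounterLoop: a stuck query can then block the gate for at most
// one interval.
func (g *heavyQueryGate) resetLoop(interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			g.mu.Lock()
			g.pending = 0
			g.mu.Unlock()
		case <-done:
			return
		}
	}
}

func main() {
	gate := &heavyQueryGate{max: 2}
	done := make(chan struct{})
	go gate.resetLoop(30*time.Minute, done)

	var releases []func()
	for i := 0; i < 3; i++ {
		release, err := gate.enter()
		if err != nil {
			fmt.Println("query", i, "rejected:", err) // the third query exceeds max=2
			continue
		}
		fmt.Println("query", i, "admitted")
		releases = append(releases, release)
	}
	for _, r := range releases {
		r() // in an RPC handler each release would run via defer when the query returns
	}
	close(done)
}
```

The periodic reset is a liveness-over-accuracy trade-off: if a decrement is ever lost (for example, a handler that never returns), the gate heals itself within one interval, at the cost of briefly admitting more than the configured maximum of heavy queries right after a reset.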