Skip to content

Commit

Permalink
feat: filter rate limit
Browse files Browse the repository at this point in the history
Also includes refactor to have a rate limiter per peer with ttl
  • Loading branch information
richard-ramos committed Nov 25, 2024
1 parent 96702e2 commit 440b816
Show file tree
Hide file tree
Showing 26 changed files with 206 additions and 41 deletions.
1 change: 1 addition & 0 deletions examples/basic-light-client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-light-client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/basic-relay/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-relay/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/chat2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/chat2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/filter2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/filter2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/noise/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/noise/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/rln/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/rln/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/go-chi/chi/v5 v5.0.0
github.com/jackc/pgx/v5 v5.4.1
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0
github.com/waku-org/go-noise v0.0.4
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type WakuNodeParameters struct {
enableRelay bool
enableFilterLightNode bool
enableFilterFullNode bool
filterOpts []filter.Option
filterOpts []filter.FullNodeOption
pubsubOpts []pubsub.Option
lightpushOpts []lightpush.Option

Expand Down Expand Up @@ -471,7 +471,7 @@ func WithWakuFilterLightNode() WakuNodeOption {

// WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
func WithWakuFilterFullNode(filterOpts ...filter.FullNodeOption) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterFullNode = true
params.filterOpts = filterOpts
Expand Down
17 changes: 16 additions & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type WakuFilterLightNode struct {
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
limiter *utils.RateLimiter
peerPingInterval time.Duration
}

Expand Down Expand Up @@ -89,6 +90,7 @@ func NewWakuFilterLightNode(
onlineChecker onlinechecker.OnlineChecker,
reg prometheus.Registerer,
log *zap.Logger,
opts ...LightNodeOption,
) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
Expand All @@ -99,6 +101,14 @@ func NewWakuFilterLightNode(
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 1 * time.Minute

params := &FilterLightNodeParameters{}
opts = append(DefaultLightNodeOptions(), opts...)
for _, opt := range opts {
opt(params)
}
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)

return wf
}

Expand Down Expand Up @@ -155,6 +165,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea

logger := wf.log.With(logging.HostID("peerID", peerID))

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
return
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
Expand Down Expand Up @@ -287,7 +302,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,

}

if filterSubscribeResponse.RequestId != request.RequestId {
if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId {
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
err := NewFilterError(300, "request_id_mismatch")
Expand Down
1 change: 1 addition & 0 deletions waku/v2/protocol/filter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
writeResponseFailure metricsErrCategory = "write_response_failure"
pushTimeoutFailure metricsErrCategory = "push_timeout_failure"
rateLimitFailure metricsErrCategory = "ratelimit_failure"
)

// RecordError increases the counter for different error types
Expand Down
51 changes: 41 additions & 10 deletions waku/v2/protocol/filter/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
Expand Down Expand Up @@ -53,19 +54,41 @@ type (
wg *sync.WaitGroup
}

FilterParameters struct {
FilterFullNodeParameters struct {
Timeout time.Duration
MaxSubscribers int
pm *peermanager.PeerManager
limitR rate.Limit
limitB int
}

Option func(*FilterParameters)
FullNodeOption func(*FilterFullNodeParameters)

FilterLightNodeParameters struct {
limitR rate.Limit
limitB int
}

LightNodeOption func(*FilterLightNodeParameters)

FilterSubscribeOption func(*FilterSubscribeParameters) error
)

func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
return func(params *FilterLightNodeParameters) {
params.limitR = r
params.limitB = b
}
}

func DefaultLightNodeOptions() []LightNodeOption {
return []LightNodeOption{
WithLightNodeRateLimiter(rate.Inf, 0),
}
}

func WithTimeout(timeout time.Duration) FullNodeOption {
return func(params *FilterFullNodeParameters) {
params.Timeout = timeout
}
}
Expand Down Expand Up @@ -190,21 +213,29 @@ func DefaultUnsubscribeOptions() []FilterSubscribeOption {
}
}

func WithMaxSubscribers(maxSubscribers int) Option {
return func(params *FilterParameters) {
func WithMaxSubscribers(maxSubscribers int) FullNodeOption {
return func(params *FilterFullNodeParameters) {
params.MaxSubscribers = maxSubscribers
}
}

func WithPeerManager(pm *peermanager.PeerManager) Option {
return func(params *FilterParameters) {
func WithPeerManager(pm *peermanager.PeerManager) FullNodeOption {
return func(params *FilterFullNodeParameters) {
params.pm = pm
}
}

func DefaultOptions() []Option {
return []Option{
func WithFullNodeRateLimiter(r rate.Limit, b int) FullNodeOption {
return func(params *FilterFullNodeParameters) {
params.limitR = r
params.limitB = b
}
}

func DefaultFullNodeOptions() []FullNodeOption {
return []FullNodeOption{
WithTimeout(DefaultIdleSubscriptionTimeout),
WithMaxSubscribers(DefaultMaxSubscribers),
WithFullNodeRateLimiter(rate.Inf, 0),
}
}
22 changes: 15 additions & 7 deletions waku/v2/protocol/filter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,26 @@ type (
metrics Metrics
log *zap.Logger
*service.CommonService
subscriptions *SubscribersMap
pm *peermanager.PeerManager

subscriptions *SubscribersMap
pm *peermanager.PeerManager
limiter *utils.RateLimiter
maxSubscriptions int
}
)

// NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...FullNodeOption) *WakuFilterFullNode {
wf := new(WakuFilterFullNode)
wf.log = log.Named("filterv2-fullnode")

params := new(FilterParameters)
optList := DefaultOptions()
params := new(FilterFullNodeParameters)
optList := DefaultFullNodeOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}

wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.subscriptions = NewSubscribersMap(params.Timeout)
Expand Down Expand Up @@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {

func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
peerID := stream.Conn().RemotePeer()
logger := wf.log.With(logging.HostID("peer", peerID))

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded")
return
}

reader := pbio.NewDelimitedReader(stream, math.MaxInt32)

Expand Down
10 changes: 5 additions & 5 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
Expand All @@ -40,7 +39,7 @@ var (
type WakuLightPush struct {
h host.Host
relay *relay.WakuRelay
limiter *rate.Limiter
limiter *utils.RateLimiter
cancel context.CancelFunc
pm *peermanager.PeerManager
metrics Metrics
Expand All @@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.metrics = newMetrics(reg)

params := &LightpushParameters{}
opts = append(DefaultOptions(), opts...)
for _, opt := range opts {
opt(params)
}

wakuLP.limiter = params.limiter
wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB)

return wakuLP
}
Expand Down Expand Up @@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
Response: &pb.PushResponse{},
}

if wakuLP.limiter != nil && !wakuLP.limiter.Allow() {
if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) {
wakuLP.metrics.RecordError(rateLimitFailure)
responseMsg := "exceeds the rate limit"
responsePushRPC.Response.Info = &responseMsg
Expand Down Expand Up @@ -257,7 +257,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
params.pm = wakuLP.pm
var err error

optList := append(DefaultOptions(wakuLP.h), opts...)
optList := append(DefaultRequestOptions(wakuLP.h), opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
Expand Down
Loading

0 comments on commit 440b816

Please sign in to comment.