diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go index 1641175b7..7877b61ff 100644 --- a/cmd/waku/server/rest/filter_test.go +++ b/cmd/waku/server/rest/filter_test.go @@ -23,6 +23,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/utils" + "golang.org/x/time/rate" ) func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { @@ -37,8 +38,8 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode { // node2 connects to node1 func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) { - node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter - node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter + node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter + node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL) err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1) diff --git a/default.nix b/default.nix index d644b34cd..ea566f3fa 100644 --- a/default.nix +++ b/default.nix @@ -26,7 +26,7 @@ pkgs.buildGo121Module { '' else ""; # FIXME: This needs to be manually changed when updating modules. - vendorHash = "sha256-TrKlv3UHhFl+1HviEYFTmOpF+UiVdL6h53IkJXBFsRo="; + vendorHash = "sha256-yQ3anfZ/PU0M0KHiXqA9Ri8zFkg1nTYIk43jmcdGZYU="; # Fix for 'nix run' trying to execute 'go-waku'. meta = { mainProgram = "waku"; }; diff --git a/examples/basic-light-client/go.mod b/examples/basic-light-client/go.mod index f6c89887b..e54a79b77 100644 --- a/examples/basic-light-client/go.mod +++ b/examples/basic-light-client/go.mod @@ -62,6 +62,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/basic-light-client/go.sum b/examples/basic-light-client/go.sum index 39cb26316..d955e1ca6 100644 --- a/examples/basic-light-client/go.sum +++ b/examples/basic-light-client/go.sum @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/examples/basic-relay/go.mod b/examples/basic-relay/go.mod index 62da77484..c5ac2cc35 100644 --- a/examples/basic-relay/go.mod +++ b/examples/basic-relay/go.mod @@ -62,6 +62,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/basic-relay/go.sum b/examples/basic-relay/go.sum index 0251424b2..58711c6a4 100644 --- a/examples/basic-relay/go.sum +++ b/examples/basic-relay/go.sum @@ -337,6 +337,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/examples/chat2/go.mod b/examples/chat2/go.mod index 2a4a51568..6c1344793 100644 --- a/examples/chat2/go.mod +++ b/examples/chat2/go.mod @@ -70,6 +70,7 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/chat2/go.sum b/examples/chat2/go.sum index 9010c4191..0ea193335 100644 --- a/examples/chat2/go.sum +++ b/examples/chat2/go.sum @@ -349,6 +349,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/examples/filter2/go.mod b/examples/filter2/go.mod index ff4512f5a..3ca4bd51b 100644 --- a/examples/filter2/go.mod +++ b/examples/filter2/go.mod @@ -57,6 +57,7 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index d4a571cea..04bf1ad6c 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/examples/noise/go.mod b/examples/noise/go.mod index f8f2a3d6f..471b5f018 100644 --- a/examples/noise/go.mod +++ b/examples/noise/go.mod @@ -60,6 +60,7 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/noise/go.sum b/examples/noise/go.sum index 3644a82e6..27afd3f8c 100644 --- a/examples/noise/go.sum +++ b/examples/noise/go.sum @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/examples/rln/go.mod b/examples/rln/go.mod index 00e33f507..1d87befd6 100644 --- a/examples/rln/go.mod +++ b/examples/rln/go.mod @@ -58,6 +58,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 // indirect github.com/jackpal/go-nat-pmp v1.0.2 // indirect github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect + github.com/jellydator/ttlcache/v3 v3.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/koron/go-ssdp v0.0.4 // indirect diff --git a/examples/rln/go.sum b/examples/rln/go.sum index d4a571cea..04bf1ad6c 100644 --- a/examples/rln/go.sum +++ b/examples/rln/go.sum @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= diff --git a/go.mod b/go.mod index 267d6080e..2dc2bf34e 100644 --- a/go.mod +++ b/go.mod @@ -40,6 +40,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/go-chi/chi/v5 v5.0.0 github.com/jackc/pgx/v5 v5.4.1 + github.com/jellydator/ttlcache/v3 v3.3.0 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 github.com/waku-org/go-noise v0.0.4 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 diff --git a/go.sum b/go.sum index 722d878a5..21d1de737 100644 --- a/go.sum +++ b/go.sum @@ -949,6 +949,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0 github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 3b56d4700..3d81048d6 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -54,6 +54,7 @@ type WakuFilterLightNode struct { log *zap.Logger subscriptions *subscription.SubscriptionsMap pm *peermanager.PeerManager + limiter *utils.RateLimiter peerPingInterval time.Duration } @@ -89,6 +90,7 @@ func NewWakuFilterLightNode( onlineChecker onlinechecker.OnlineChecker, reg prometheus.Registerer, log *zap.Logger, + opts ...LightNodeOption, ) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") @@ -99,6 +101,14 @@ func NewWakuFilterLightNode( wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.peerPingInterval = 1 * time.Minute + + params := &LightNodeParameters{} + opts = append(DefaultLightNodeOptions(), opts...) + for _, opt := range opts { + opt(params) + } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) + return wf } @@ -155,6 +165,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea logger := wf.log.With(logging.HostID("peerID", peerID)) + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + if err := stream.Reset(); err != nil { + wf.log.Error("resetting connection", zap.Error(err)) + } + return + } + if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) @@ -287,7 +305,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, } - if filterSubscribeResponse.RequestId != request.RequestId { + if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) err := NewFilterError(300, "request_id_mismatch") diff --git a/waku/v2/protocol/filter/metrics.go b/waku/v2/protocol/filter/metrics.go index 51e3b356d..89ac8e4aa 100644 --- a/waku/v2/protocol/filter/metrics.go +++ b/waku/v2/protocol/filter/metrics.go @@ -96,6 +96,7 @@ var ( peerNotFoundFailure metricsErrCategory = "peer_not_found_failure" writeResponseFailure metricsErrCategory = "write_response_failure" pushTimeoutFailure metricsErrCategory = "push_timeout_failure" + rateLimitFailure metricsErrCategory = "ratelimit_failure" ) // RecordError increases the counter for different error types diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index f19876212..bde105e47 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" + "golang.org/x/time/rate" ) func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { @@ -57,13 +58,35 @@ type ( Timeout time.Duration MaxSubscribers int pm *peermanager.PeerManager + limitR rate.Limit + limitB int } Option func(*FilterParameters) + LightNodeParameters struct { + limitR rate.Limit + limitB int + } + + LightNodeOption func(*LightNodeParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) error ) +func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { + return func(params *LightNodeParameters) { + params.limitR = r + params.limitB = b + } +} + +func DefaultLightNodeOptions() []LightNodeOption { + return []LightNodeOption{ + WithLightNodeRateLimiter(1, 1), + } +} + func WithTimeout(timeout time.Duration) Option { return func(params *FilterParameters) { params.Timeout = timeout @@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option { } } +func WithFullNodeRateLimiter(r rate.Limit, b int) Option { + return func(params *FilterParameters) { + params.limitR = r + params.limitB = b + } +} + func DefaultOptions() []Option { return []Option{ WithTimeout(DefaultIdleSubscriptionTimeout), WithMaxSubscribers(DefaultMaxSubscribers), + WithFullNodeRateLimiter(1, 1), } } diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index bacfe85cc..82c4c47da 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -37,9 +37,9 @@ type ( metrics Metrics log *zap.Logger *service.CommonService - subscriptions *SubscribersMap - pm *peermanager.PeerManager - + subscriptions *SubscribersMap + pm *peermanager.PeerManager + limiter *utils.RateLimiter maxSubscriptions int } ) @@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) @@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error { func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { - logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wf.log.With(logging.HostID("peer", peerID)) + + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded") + return + } reader := pbio.NewDelimitedReader(stream, math.MaxInt32) diff --git a/waku/v2/protocol/filter/test_utils.go b/waku/v2/protocol/filter/test_utils.go index 015cb352e..88b9e04e6 100644 --- a/waku/v2/protocol/filter/test_utils.go +++ b/waku/v2/protocol/filter/test_utils.go @@ -22,6 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "golang.org/x/time/rate" ) type LightNodeData struct { @@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo nodeData := s.GetWakuRelay(topic) - node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0)) node2Filter.SetHost(nodeData.FullNodeHost) var sub *relay.Subscription @@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0)) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315..7e411a4ac 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -24,7 +24,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // LightPushID_v20beta1 is the current Waku LightPush protocol identifier @@ -40,7 +39,7 @@ var ( type WakuLightPush struct { h host.Host relay *relay.WakuRelay - limiter *rate.Limiter + limiter *utils.RateLimiter cancel context.CancelFunc pm *peermanager.PeerManager metrics Metrics @@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.metrics = newMetrics(reg) params := &LightpushParameters{} + opts = append(DefaultLightpushOptions(), opts...) for _, opt := range opts { opt(params) } - wakuLP.limiter = params.limiter + wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB) return wakuLP } @@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) Response: &pb.PushResponse{}, } - if wakuLP.limiter != nil && !wakuLP.limiter.Allow() { + if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) { wakuLP.metrics.RecordError(rateLimitFailure) responseMsg := "exceeds the rate limit" responsePushRPC.Response.Info = &responseMsg diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 7ed043705..b9740ab4c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -14,7 +14,8 @@ import ( ) type LightpushParameters struct { - limiter *rate.Limiter + limitR rate.Limit + limitB int } type Option func(*LightpushParameters) @@ -22,7 +23,14 @@ type Option func(*LightpushParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *LightpushParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limitR = r + params.limitB = b + } +} + +func DefaultLightpushOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 6ff17f634..5b4e9111b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -8,6 +8,7 @@ import ( "time" "github.com/waku-org/go-waku/waku/v2/peermanager" + "golang.org/x/time/rate" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peerstore" @@ -273,7 +274,7 @@ func TestWakuLightPushCornerCases(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) lightPushNode2.SetHost(host2) err := lightPushNode2.Start(ctx) require.NoError(t, err) @@ -358,7 +359,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) { clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) + client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) client.SetHost(clientHost) // Node2 @@ -366,7 +367,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0)) lightPushNode2.SetHost(host2) err = lightPushNode2.Start(ctx) require.NoError(t, err) diff --git a/waku/v2/protocol/peer_exchange/protocol.go b/waku/v2/protocol/peer_exchange/protocol.go index 5a3821b96..dc181fb42 100644 --- a/waku/v2/protocol/peer_exchange/protocol.go +++ b/waku/v2/protocol/peer_exchange/protocol.go @@ -23,7 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier @@ -51,7 +50,7 @@ type WakuPeerExchange struct { peerConnector PeerConnector enrCache *enrCache - limiter *rate.Limiter + limiter *utils.RateLimiter } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct @@ -68,11 +67,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect wakuPX.CommonService = service.NewCommonService() params := &PeerExchangeParameters{} + opts = append(DefaultPeerExchangeOptions(), opts...) for _, opt := range opts { opt(params) } - wakuPX.limiter = params.limiter + wakuPX.limiter = utils.NewRateLimiter(params.limiterR, params.limiterB) return wakuPX, nil } @@ -97,9 +97,10 @@ func (wakuPX *WakuPeerExchange) start() error { func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { return func(stream network.Stream) { - logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wakuPX.log.With(logging.HostID("peer", peerID)) - if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { + if wakuPX.limiter != nil && !wakuPX.limiter.Allow(peerID) { wakuPX.metrics.RecordError(rateLimitFailure) wakuPX.log.Info("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index c08988091..c25078b73 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -12,7 +12,8 @@ import ( ) type PeerExchangeParameters struct { - limiter *rate.Limiter + limiterR rate.Limit + limiterB int } type Option func(*PeerExchangeParameters) @@ -20,7 +21,14 @@ type Option func(*PeerExchangeParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *PeerExchangeParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limiterR = r + params.limiterB = b + } +} + +func DefaultPeerExchangeOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/waku/v2/utils/limiter.go b/waku/v2/utils/limiter.go new file mode 100644 index 000000000..a587659f8 --- /dev/null +++ b/waku/v2/utils/limiter.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "sync" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/libp2p/go-libp2p/core/peer" + "golang.org/x/time/rate" +) + +type RateLimiter struct { + sync.Mutex + limiters *ttlcache.Cache[peer.ID, *rate.Limiter] + r rate.Limit + b int +} + +func NewRateLimiter(r rate.Limit, b int) *RateLimiter { + return &RateLimiter{ + r: r, + b: b, + limiters: ttlcache.New[peer.ID, *rate.Limiter]( + ttlcache.WithTTL[peer.ID, *rate.Limiter](30 * time.Minute), + ), + } +} + +func (r *RateLimiter) Start(ctx context.Context) { + go func() { + t := time.NewTicker(time.Hour) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.Lock() + r.limiters.DeleteExpired() + r.Unlock() + } + } + }() +} + +func (r *RateLimiter) getOrCreate(peerID peer.ID) *rate.Limiter { + r.Lock() + defer r.Unlock() + + var limiter *rate.Limiter + if !r.limiters.Has(peerID) { + limiter = rate.NewLimiter(r.r, r.b) + r.limiters.Set(peerID, limiter, ttlcache.DefaultTTL) + } else { + v := r.limiters.Get(peerID) + limiter = v.Value() + } + return limiter +} + +func (r *RateLimiter) Allow(peerID peer.ID) bool { + return r.getOrCreate(peerID).Allow() +} + +func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error { + return r.getOrCreate(peerID).Wait(ctx) +}