Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: filter rate limit #1258

Merged
merged 3 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pkgs.buildGo121Module {
'' else "";

# FIXME: This needs to be manually changed when updating modules.
vendorHash = "sha256-TrKlv3UHhFl+1HviEYFTmOpF+UiVdL6h53IkJXBFsRo=";
vendorHash = "sha256-yQ3anfZ/PU0M0KHiXqA9Ri8zFkg1nTYIk43jmcdGZYU=";

# Fix for 'nix run' trying to execute 'go-waku'.
meta = { mainProgram = "waku"; };
Expand Down
1 change: 1 addition & 0 deletions examples/basic-light-client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-light-client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/basic-relay/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/basic-relay/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2-reliable/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
var filterOpt filter.SubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()
Expand Down
2 changes: 1 addition & 1 deletion examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
var filterOpt filter.SubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()
Expand Down
1 change: 1 addition & 0 deletions examples/chat2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/chat2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/filter2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/filter2/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/noise/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ require (
github.com/ipfs/go-cid v0.4.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/noise/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions examples/rln/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1 // indirect
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
github.com/jellydator/ttlcache/v3 v3.3.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
2 changes: 2 additions & 0 deletions examples/rln/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/go-chi/chi/v5 v5.0.0
github.com/jackc/pgx/v5 v5.4.1
github.com/jellydator/ttlcache/v3 v3.3.0
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0
github.com/waku-org/go-noise v0.0.4
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -949,6 +949,8 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
Expand Down
6 changes: 3 additions & 3 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m
ctx = instance.ctx
}

var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down Expand Up @@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string,
ctx = instance.ctx
}

var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down Expand Up @@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string
ctx = instance.ctx
}

var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/api/filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool {

func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options := make([]filter.SubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))
Expand Down
4 changes: 2 additions & 2 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type WakuNodeParameters struct {
enableRelay bool
enableFilterLightNode bool
enableFilterFullNode bool
filterOpts []filter.Option
filterOpts []filter.FullNodeOption
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
pubsubOpts []pubsub.Option
lightpushOpts []lightpush.Option

Expand Down Expand Up @@ -471,7 +471,7 @@ func WithWakuFilterLightNode() WakuNodeOption {

// WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
func WithWakuFilterFullNode(filterOpts ...filter.FullNodeOption) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterFullNode = true
params.filterOpts = filterOpts
Expand Down
39 changes: 27 additions & 12 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type WakuFilterLightNode struct {
log *zap.Logger
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
limiter *utils.RateLimiter
peerPingInterval time.Duration
}

Expand Down Expand Up @@ -89,6 +90,7 @@ func NewWakuFilterLightNode(
onlineChecker onlinechecker.OnlineChecker,
reg prometheus.Registerer,
log *zap.Logger,
opts ...LightNodeOption,
) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
Expand All @@ -99,6 +101,14 @@ func NewWakuFilterLightNode(
wf.CommonService = service.NewCommonService()
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 1 * time.Minute

params := &LightNodeParameters{}
opts = append(DefaultLightNodeOptions(), opts...)
for _, opt := range opts {
opt(params)
}
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)

return wf
}

Expand Down Expand Up @@ -155,6 +165,11 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea

logger := wf.log.With(logging.HostID("peerID", peerID))

if !wf.limiter.Allow(peerID) {
wf.metrics.RecordError(rateLimitFailure)
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
return
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
wf.metrics.RecordError(unknownPeerMessagePush)
Expand Down Expand Up @@ -287,7 +302,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,

}

if filterSubscribeResponse.RequestId != request.RequestId {
if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
wf.metrics.RecordError(requestIDMismatch)
err := NewFilterError(300, "request_id_mismatch")
Expand All @@ -303,8 +318,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
return nil
}

func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) {
params := new(FilterSubscribeParameters)
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) {
params := new(SubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
Expand Down Expand Up @@ -362,7 +377,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -464,8 +479,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
}

func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
params := new(FilterSubscribeParameters)
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) {
params := new(SubscribeParameters)
params.log = wf.log
opts = append(DefaultUnsubscribeOptions(), opts...)
for _, opt := range opts {
Expand All @@ -477,14 +492,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO
return params, nil
}

func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error {
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return err
}

params := &FilterPingParameters{}
params := &PingParameters{}
for _, opt := range opts {
opt(params)
}
Expand All @@ -501,7 +516,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
}

// Unsubscribe is used to stop receiving messages from specified peers for the content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -606,7 +621,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -653,7 +668,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques

// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
// send the unsubscribeAll request to the peers
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -724,7 +739,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
}

// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down
Loading
Loading