Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: filter network change #1270

Merged
merged 1 commit into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 88 additions & 20 deletions waku/v2/api/filter/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// filterSubscriptions is the map of filter subscription IDs to subscriptions

const filterSubBatchSize = 90
const initNetworkConnType = 255

type appFilterMap map[string]filterConfig

Expand All @@ -43,6 +44,7 @@ type FilterManager struct {
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
envProcessor EnevelopeProcessor
networkConnType byte
}

type SubDetails struct {
Expand Down Expand Up @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
mgr.networkConnType = initNetworkConnType

//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
Expand Down Expand Up @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
}
}

// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batchlimit is hit, all filters are subscribed to and new batch is created.
// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
// once batch-limit is hit, all filters are subscribed to and new batch is created.
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created

func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
Expand Down Expand Up @@ -182,37 +185,102 @@ func (mgr *FilterManager) NetworkChange() {
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
}

func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
}

func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) {
defer wg.Done()
asub.cancel()
for {
env, ok := <-asub.sub.DataCh
if !ok {
mgr.logger.Debug("unsubscribed filter", zap.Strings("content-topics", asub.sub.ContentFilter.ContentTopics.ToList()))
return
}
// process any in-flight envelopes
err := mgr.envProcessor.OnNewEnvelope(env)
if err != nil {
mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
}
}
}

func (mgr *FilterManager) resubscribeAllSubscriptions() {
filterSubsCount := len(mgr.filterSubscriptions)
mgr.Lock()
mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", filterSubsCount))
var wg sync.WaitGroup
wg.Add(len(mgr.filterSubscriptions))

for _, asub := range mgr.filterSubscriptions {
go mgr.closeAndWait(&wg, &asub)
}
mgr.filterSubscriptions = make(map[string]SubDetails)
kaichaosun marked this conversation as resolved.
Show resolved Hide resolved

mgr.Unlock()

wg.Wait() //Waiting till all unsubs are done to avoid race between sub and unsub

mgr.logger.Debug("unsubscribed all filter subscriptions", zap.Int("subs-count", filterSubsCount))

// locking to protect filterConfigs map, can't lock while calling subscribe as same lock is acquired inside subscribe
mgr.Lock()
localMap := make(appFilterMap)
for filterID, config := range mgr.filterConfigs {
localMap[filterID] = config
}
mgr.Unlock()

for filterID, config := range localMap {
mgr.SubscribeFilter(filterID, config.contentFilter)
}

}

// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa
// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic,
// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online.
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
chaitanyaprem marked this conversation as resolved.
Show resolved Hide resolved
subs := mgr.node.Subscriptions()
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
/*
Checking for initialization condition because when filterManager is initialized networkConnType is set to 255 and when first time node goes online
the network conn type will be set and will trigger resubscribe which is not desired.
Change in connType refers to scenario where the localnode's network has changed e.g: a mobile switching between wifi and cellular,
this in-turn means ip address of the localnode has changed.
this can cause issues in filter-push where it never recovers and hence resubscribing all filters
*/
if mgr.networkConnType != initNetworkConnType &&
mgr.networkConnType != connType { //
// resubscribe all existing filters
go mgr.resubscribeAllSubscriptions()
}
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
mgr.onlineChecker.SetOnline(newStatus)
mgr.NetworkChange()
mgr.logger.Debug("switching from offline to online")
mgr.Lock()
if len(mgr.waitingToSubQueue) > 0 {
for af := range mgr.waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
// check if any filter subs are pending and subscribe them
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break
}
}
}
mgr.checkAndProcessQueue(pubsubTopic)
mgr.Unlock()
}

mgr.networkConnType = connType
mgr.onlineChecker.SetOnline(newStatus)
}

Expand Down
4 changes: 2 additions & 2 deletions waku/v2/api/filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() {
// Mock peers going down
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())

fm.OnConnectionStatusChange("", false)
fm.OnConnectionStatusChange("", false, 0)
time.Sleep(2 * time.Second)
fm.OnConnectionStatusChange("", true)
fm.OnConnectionStatusChange("", true, 0)
s.ConnectToFullNode(s.LightNode, s.FullNode)
time.Sleep(3 * time.Second)

Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
}

if !wf.subscriptions.IsSubscribedTo(peerID) {
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
logger.Warn("received message push from unknown peer")
wf.metrics.RecordError(unknownPeerMessagePush)
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
if err := stream.Reset(); err != nil {
Expand Down
Loading