Skip to content

Commit

Permalink
fix queue bug by implementing shufflequeue
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechromancer committed Dec 15, 2023
1 parent ff158fe commit 73d309f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 13 deletions.
10 changes: 10 additions & 0 deletions bbot/core/helpers/async_helpers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
import random
import asyncio
import logging
import threading
Expand All @@ -12,6 +13,15 @@
from .cache import CacheDict


class ShuffleQueue(asyncio.Queue):
def _put(self, item):
random_index = random.randint(0, self.qsize())
self._queue.insert(random_index, item)

def _get(self):
return self._queue.popleft()


class _Lock(asyncio.Lock):
def __init__(self, name):
self.name = name
Expand Down
6 changes: 3 additions & 3 deletions bbot/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from ..core.helpers.misc import get_size # noqa
from ..core.errors import ValidationError
from ..core.helpers.async_helpers import TaskCounter
from ..core.helpers.async_helpers import TaskCounter, ShuffleQueue


class BaseModule:
Expand Down Expand Up @@ -1065,13 +1065,13 @@ def config(self):
@property
def incoming_event_queue(self):
if self._incoming_event_queue is None:
self._incoming_event_queue = asyncio.PriorityQueue()
self._incoming_event_queue = ShuffleQueue()
return self._incoming_event_queue

@property
def outgoing_event_queue(self):
if self._outgoing_event_queue is None:
self._outgoing_event_queue = asyncio.PriorityQueue()
self._outgoing_event_queue = ShuffleQueue()
return self._outgoing_event_queue

@property
Expand Down
13 changes: 3 additions & 10 deletions bbot/scanner/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from contextlib import suppress

from ..core.errors import ValidationError
from ..core.helpers.async_helpers import TaskCounter
from ..core.helpers.async_helpers import TaskCounter, ShuffleQueue

log = logging.getLogger("bbot.scanner.manager")

Expand All @@ -18,7 +18,7 @@ class ScanManager:
Attributes:
scan (Scan): Reference to the Scan object that instantiated the ScanManager.
incoming_event_queue (asyncio.PriorityQueue): Queue storing incoming events for processing.
incoming_event_queue (ShuffleQueue): Queue storing incoming events for processing.
events_distributed (set): Set tracking globally unique events.
events_accepted (set): Set tracking events accepted by individual modules.
dns_resolution (bool): Flag to enable or disable DNS resolution.
Expand All @@ -39,14 +39,7 @@ def __init__(self, scan):

self.scan = scan

# TODO: consider reworking modules' dedupe policy (accept_dupes)
# by creating a function that decides the criteria for what is
# considered to be a duplicate (by default this would be a simple
# hash(event)), but allowing each module to override it if needed.
# If a module used the default function, its dedupe could be done
# at the manager level to save memory. If not, it would be done by the scan.

self.incoming_event_queue = asyncio.PriorityQueue()
self.incoming_event_queue = ShuffleQueue()
# track incoming duplicates module-by-module (for `suppress_dupes` attribute of modules)
self.incoming_dup_tracker = set()
# track outgoing duplicates (for `accept_dupes` attribute of modules)
Expand Down

0 comments on commit 73d309f

Please sign in to comment.