From 73d309f3434bb0dae29f558e0256a20e13f5e27d Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Fri, 15 Dec 2023 16:25:00 -0500 Subject: [PATCH] fix queue bug by implementing shufflequeue --- bbot/core/helpers/async_helpers.py | 10 ++++++++++ bbot/modules/base.py | 6 +++--- bbot/scanner/manager.py | 13 +++---------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/bbot/core/helpers/async_helpers.py b/bbot/core/helpers/async_helpers.py index f2a0548926..916764e53e 100644 --- a/bbot/core/helpers/async_helpers.py +++ b/bbot/core/helpers/async_helpers.py @@ -1,4 +1,5 @@ import uuid +import random import asyncio import logging import threading @@ -12,6 +13,15 @@ from .cache import CacheDict +class ShuffleQueue(asyncio.Queue): + def _put(self, item): + random_index = random.randint(0, self.qsize()) + self._queue.insert(random_index, item) + + def _get(self): + return self._queue.popleft() + + class _Lock(asyncio.Lock): def __init__(self, name): self.name = name diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 6dee967456..7599289eac 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -6,7 +6,7 @@ from ..core.helpers.misc import get_size # noqa from ..core.errors import ValidationError -from ..core.helpers.async_helpers import TaskCounter +from ..core.helpers.async_helpers import TaskCounter, ShuffleQueue class BaseModule: @@ -1065,13 +1065,13 @@ def config(self): @property def incoming_event_queue(self): if self._incoming_event_queue is None: - self._incoming_event_queue = asyncio.PriorityQueue() + self._incoming_event_queue = ShuffleQueue() return self._incoming_event_queue @property def outgoing_event_queue(self): if self._outgoing_event_queue is None: - self._outgoing_event_queue = asyncio.PriorityQueue() + self._outgoing_event_queue = ShuffleQueue() return self._outgoing_event_queue @property diff --git a/bbot/scanner/manager.py b/bbot/scanner/manager.py index 13d46669d0..1258c9e80c 100644 --- a/bbot/scanner/manager.py +++ b/bbot/scanner/manager.py @@ -4,7 +4,7 @@ from contextlib import suppress from ..core.errors import ValidationError -from ..core.helpers.async_helpers import TaskCounter +from ..core.helpers.async_helpers import TaskCounter, ShuffleQueue log = logging.getLogger("bbot.scanner.manager") @@ -18,7 +18,7 @@ class ScanManager: Attributes: scan (Scan): Reference to the Scan object that instantiated the ScanManager. - incoming_event_queue (asyncio.PriorityQueue): Queue storing incoming events for processing. + incoming_event_queue (ShuffleQueue): Queue storing incoming events for processing. events_distributed (set): Set tracking globally unique events. events_accepted (set): Set tracking events accepted by individual modules. dns_resolution (bool): Flag to enable or disable DNS resolution. @@ -39,14 +39,7 @@ def __init__(self, scan): self.scan = scan - # TODO: consider reworking modules' dedupe policy (accept_dupes) - # by creating a function that decides the criteria for what is - # considered to be a duplicate (by default this would be a simple - # hash(event)), but allowing each module to override it if needed. - # If a module used the default function, its dedupe could be done - # at the manager level to save memory. If not, it would be done by the scan. - - self.incoming_event_queue = asyncio.PriorityQueue() + self.incoming_event_queue = ShuffleQueue() # track incoming duplicates module-by-module (for `suppress_dupes` attribute of modules) self.incoming_dup_tracker = set() # track outgoing duplicates (for `accept_dupes` attribute of modules)