Skip to content

Commit

Permalink
Merge pull request #1014 from blacklanternsecurity/qsize-tlc
Browse files: browse the repository at this point in the history
Smooth network spikes, improve memory efficiency
  • Loading branch information
TheTechromancer authored Jan 22, 2024
2 parents 027c0ed + 3778ff9 commit a623f5e
Show file tree
Hide file tree
Showing 76 changed files with 288 additions and 235 deletions.
19 changes: 10 additions & 9 deletions bbot/core/helpers/async_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,33 +57,34 @@ def __init__(self):

@property
def value(self):
return len(self.tasks)
return sum([t.n for t in self.tasks.values()])

def count(self, task_name, _log=True):
def count(self, task_name, n=1, _log=True):
if callable(task_name):
task_name = f"{task_name.__qualname__}()"
return self.Task(self, task_name, _log)
return self.Task(self, task_name, n=n, _log=_log)

class Task:
def __init__(self, manager, task_name, _log=True):
def __init__(self, manager, task_name, n=1, _log=True):
self.manager = manager
self.task_name = task_name
self.task_id = None
self.start_time = None
self.log = _log
self.n = n

async def __aenter__(self):
self.task_id = uuid.uuid4() # generate a unique ID for the task
self.task_id = uuid.uuid4()
# if self.log:
# log.trace(f"Starting task {self.task_name} ({self.task_id})")
async with self.manager.lock: # acquire the lock
async with self.manager.lock:
self.start_time = datetime.now()
self.manager.tasks[self.task_id] = self
return self.task_id # this will be passed as 'task_id' to __aexit__
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
async with self.manager.lock: # acquire the lock
self.manager.tasks.pop(self.task_id, None) # remove only current task
async with self.manager.lock:
self.manager.tasks.pop(self.task_id, None)
# if self.log:
# log.trace(f"Finished task {self.task_name} ({self.task_id})")

Expand Down
4 changes: 2 additions & 2 deletions bbot/core/helpers/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def __init__(self, config, scan=None):
# cloud helpers
self.cloud = CloudHelper(self)

def interactsh(self):
return Interactsh(self)
def interactsh(self, *args, **kwargs):
return Interactsh(self, *args, **kwargs)

def http_compare(self, url, allow_redirects=False, include_cache_buster=True):
return HttpCompare(url, self, allow_redirects=allow_redirects, include_cache_buster=include_cache_buster)
Expand Down
5 changes: 3 additions & 2 deletions bbot/core/helpers/interactsh.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ class Interactsh:
```
"""

def __init__(self, parent_helper):
def __init__(self, parent_helper, poll_interval=10):
self.parent_helper = parent_helper
self.server = None
self.correlation_id = None
self.custom_server = self.parent_helper.config.get("interactsh_server", None)
self.token = self.parent_helper.config.get("interactsh_token", None)
self.poll_interval = poll_interval
self._poll_task = None

async def register(self, callback=None):
Expand Down Expand Up @@ -279,7 +280,7 @@ async def _poll_loop(self, callback):
log.warning(e)
log.trace(traceback.format_exc())
if not data_list:
await asyncio.sleep(10)
await asyncio.sleep(self.poll_interval)
continue
for data in data_list:
if data:
Expand Down
4 changes: 4 additions & 0 deletions bbot/core/helpers/names_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@
"ashley",
"audrey",
"austin",
"azathoth",
"baggins",
"bailey",
"barbara",
Expand Down Expand Up @@ -347,8 +348,10 @@
"courtney",
"craig",
"crystal",
"cthulu",
"curtis",
"cynthia",
"dagon",
"dale",
"dandelion",
"daniel",
Expand Down Expand Up @@ -554,6 +557,7 @@
"noah",
"norma",
"norman",
"nyarlathotep",
"obama",
"olivia",
"padme",
Expand Down
4 changes: 2 additions & 2 deletions bbot/modules/ajaxpro.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async def handle_event(self, event):
probe_confirm = await self.helpers.request(f"{event.data}a/whatever.ashx")
if probe_confirm:
if probe_confirm.status_code != 200:
self.emit_event(
await self.emit_event(
{
"host": str(event.host),
"url": event.data,
Expand All @@ -40,7 +40,7 @@ async def handle_event(self, event):
ajaxpro_regex_result = self.ajaxpro_regex.search(resp_body)
if ajaxpro_regex_result:
ajax_pro_path = ajaxpro_regex_result.group(0)
self.emit_event(
await self.emit_event(
{
"host": str(event.host),
"url": event.data["url"],
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/azure_realm.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ async def handle_event(self, event):
auth_url, "URL_UNVERIFIED", source=event, tags=["affiliate", "ms-auth-url"]
)
url_event.source_domain = domain
self.emit_event(url_event)
await self.emit_event(url_event)

async def getuserrealm(self, domain):
url = f"https://login.microsoftonline.com/getuserrealm.srf?login=test@{domain}"
Expand Down
4 changes: 2 additions & 2 deletions bbot/modules/azure_tenant.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def handle_event(self, event):
self.verbose(f'Found {len(domains):,} domains under tenant for "{query}": {", ".join(sorted(domains))}')
for domain in domains:
if domain != query:
self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate", "azure-tenant"])
await self.emit_event(domain, "DNS_NAME", source=event, tags=["affiliate", "azure-tenant"])
# tenant names
if domain.lower().endswith(".onmicrosoft.com"):
tenantname = domain.split(".")[0].lower()
Expand All @@ -44,7 +44,7 @@ async def handle_event(self, event):
event_data = {"tenant-names": sorted(tenant_names), "domains": sorted(domains)}
if tenant_id is not None:
event_data["tenant-id"] = tenant_id
self.emit_event(event_data, "AZURE_TENANT", source=event)
await self.emit_event(event_data, "AZURE_TENANT", source=event)

async def query(self, domain):
url = f"{self.base_url}/autodiscover/autodiscover.svc"
Expand Down
6 changes: 3 additions & 3 deletions bbot/modules/badsecrets.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ async def handle_event(self, event):
"url": event.data["url"],
"host": str(event.host),
}
self.emit_event(data, "VULNERABILITY", event)
await self.emit_event(data, "VULNERABILITY", event)
elif r["type"] == "IdentifyOnly":
# There is little value in presenting a non-vulnerable asp.net viewstate, as it is not crackable without a Matrioshka brain. Just emit a technology instead.
if r["detecting_module"] == "ASPNET_Viewstate":
self.emit_event(
await self.emit_event(
{"technology": "microsoft asp.net", "url": event.data["url"], "host": str(event.host)},
"TECHNOLOGY",
event,
Expand All @@ -67,4 +67,4 @@ async def handle_event(self, event):
"url": event.data["url"],
"host": str(event.host),
}
self.emit_event(data, "FINDING", event)
await self.emit_event(data, "FINDING", event)
43 changes: 12 additions & 31 deletions bbot/modules/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class BaseModule:

_preserve_graph = False
_stats_exclude = False
_qsize = 0
_qsize = 100
_priority = 3
_name = "base"
_type = "scan"
Expand Down Expand Up @@ -175,7 +175,7 @@ async def handle_event(self, event):
"""
pass

def handle_batch(self, *events):
async def handle_batch(self, *events):
"""Handles incoming events in batches for optimized processing.
This method is automatically called when multiple events that match any in `watched_events` are encountered and the `batch_size` attribute is set to a value greater than 1. Override this method to implement custom batch event-handling logic for your module.
Expand Down Expand Up @@ -350,13 +350,14 @@ async def _handle_batch(self):
- If a "FINISHED" event is found, invokes 'finish()' method of the module.
"""
finish = False
async with self._task_counter.count(f"{self.name}.handle_batch()"):
async with self._task_counter.count(f"{self.name}.handle_batch()") as counter:
submitted = False
if self.batch_size <= 1:
return
if self.num_incoming_events > 0:
events, finish = await self._events_waiting()
if events and not self.errored:
counter.n = len(events)
self.debug(f"Handling batch of {len(events):,} events")
submitted = True
async with self.scan._acatch(f"{self.name}.handle_batch()"):
Expand All @@ -381,7 +382,7 @@ def make_event(self, *args, **kwargs):
Examples:
>>> new_event = self.make_event("1.2.3.4", source=event)
>>> self.emit_event(new_event)
>>> await self.emit_event(new_event)
Returns:
Event or None: The created event, or None if a validation error occurred and raise_error was False.
Expand All @@ -401,7 +402,7 @@ def make_event(self, *args, **kwargs):
event.module = self
return event

def emit_event(self, *args, **kwargs):
async def emit_event(self, *args, **kwargs):
"""Emit an event to the event queue and distribute it to interested modules.
This is how modules "return" data.
Expand All @@ -419,10 +420,10 @@ def emit_event(self, *args, **kwargs):
```
Examples:
>>> self.emit_event("www.evilcorp.com", source=event, tags=["affiliate"])
>>> await self.emit_event("www.evilcorp.com", source=event, tags=["affiliate"])
>>> new_event = self.make_event("1.2.3.4", source=event)
>>> self.emit_event(new_event)
>>> await self.emit_event(new_event)
Returns:
None
Expand All @@ -438,27 +439,7 @@ def emit_event(self, *args, **kwargs):
emit_kwargs[o] = v
event = self.make_event(*args, **event_kwargs)
if event:
self.queue_outgoing_event(event, **emit_kwargs)

async def emit_event_wait(self, *args, **kwargs):
"""Emit an event to the event queue and await until there is space in the outgoing queue.
This method is similar to `emit_event`, but it waits until there's sufficient space in the outgoing
event queue before emitting the event. It utilizes the queue size threshold defined in `self._qsize`.
Args:
*args: Positional arguments to be passed to `emit_event()` for event creation.
**kwargs: Keyword arguments to be passed to `emit_event()` for event creation or configuration.
Returns:
None
See Also:
emit_event: For emitting an event without waiting on the queue size.
"""
while self.outgoing_event_queue.qsize() > self._qsize:
await self.helpers.sleep(0.2)
return self.emit_event(*args, **kwargs)
await self.queue_outgoing_event(event, **emit_kwargs)

async def _events_waiting(self):
"""
Expand Down Expand Up @@ -808,7 +789,7 @@ async def queue_event(self, event, precheck=True):
except AttributeError:
self.debug(f"Not in an acceptable state to queue incoming event")

def queue_outgoing_event(self, event, **kwargs):
async def queue_outgoing_event(self, event, **kwargs):
"""
Queues an outgoing event to the module's outgoing event queue for further processing.
Expand All @@ -829,7 +810,7 @@ def queue_outgoing_event(self, event, **kwargs):
AttributeError: If the module is not in an acceptable state to queue outgoing events.
"""
try:
self.outgoing_event_queue.put_nowait((event, kwargs))
await self.outgoing_event_queue.put((event, kwargs))
except AttributeError:
self.debug(f"Not in an acceptable state to queue outgoing event")

Expand Down Expand Up @@ -1076,7 +1057,7 @@ def incoming_event_queue(self):
@property
def outgoing_event_queue(self):
if self._outgoing_event_queue is None:
self._outgoing_event_queue = ShuffleQueue()
self._outgoing_event_queue = ShuffleQueue(self._qsize)
return self._outgoing_event_queue

@property
Expand Down
4 changes: 2 additions & 2 deletions bbot/modules/bevigil.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ async def handle_event(self, event):
subdomains = await self.query(query, request_fn=self.request_subdomains, parse_fn=self.parse_subdomains)
if subdomains:
for subdomain in subdomains:
self.emit_event(subdomain, "DNS_NAME", source=event)
await self.emit_event(subdomain, "DNS_NAME", source=event)

if self.urls:
urls = await self.query(query, request_fn=self.request_urls, parse_fn=self.parse_urls)
if urls:
for parsed_url in await self.scan.run_in_executor(self.helpers.validators.collapse_urls, urls):
self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event)
await self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event)

async def request_subdomains(self, query):
url = f"{self.base_url}/{self.helpers.quote(query)}/subdomains/"
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/bucket_file_enum.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async def handle_aws(self, event):
bucket_file = url + "/" + key
file_extension = self.helpers.get_file_extension(key)
if file_extension not in self.scan.url_extension_blacklist:
self.emit_event(bucket_file, "URL_UNVERIFIED", source=event, tags="filedownload")
await self.emit_event(bucket_file, "URL_UNVERIFIED", source=event, tags="filedownload")
urls_emitted += 1
if urls_emitted >= self.file_limit:
return
4 changes: 2 additions & 2 deletions bbot/modules/builtwith.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ async def handle_event(self, event):
if subdomains:
for s in subdomains:
if s != event:
self.emit_event(s, "DNS_NAME", source=event)
await self.emit_event(s, "DNS_NAME", source=event)
# redirects
if self.config.get("redirects", True):
redirects = await self.query(query, parse_fn=self.parse_redirects, request_fn=self.request_redirects)
if redirects:
for r in redirects:
if r != event:
self.emit_event(r, "DNS_NAME", source=event, tags=["affiliate"])
await self.emit_event(r, "DNS_NAME", source=event, tags=["affiliate"])

async def request_domains(self, query):
url = f"{self.base_url}/v20/api.json?KEY={self.api_key}&LOOKUP={query}&NOMETA=yes&NOATTR=yes&HIDETEXT=yes&HIDEDL=yes"
Expand Down
4 changes: 2 additions & 2 deletions bbot/modules/bypass403.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ async def handle_event(self, event):
if results is None:
return
if len(results) > collapse_threshold:
self.emit_event(
await self.emit_event(
{
"description": f"403 Bypass MULTIPLE SIGNATURES (exceeded threshold {str(collapse_threshold)})",
"host": str(event.host),
Expand All @@ -143,7 +143,7 @@ async def handle_event(self, event):
)
else:
for description in results:
self.emit_event(
await self.emit_event(
{"description": description, "host": str(event.host), "url": event.data},
"FINDING",
source=event,
Expand Down
8 changes: 4 additions & 4 deletions bbot/modules/credshed.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ async def handle_event(self, event):

email_event = self.make_event(email, "EMAIL_ADDRESS", source=event, tags=tags)
if email_event is not None:
self.emit_event(email_event)
await self.emit_event(email_event)
if user and not self.already_seen(f"{email}:{user}"):
self.emit_event(user, "USERNAME", source=email_event, tags=tags)
await self.emit_event(user, "USERNAME", source=email_event, tags=tags)
if pw and not self.already_seen(f"{email}:{pw}"):
self.emit_event(pw, "PASSWORD", source=email_event, tags=tags)
await self.emit_event(pw, "PASSWORD", source=email_event, tags=tags)
for h_pw in hashes:
if h_pw and not self.already_seen(f"{email}:{h_pw}"):
self.emit_event(h_pw, "HASHED_PASSWORD", source=email_event, tags=tags)
await self.emit_event(h_pw, "HASHED_PASSWORD", source=email_event, tags=tags)
4 changes: 2 additions & 2 deletions bbot/modules/deadly/dastardly.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def handle_event(self, event):
for testcase in testsuite.testcases:
for failure in testcase.failures:
if failure.severity == "Info":
self.emit_event(
await self.emit_event(
{
"host": str(event.host),
"url": url,
Expand All @@ -70,7 +70,7 @@ async def handle_event(self, event):
event,
)
else:
self.emit_event(
await self.emit_event(
{
"severity": failure.severity,
"host": str(event.host),
Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/deadly/ffuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async def handle_event(self, event):

filters = await self.baseline_ffuf(fixed_url, exts=exts)
async for r in self.execute_ffuf(self.tempfile, fixed_url, exts=exts, filters=filters):
self.emit_event(r["url"], "URL_UNVERIFIED", source=event, tags=[f"status-{r['status']}"])
await self.emit_event(r["url"], "URL_UNVERIFIED", source=event, tags=[f"status-{r['status']}"])

async def filter_event(self, event):
if "endpoint" in event.tags:
Expand Down
Loading

0 comments on commit a623f5e

Please sign in to comment.