Fix Event Loop Blocks #943

Merged · 7 commits · Jan 2, 2024

bbot/core/configurator/args.py (2 additions, 0 deletions)

@@ -29,11 +29,13 @@ def parse_args(self, *args, **kwargs):
        if ret.silent:
            ret.yes = True
        ret.modules = chain_lists(ret.modules)
        ret.exclude_modules = chain_lists(ret.exclude_modules)
        ret.output_modules = chain_lists(ret.output_modules)
        ret.targets = chain_lists(ret.targets, try_files=True, msg="Reading targets from file: {filename}")
        ret.whitelist = chain_lists(ret.whitelist, try_files=True, msg="Reading whitelist from file: {filename}")
        ret.blacklist = chain_lists(ret.blacklist, try_files=True, msg="Reading blacklist from file: {filename}")
        ret.flags = chain_lists(ret.flags)
        ret.exclude_flags = chain_lists(ret.exclude_flags)
        ret.require_flags = chain_lists(ret.require_flags)
        for m in ret.modules:
            if m not in module_choices and not self._dummy:

bbot/core/helpers/dns.py (3 additions, 2 deletions)

@@ -662,7 +662,7 @@ async def resolve_batch(self, queries, **kwargs):
         batch_size = 250
         for i in range(0, len(queries), batch_size):
             batch = queries[i : i + batch_size]
-            tasks = [self._resolve_batch_coro_wrapper(q, **kwargs) for q in batch]
+            tasks = [asyncio.create_task(self._resolve_batch_coro_wrapper(q, **kwargs)) for q in batch]
             async for task in as_completed(tasks):
                 yield await task

@@ -958,7 +958,8 @@ async def is_wildcard_domain(self, domain, log_info=False):
                 # continue
                 for _ in range(self.wildcard_tests):
                     rand_query = f"{rand_string(digits=False, length=10)}.{host}"
-                    wildcard_tasks[rdtype].append(self.resolve(rand_query, type=rdtype, use_cache=False))
+                    wildcard_task = asyncio.create_task(self.resolve(rand_query, type=rdtype, use_cache=False))
+                    wildcard_tasks[rdtype].append(wildcard_task)

         # combine the random results
         is_wildcard = False
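
A note on the pattern above: `as_completed` here appears to be bbot's own async-generator helper rather than `asyncio.as_completed`. Wrapping each coroutine in `asyncio.create_task()` schedules the entire batch on the event loop up front and gives the loop cancellable, inspectable handles instead of inert coroutine objects. A minimal standard-library sketch of the same idea, with `fetch` standing in for a single DNS query:

```python
import asyncio

async def fetch(i):
    # stand-in for a single DNS query
    await asyncio.sleep(0.01 * (5 - i))
    return i

async def main():
    # create_task() schedules every coroutine immediately, so the
    # whole batch runs concurrently and results arrive as they finish
    tasks = [asyncio.create_task(fetch(i)) for i in range(5)]
    for finished in asyncio.as_completed(tasks):
        print(await finished)  # prints 4, 3, 2, 1, 0

asyncio.run(main())
```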

bbot/core/helpers/validators.py (5 additions, 1 deletion)

@@ -207,7 +207,11 @@ def clean_url(url: str):
     return parsed


-def collapse_urls(urls, threshold=10):
+def collapse_urls(*args, **kwargs):
+    return list(_collapse_urls(*args, **kwargs))
+
+
+def _collapse_urls(urls, threshold=10):
     """
     Collapses a list of URLs by deduping similar URLs based on a hashing mechanism.
     Useful for cleaning large lists of noisy URLs, such as those retrieved from wayback.
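
`_collapse_urls()` is a generator, so the new public `collapse_urls()` wrapper exists to drain it eagerly. That way the full computation can be shipped off the event loop via an executor (as bevigil.py and wayback.py do below) instead of running lazily wherever the generator happens to be consumed. A rough standard-library sketch of the pattern, with `slow_generator` as a hypothetical stand-in for the URL-hashing work:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def slow_generator(items):
    for i in items:
        yield i * 2  # stand-in for per-URL hashing work

def collapse(items):
    # drain the generator inside the worker thread so none of the
    # work leaks back onto the event loop
    return list(slow_generator(items))

async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, collapse, range(5))
    print(result)  # [0, 2, 4, 6, 8]

asyncio.run(main())
```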

bbot/modules/base.py (9 additions, 4 deletions)

@@ -357,7 +357,8 @@ async def _handle_batch(self):
self.debug(f"Handling batch of {len(events):,} events")
submitted = True
async with self.scan._acatch(f"{self.name}.handle_batch()"):
await self.handle_batch(*events)
handle_batch_task = asyncio.create_task(self.handle_batch(*events))
await handle_batch_task
self.debug(f"Finished handling batch of {len(events):,} events")
if finish:
context = f"{self.name}.finish()"

@@ -532,7 +533,8 @@ async def _setup(self):
         status = False
         self.debug(f"Setting up module {self.name}")
         try:
-            result = await self.setup()
+            setup_task = asyncio.create_task(self.setup())
+            result = await setup_task
             if type(result) == tuple and len(result) == 2:
                 status, msg = result
             else:

@@ -599,13 +601,16 @@ async def _worker(self):
if event.type == "FINISHED":
context = f"{self.name}.finish()"
async with self.scan._acatch(context), self._task_counter.count(context):
await self.finish()
finish_task = asyncio.create_task(self.finish())
await finish_task
else:
context = f"{self.name}.handle_event({event})"
self.scan.stats.event_consumed(event, self)
self.debug(f"Handling {event}")
async with self.scan._acatch(context), self._task_counter.count(context):
await self.handle_event(event)
task_name = f"{self.name}.handle_event({event})"
handle_event_task = asyncio.create_task(self.handle_event(event), name=task_name)
await handle_event_task
self.debug(f"Finished handling {event}")
else:
self.debug(f"Not accepting {event} because {reason}")
Expand Down
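
The recurring pattern in this file, replacing `await coro()` with a `create_task()` + `await`, doesn't change what gets awaited, but a (named) task is visible to asyncio's introspection tools, which makes it far easier to find the handler that is blocking the event loop. A small sketch of what that buys you:

```python
import asyncio

async def handle_event(event):
    await asyncio.sleep(1)  # imagine this hangs

async def main():
    task = asyncio.create_task(handle_event("DNS_NAME"), name="module.handle_event(DNS_NAME)")
    await asyncio.sleep(0)  # give the task a chance to start
    # the named task now shows up alongside main() in the task list
    for t in asyncio.all_tasks():
        print(t.get_name())
    await task

asyncio.run(main())
```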

bbot/modules/bevigil.py (1 addition, 1 deletion)

@@ -34,7 +34,7 @@ async def handle_event(self, event):
         if self.urls:
             urls = await self.query(query, request_fn=self.request_urls, parse_fn=self.parse_urls)
             if urls:
-                for parsed_url in self.helpers.validators.collapse_urls(urls):
+                for parsed_url in await self.scan.run_in_executor(self.helpers.validators.collapse_urls, urls):
                     self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event)

     async def request_subdomains(self, query):

bbot/modules/dnszonetransfer.py (6 additions, 4 deletions)

@@ -36,10 +36,7 @@ async def handle_event(self, event):
                 break
             try:
                 self.debug(f"Attempting zone transfer against {nameserver} for domain {domain}")
-                xfr_answer = await self.scan.run_in_executor(
-                    dns.query.xfr, nameserver, domain, timeout=self.timeout, lifetime=self.timeout
-                )
-                zone = dns.zone.from_xfr(xfr_answer)
+                zone = await self.scan.run_in_executor(self.zone_transfer, nameserver, domain)
             except Exception as e:
                 self.debug(f"Error retrieving zone for {domain}: {e}")
                 continue

@@ -64,3 +61,8 @@ async def handle_event(self, event):
                     self.emit_event(child_event)
             else:
                 self.debug(f"No data returned by {nameserver} for domain {domain}")
+
+    def zone_transfer(self, nameserver, domain):
+        xfr_answer = dns.query.xfr(nameserver, domain, timeout=self.timeout, lifetime=self.timeout)
+        zone = dns.zone.from_xfr(xfr_answer)
+        return zone

bbot/modules/wayback.py (1 addition, 9 deletions)

@@ -56,8 +56,7 @@ async def query(self, query):
         dns_names = set()
         collapsed_urls = 0
         start_time = datetime.now()
-        parsed_urls = await self.scan.run_in_executor_mp(
-            self.execute_callback,
+        parsed_urls = await self.scan.run_in_executor(
             self.helpers.validators.collapse_urls,
             urls,
             threshold=self.garbage_threshold,

@@ -76,10 +75,3 @@ async def handle_event(self, event):
         duration = self.helpers.human_timedelta(end_time - start_time)
         self.verbose(f"Collapsed {len(urls):,} -> {collapsed_urls:,} URLs in {duration}")
         return results
-
-    @staticmethod
-    def execute_callback(callback, *args, **kwargs):
-        """
-        This exists so that we can run our URL parsing logic in a separate process.
-        """
-        return list(callback(*args, **kwargs))
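
Moving from `run_in_executor_mp` (process pool) to `run_in_executor` (thread pool) also makes the `execute_callback` shim unnecessary: it only existed to materialize the generator's output inside the child process, and `collapse_urls` now returns a list itself. Assuming those two scan helpers wrap the corresponding `concurrent.futures` executors, the tradeoff looks roughly like this:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def collapse(urls):
    return sorted(set(urls))  # stand-in for CPU-bound URL collapsing

async def main():
    loop = asyncio.get_running_loop()
    urls = [f"https://example.com/{i % 100}" for i in range(10_000)]
    # a thread pool passes plain Python objects; a process pool must
    # pickle arguments and results across the process boundary
    with ThreadPoolExecutor() as threads:
        print(len(await loop.run_in_executor(threads, collapse, urls)))
    with ProcessPoolExecutor() as procs:
        print(len(await loop.run_in_executor(procs, collapse, urls)))

if __name__ == "__main__":
    asyncio.run(main())
```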

bbot/scanner/manager.py (2 additions, 1 deletion)

@@ -415,7 +415,8 @@ async def _worker_loop(self):
                 except asyncio.queues.QueueEmpty:
                     await asyncio.sleep(0.1)
                     continue
-                await self.emit_event(event, **kwargs)
+                emit_event_task = asyncio.create_task(self.emit_event(event, **kwargs), name=f"emit_event({event})")
+                await emit_event_task

         except Exception:
             log.critical(traceback.format_exc())

bbot/scanner/scanner.py (4 additions, 1 deletion)

@@ -259,7 +259,10 @@ def __init__(
mp.set_start_method("spawn")
except Exception:
self.warning(f"Failed to set multiprocessing spawn method. This may negatively affect performance.")
self.process_pool = ProcessPoolExecutor()
# we spawn 1 fewer processes than cores
# this helps to avoid locking up the system or competing with the main python process for cpu time
num_processes = max(1, mp.cpu_count() - 1)
self.process_pool = ProcessPoolExecutor(max_workers=num_processes)

self._stopping = False

Expand Down
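
Sizing the pool to `cpu_count() - 1` leaves one core for the main Python process, whose event loop has to keep servicing every other coroutine while the workers churn. A self-contained sketch of the sizing logic:

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def cube(n):
    return n ** 3

if __name__ == "__main__":
    # leave one core for the main python process and its event loop;
    # max(1, ...) keeps this sane on single-core machines
    num_processes = max(1, mp.cpu_count() - 1)
    with ProcessPoolExecutor(max_workers=num_processes) as pool:
        print(list(pool.map(cube, range(5))))  # [0, 1, 8, 27, 64]
```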