Merge pull request #943 from blacklanternsecurity/fix-dnszonetransfer-bug

Fix Event Loop Blocks
TheTechromancer authored Jan 2, 2024
2 parents f0828e7 + ea767b1 commit fa0fbac
Showing 9 changed files with 33 additions and 23 deletions.
2 changes: 2 additions & 0 deletions bbot/core/configurator/args.py
@@ -29,11 +29,13 @@ def parse_args(self, *args, **kwargs):
        if ret.silent:
            ret.yes = True
        ret.modules = chain_lists(ret.modules)
        ret.exclude_modules = chain_lists(ret.exclude_modules)
        ret.output_modules = chain_lists(ret.output_modules)
        ret.targets = chain_lists(ret.targets, try_files=True, msg="Reading targets from file: {filename}")
        ret.whitelist = chain_lists(ret.whitelist, try_files=True, msg="Reading whitelist from file: {filename}")
        ret.blacklist = chain_lists(ret.blacklist, try_files=True, msg="Reading blacklist from file: {filename}")
        ret.flags = chain_lists(ret.flags)
        ret.exclude_flags = chain_lists(ret.exclude_flags)
        ret.require_flags = chain_lists(ret.require_flags)
        for m in ret.modules:
            if m not in module_choices and not self._dummy:
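
chain_lists normalizes raw argparse values before module validation. A rough sketch of the idea (assumed behavior for illustration only, not bbot's actual implementation):

def chain_lists(lists):
    # flatten ["nmap,sslcert", "wayback"] -> ["nmap", "sslcert", "wayback"],
    # keeping order and dropping duplicates and empty entries
    ret = []
    for entry in lists:
        for item in entry.split(","):
            item = item.strip()
            if item and item not in ret:
                ret.append(item)
    return ret

print(chain_lists(["nmap,sslcert", "wayback"]))  # ['nmap', 'sslcert', 'wayback']
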
5 changes: 3 additions & 2 deletions bbot/core/helpers/dns.py
@@ -662,7 +662,7 @@ async def resolve_batch(self, queries, **kwargs):
         batch_size = 250
         for i in range(0, len(queries), batch_size):
             batch = queries[i : i + batch_size]
-            tasks = [self._resolve_batch_coro_wrapper(q, **kwargs) for q in batch]
+            tasks = [asyncio.create_task(self._resolve_batch_coro_wrapper(q, **kwargs)) for q in batch]
             async for task in as_completed(tasks):
                 yield await task

@@ -958,7 +958,8 @@ async def is_wildcard_domain(self, domain, log_info=False):
                 # continue
                 for _ in range(self.wildcard_tests):
                     rand_query = f"{rand_string(digits=False, length=10)}.{host}"
-                    wildcard_tasks[rdtype].append(self.resolve(rand_query, type=rdtype, use_cache=False))
+                    wildcard_task = asyncio.create_task(self.resolve(rand_query, type=rdtype, use_cache=False))
+                    wildcard_tasks[rdtype].append(wildcard_task)

             # combine the random results
             is_wildcard = False
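
Both hunks wrap coroutines in asyncio.create_task() before handing them to as_completed(). A task is scheduled on the event loop immediately, so the whole batch runs concurrently and results can be consumed in completion order. (The as_completed here appears to be bbot's own async-iterator helper; this minimal sketch uses the stdlib version.)

import asyncio

async def lookup(i):
    await asyncio.sleep(0.1)  # stand-in for one DNS query
    return i

async def main():
    # create_task() schedules every coroutine right away;
    # as_completed() then yields whichever awaitable finishes first
    tasks = [asyncio.create_task(lookup(i)) for i in range(5)]
    for finished in asyncio.as_completed(tasks):
        print(await finished)

asyncio.run(main())
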
6 changes: 5 additions & 1 deletion bbot/core/helpers/validators.py
@@ -207,7 +207,11 @@ def clean_url(url: str):
     return parsed


-def collapse_urls(urls, threshold=10):
+def collapse_urls(*args, **kwargs):
+    return list(_collapse_urls(*args, **kwargs))
+
+
+def _collapse_urls(urls, threshold=10):
     """
     Collapses a list of URLs by deduping similar URLs based on a hashing mechanism.
     Useful for cleaning large lists of noisy URLs, such as those retrieved from wayback.
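
Splitting collapse_urls into a list-returning wrapper around a private generator is what lets callers ship it to an executor: a worker runs the function to completion, whereas a lazy generator would defer the CPU-heavy iteration back to whoever consumes it on the event loop thread. A minimal sketch of the pattern (hypothetical names, same shape as the change above):

import asyncio

def _collapse(urls):
    # pretend this hashing/deduping is CPU-heavy
    for url in sorted(set(urls)):
        yield url

def collapse(urls):
    # materialize the generator so ALL the work happens inside the worker
    return list(_collapse(urls))

async def main():
    loop = asyncio.get_running_loop()
    urls = ["http://a", "http://b", "http://a"]
    print(await loop.run_in_executor(None, collapse, urls))

asyncio.run(main())
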
13 changes: 9 additions & 4 deletions bbot/modules/base.py
@@ -357,7 +357,8 @@ async def _handle_batch(self):
             self.debug(f"Handling batch of {len(events):,} events")
             submitted = True
             async with self.scan._acatch(f"{self.name}.handle_batch()"):
-                await self.handle_batch(*events)
+                handle_batch_task = asyncio.create_task(self.handle_batch(*events))
+                await handle_batch_task
             self.debug(f"Finished handling batch of {len(events):,} events")
         if finish:
             context = f"{self.name}.finish()"
@@ -532,7 +533,8 @@ async def _setup(self):
         status = False
         self.debug(f"Setting up module {self.name}")
         try:
-            result = await self.setup()
+            setup_task = asyncio.create_task(self.setup())
+            result = await setup_task
             if type(result) == tuple and len(result) == 2:
                 status, msg = result
             else:
@@ -599,13 +601,16 @@ async def _worker(self):
                 if event.type == "FINISHED":
                     context = f"{self.name}.finish()"
                     async with self.scan._acatch(context), self._task_counter.count(context):
-                        await self.finish()
+                        finish_task = asyncio.create_task(self.finish())
+                        await finish_task
                 else:
                     context = f"{self.name}.handle_event({event})"
                     self.scan.stats.event_consumed(event, self)
                     self.debug(f"Handling {event}")
                     async with self.scan._acatch(context), self._task_counter.count(context):
-                        await self.handle_event(event)
+                        task_name = f"{self.name}.handle_event({event})"
+                        handle_event_task = asyncio.create_task(self.handle_event(event), name=task_name)
+                        await handle_event_task
                     self.debug(f"Finished handling {event}")
             else:
                 self.debug(f"Not accepting {event} because {reason}")
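
Awaiting a freshly created task behaves like awaiting the coroutine directly, so these hunks don't add concurrency; what they add is visibility. Each handler now runs as its own (and, for handle_event, named) task, so a hung module can be spotted in asyncio.all_tasks() or debug output. A minimal sketch of the effect:

import asyncio

async def handler():
    await asyncio.sleep(0.1)  # stand-in for a module's handle_event()

async def main():
    # the name shows up in task listings, pinpointing which handler is running
    task = asyncio.create_task(handler(), name="mymodule.handle_event(<event>)")
    print([t.get_name() for t in asyncio.all_tasks()])
    await task

asyncio.run(main())
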
2 changes: 1 addition & 1 deletion bbot/modules/bevigil.py
@@ -34,7 +34,7 @@ async def handle_event(self, event):
         if self.urls:
             urls = await self.query(query, request_fn=self.request_urls, parse_fn=self.parse_urls)
             if urls:
-                for parsed_url in self.helpers.validators.collapse_urls(urls):
+                for parsed_url in await self.scan.run_in_executor(self.helpers.validators.collapse_urls, urls):
                     self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event)

     async def request_subdomains(self, query):
10 changes: 6 additions & 4 deletions bbot/modules/dnszonetransfer.py
@@ -36,10 +36,7 @@ async def handle_event(self, event):
                 break
             try:
                 self.debug(f"Attempting zone transfer against {nameserver} for domain {domain}")
-                xfr_answer = await self.scan.run_in_executor(
-                    dns.query.xfr, nameserver, domain, timeout=self.timeout, lifetime=self.timeout
-                )
-                zone = dns.zone.from_xfr(xfr_answer)
+                zone = await self.scan.run_in_executor(self.zone_transfer, nameserver, domain)
             except Exception as e:
                 self.debug(f"Error retrieving zone for {domain}: {e}")
                 continue
@@ -64,3 +61,8 @@ async def handle_event(self, event):
                 self.emit_event(child_event)
         else:
             self.debug(f"No data returned by {nameserver} for domain {domain}")
+
+    def zone_transfer(self, nameserver, domain):
+        xfr_answer = dns.query.xfr(nameserver, domain, timeout=self.timeout, lifetime=self.timeout)
+        zone = dns.zone.from_xfr(xfr_answer)
+        return zone
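
This is the zone transfer bug named in the PR branch. dns.query.xfr() returns a generator, so the old code's run_in_executor() only offloaded creating that generator; the blocking network reads actually happened when dns.zone.from_xfr() iterated it, back on the event loop thread. Bundling both steps into the synchronous zone_transfer() method moves the entire transfer into the worker. A minimal sketch of the failure mode (stand-in functions, not dnspython):

import asyncio
import time

def xfr():
    # like dns.query.xfr(): returns a generator immediately; the blocking
    # network I/O only happens while the generator is iterated
    def records():
        time.sleep(2)  # stand-in for slow network reads
        yield "record"
    return records()

def from_xfr(answer):
    return list(answer)  # like dns.zone.from_xfr(): consumes the generator

async def main():
    loop = asyncio.get_running_loop()
    # BAD: only generator creation is offloaded
    answer = await loop.run_in_executor(None, xfr)
    # from_xfr(answer)  # <- would stall the event loop for ~2 seconds
    # GOOD: run creation and consumption together in the worker
    records = await loop.run_in_executor(None, lambda: from_xfr(xfr()))
    print(records)

asyncio.run(main())
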
10 changes: 1 addition & 9 deletions bbot/modules/wayback.py
@@ -56,8 +56,7 @@ async def query(self, query):
         dns_names = set()
         collapsed_urls = 0
         start_time = datetime.now()
-        parsed_urls = await self.scan.run_in_executor_mp(
-            self.execute_callback,
+        parsed_urls = await self.scan.run_in_executor(
             self.helpers.validators.collapse_urls,
             urls,
             threshold=self.garbage_threshold,
@@ -76,10 +75,3 @@
         duration = self.helpers.human_timedelta(end_time - start_time)
         self.verbose(f"Collapsed {len(urls):,} -> {collapsed_urls:,} URLs in {duration}")
         return results
-
-    @staticmethod
-    def execute_callback(callback, *args, **kwargs):
-        """
-        This exists so that we can run our URL parsing logic in a separate process.
-        """
-        return list(callback(*args, **kwargs))
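
With collapse_urls now returning a plain list, wayback can drop both the process pool (run_in_executor_mp) and the execute_callback trampoline whose only job was to materialize generator output inside the child process. A sketch of the underlying difference (illustrative, not bbot's API):

import asyncio
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def collapse(urls):
    return sorted(set(urls))  # stand-in for URL collapsing

async def main():
    loop = asyncio.get_running_loop()
    urls = ["http://b", "http://a", "http://a"]
    # threads share memory: nothing is pickled, so generators and bound
    # methods pass through freely
    with ThreadPoolExecutor() as pool:
        print(await loop.run_in_executor(pool, collapse, urls))
    # processes pickle the callable, its arguments, and the return value,
    # which is why a module-level wrapper returning a list was needed before
    with ProcessPoolExecutor() as pool:
        print(await loop.run_in_executor(pool, collapse, urls))

if __name__ == "__main__":
    asyncio.run(main())
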
3 changes: 2 additions & 1 deletion bbot/scanner/manager.py
@@ -415,7 +415,8 @@ async def _worker_loop(self):
                 except asyncio.queues.QueueEmpty:
                     await asyncio.sleep(0.1)
                     continue
-                await self.emit_event(event, **kwargs)
+                emit_event_task = asyncio.create_task(self.emit_event(event, **kwargs), name=f"emit_event({event})")
+                await emit_event_task

         except Exception:
             log.critical(traceback.format_exc())
5 changes: 4 additions & 1 deletion bbot/scanner/scanner.py
@@ -259,7 +259,10 @@ def __init__(
                 mp.set_start_method("spawn")
             except Exception:
                 self.warning(f"Failed to set multiprocessing spawn method. This may negatively affect performance.")
-        self.process_pool = ProcessPoolExecutor()
+        # we spawn 1 fewer processes than cores
+        # this helps to avoid locking up the system or competing with the main python process for cpu time
+        num_processes = max(1, mp.cpu_count() - 1)
+        self.process_pool = ProcessPoolExecutor(max_workers=num_processes)

         self._stopping = False

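
Sizing the pool to cpu_count() - 1 keeps one core free for the main interpreter, whose event loop still has to service sockets, queues, and timers while workers crunch. A minimal standalone sketch of the same sizing:

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def crunch(x):
    return x * x  # stand-in for CPU-bound work

if __name__ == "__main__":
    # leave one core for the main python process / event loop
    num_processes = max(1, mp.cpu_count() - 1)
    with ProcessPoolExecutor(max_workers=num_processes) as pool:
        print(list(pool.map(crunch, range(8))))
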
