From 93fa655e3eef6ba4ffc8b72e3a9280eb880cb089 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 18:28:10 -0500 Subject: [PATCH 1/7] fix dnszonetransfer bug --- bbot/core/configurator/args.py | 2 ++ bbot/modules/base.py | 6 ++++-- bbot/modules/dnszonetransfer.py | 10 ++++++---- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/bbot/core/configurator/args.py b/bbot/core/configurator/args.py index 795242911..173583827 100644 --- a/bbot/core/configurator/args.py +++ b/bbot/core/configurator/args.py @@ -29,11 +29,13 @@ def parse_args(self, *args, **kwargs): if ret.silent: ret.yes = True ret.modules = chain_lists(ret.modules) + ret.exclude_modules = chain_lists(ret.exclude_modules) ret.output_modules = chain_lists(ret.output_modules) ret.targets = chain_lists(ret.targets, try_files=True, msg="Reading targets from file: {filename}") ret.whitelist = chain_lists(ret.whitelist, try_files=True, msg="Reading whitelist from file: {filename}") ret.blacklist = chain_lists(ret.blacklist, try_files=True, msg="Reading blacklist from file: {filename}") ret.flags = chain_lists(ret.flags) + ret.exclude_flags = chain_lists(ret.exclude_flags) ret.require_flags = chain_lists(ret.require_flags) for m in ret.modules: if m not in module_choices and not self._dummy: diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 660111d3f..6036c1463 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -357,7 +357,8 @@ async def _handle_batch(self): self.debug(f"Handling batch of {len(events):,} events") submitted = True async with self.scan._acatch(f"{self.name}.handle_batch()"): - await self.handle_batch(*events) + handle_batch_task = asyncio.create_task(self.handle_batch(*events)) + await handle_batch_task self.debug(f"Finished handling batch of {len(events):,} events") if finish: context = f"{self.name}.finish()" @@ -605,7 +606,8 @@ async def _worker(self): self.scan.stats.event_consumed(event, self) self.debug(f"Handling {event}") async with self.scan._acatch(context), self._task_counter.count(context): - await self.handle_event(event) + handle_event_task = asyncio.create_task(self.handle_event(event)) + await handle_event_task self.debug(f"Finished handling {event}") else: self.debug(f"Not accepting {event} because {reason}") diff --git a/bbot/modules/dnszonetransfer.py b/bbot/modules/dnszonetransfer.py index d950514b5..1ec5bf5eb 100644 --- a/bbot/modules/dnszonetransfer.py +++ b/bbot/modules/dnszonetransfer.py @@ -36,10 +36,7 @@ async def handle_event(self, event): break try: self.debug(f"Attempting zone transfer against {nameserver} for domain {domain}") - xfr_answer = await self.scan.run_in_executor( - dns.query.xfr, nameserver, domain, timeout=self.timeout, lifetime=self.timeout - ) - zone = dns.zone.from_xfr(xfr_answer) + zone = await self.scan.run_in_executor(self.zone_transfer, nameserver, domain) except Exception as e: self.debug(f"Error retrieving zone for {domain}: {e}") continue @@ -64,3 +61,8 @@ async def handle_event(self, event): self.emit_event(child_event) else: self.debug(f"No data returned by {nameserver} for domain {domain}") + + def zone_transfer(self, nameserver, domain): + xfr_answer = dns.query.xfr(nameserver, domain, timeout=self.timeout, lifetime=self.timeout) + zone = dns.zone.from_xfr(xfr_answer) + return zone From a1870de205e9777dc91f9d39f1f292a3be8dc952 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 18:55:22 -0500 Subject: [PATCH 2/7] clearer task creation for asyncio debugging --- bbot/modules/base.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 6036c1463..01299c216 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -533,7 +533,8 @@ async def _setup(self): status = False self.debug(f"Setting up module {self.name}") try: - result = await self.setup() + setup_task = asyncio.create_task(self.setup()) + result = await setup_task if type(result) == tuple and len(result) == 2: status, msg = result else: @@ -600,7 +601,8 @@ async def _worker(self): if event.type == "FINISHED": context = f"{self.name}.finish()" async with self.scan._acatch(context), self._task_counter.count(context): - await self.finish() + finish_task = asyncio.create_task(self.finish()) + await finish_task else: context = f"{self.name}.handle_event({event})" self.scan.stats.event_consumed(event, self) From 63d967ae4edc6c67a3f21464bc02572911abbcda Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 20:00:26 -0500 Subject: [PATCH 3/7] don't use process executor if we don't have to --- bbot/core/helpers/validators.py | 6 +++++- bbot/modules/bevigil.py | 2 +- bbot/modules/wayback.py | 10 +--------- bbot/scanner/manager.py | 3 ++- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/bbot/core/helpers/validators.py b/bbot/core/helpers/validators.py index 3b8dc77c0..0384a876e 100644 --- a/bbot/core/helpers/validators.py +++ b/bbot/core/helpers/validators.py @@ -207,7 +207,11 @@ def clean_url(url: str): return parsed -def collapse_urls(urls, threshold=10): +def collapse_urls(*args, **kwargs): + return list(_collapse_urls(*args, **kwargs)) + + +def _collapse_urls(urls, threshold=10): """ Collapses a list of URLs by deduping similar URLs based on a hashing mechanism. Useful for cleaning large lists of noisy URLs, such as those retrieved from wayback. diff --git a/bbot/modules/bevigil.py b/bbot/modules/bevigil.py index 0b54d40f4..e6c9990dd 100644 --- a/bbot/modules/bevigil.py +++ b/bbot/modules/bevigil.py @@ -34,7 +34,7 @@ async def handle_event(self, event): if self.urls: urls = await self.query(query, request_fn=self.request_urls, parse_fn=self.parse_urls) if urls: - for parsed_url in self.helpers.validators.collapse_urls(urls): + for parsed_url in await self.scan.run_in_executor(self.helpers.validators.collapse_urls, urls): self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event) async def request_subdomains(self, query): diff --git a/bbot/modules/wayback.py b/bbot/modules/wayback.py index 92219d97c..4bec112bf 100644 --- a/bbot/modules/wayback.py +++ b/bbot/modules/wayback.py @@ -56,8 +56,7 @@ async def query(self, query): dns_names = set() collapsed_urls = 0 start_time = datetime.now() - parsed_urls = await self.scan.run_in_executor_mp( - self.execute_callback, + parsed_urls = await self.scan.run_in_executor( self.helpers.validators.collapse_urls, urls, threshold=self.garbage_threshold, @@ -76,10 +75,3 @@ async def query(self, query): duration = self.helpers.human_timedelta(end_time - start_time) self.verbose(f"Collapsed {len(urls):,} -> {collapsed_urls:,} URLs in {duration}") return results - - @staticmethod - def execute_callback(callback, *args, **kwargs): - """ - This exists so that we can run our URL parsing logic in a separate process. - """ - return list(callback(*args, **kwargs)) diff --git a/bbot/scanner/manager.py b/bbot/scanner/manager.py index 5a661edf2..3cb84d308 100644 --- a/bbot/scanner/manager.py +++ b/bbot/scanner/manager.py @@ -415,7 +415,8 @@ async def _worker_loop(self): except asyncio.queues.QueueEmpty: await asyncio.sleep(0.1) continue - await self.emit_event(event, **kwargs) + emit_event_task = asyncio.create_task(self.emit_event(event, **kwargs), name=f"emit_event({event})") + await emit_event_task except Exception: log.critical(traceback.format_exc()) From b52c05bd0612b3e89640f63db00d583d56f77cd5 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 21:30:22 -0500 Subject: [PATCH 4/7] processpool max_works = (num_cpus - 1) --- bbot/scanner/scanner.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bbot/scanner/scanner.py b/bbot/scanner/scanner.py index 0b900c291..e192053b0 100644 --- a/bbot/scanner/scanner.py +++ b/bbot/scanner/scanner.py @@ -259,7 +259,10 @@ def __init__( mp.set_start_method("spawn") except Exception: self.warning(f"Failed to set multiprocessing spawn method. This may negatively affect performance.") - self.process_pool = ProcessPoolExecutor() + # we spawn 1 fewer processes than cores + # this helps to avoid locking up the system or competing with the main python process for cpu time + num_processes = max(1, mp.cpu_count() - 1) + self.process_pool = ProcessPoolExecutor(max_workers=num_processes) self._stopping = False From b47389f09610e4b5d47c70cf023874dbaaa932af Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 22:51:49 -0500 Subject: [PATCH 5/7] better task debugging --- bbot/core/helpers/dns.py | 3 ++- bbot/modules/base.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/bbot/core/helpers/dns.py b/bbot/core/helpers/dns.py index ed880150a..2ed10c3e3 100644 --- a/bbot/core/helpers/dns.py +++ b/bbot/core/helpers/dns.py @@ -958,7 +958,8 @@ async def is_wildcard_domain(self, domain, log_info=False): # continue for _ in range(self.wildcard_tests): rand_query = f"{rand_string(digits=False, length=10)}.{host}" - wildcard_tasks[rdtype].append(self.resolve(rand_query, type=rdtype, use_cache=False)) + wildcard_task = asyncio.create_task(self.resolve(rand_query, type=rdtype, use_cache=False)) + wildcard_tasks[rdtype].append(wildcard_task) # combine the random results is_wildcard = False diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 01299c216..4d82fa85b 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -608,7 +608,8 @@ async def _worker(self): self.scan.stats.event_consumed(event, self) self.debug(f"Handling {event}") async with self.scan._acatch(context), self._task_counter.count(context): - handle_event_task = asyncio.create_task(self.handle_event(event)) + task_name = f"{self.name}.handle_event({event]})" + handle_event_task = asyncio.create_task(self.handle_event(event), name=task_name) await handle_event_task self.debug(f"Finished handling {event}") else: From a42d66e4f4de372891644ab1aebb093fe03376c9 Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Mon, 25 Dec 2023 22:56:17 -0500 Subject: [PATCH 6/7] fix tests --- bbot/modules/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 4d82fa85b..08ac2817a 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -608,7 +608,7 @@ async def _worker(self): self.scan.stats.event_consumed(event, self) self.debug(f"Handling {event}") async with self.scan._acatch(context), self._task_counter.count(context): - task_name = f"{self.name}.handle_event({event]})" + task_name = f"{self.name}.handle_event({event})" handle_event_task = asyncio.create_task(self.handle_event(event), name=task_name) await handle_event_task self.debug(f"Finished handling {event}") From ea767b194d1ad0474e9b9b2b2feea963be51379d Mon Sep 17 00:00:00 2001 From: TheTechromancer Date: Wed, 27 Dec 2023 16:09:28 -0500 Subject: [PATCH 7/7] use tasks for batch dns resolutions --- bbot/core/helpers/dns.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bbot/core/helpers/dns.py b/bbot/core/helpers/dns.py index 2ed10c3e3..779dddb1d 100644 --- a/bbot/core/helpers/dns.py +++ b/bbot/core/helpers/dns.py @@ -662,7 +662,7 @@ async def resolve_batch(self, queries, **kwargs): batch_size = 250 for i in range(0, len(queries), batch_size): batch = queries[i : i + batch_size] - tasks = [self._resolve_batch_coro_wrapper(q, **kwargs) for q in batch] + tasks = [asyncio.create_task(self._resolve_batch_coro_wrapper(q, **kwargs)) for q in batch] async for task in as_completed(tasks): yield await task