diff --git a/bbot/core/configurator/args.py b/bbot/core/configurator/args.py index 795242911b..1735838271 100644 --- a/bbot/core/configurator/args.py +++ b/bbot/core/configurator/args.py @@ -29,11 +29,13 @@ def parse_args(self, *args, **kwargs): if ret.silent: ret.yes = True ret.modules = chain_lists(ret.modules) + ret.exclude_modules = chain_lists(ret.exclude_modules) ret.output_modules = chain_lists(ret.output_modules) ret.targets = chain_lists(ret.targets, try_files=True, msg="Reading targets from file: {filename}") ret.whitelist = chain_lists(ret.whitelist, try_files=True, msg="Reading whitelist from file: {filename}") ret.blacklist = chain_lists(ret.blacklist, try_files=True, msg="Reading blacklist from file: {filename}") ret.flags = chain_lists(ret.flags) + ret.exclude_flags = chain_lists(ret.exclude_flags) ret.require_flags = chain_lists(ret.require_flags) for m in ret.modules: if m not in module_choices and not self._dummy: diff --git a/bbot/core/helpers/dns.py b/bbot/core/helpers/dns.py index ed880150a5..779dddb1de 100644 --- a/bbot/core/helpers/dns.py +++ b/bbot/core/helpers/dns.py @@ -662,7 +662,7 @@ async def resolve_batch(self, queries, **kwargs): batch_size = 250 for i in range(0, len(queries), batch_size): batch = queries[i : i + batch_size] - tasks = [self._resolve_batch_coro_wrapper(q, **kwargs) for q in batch] + tasks = [asyncio.create_task(self._resolve_batch_coro_wrapper(q, **kwargs)) for q in batch] async for task in as_completed(tasks): yield await task @@ -958,7 +958,8 @@ async def is_wildcard_domain(self, domain, log_info=False): # continue for _ in range(self.wildcard_tests): rand_query = f"{rand_string(digits=False, length=10)}.{host}" - wildcard_tasks[rdtype].append(self.resolve(rand_query, type=rdtype, use_cache=False)) + wildcard_task = asyncio.create_task(self.resolve(rand_query, type=rdtype, use_cache=False)) + wildcard_tasks[rdtype].append(wildcard_task) # combine the random results is_wildcard = False diff --git a/bbot/core/helpers/validators.py b/bbot/core/helpers/validators.py index 3b8dc77c00..0384a876ef 100644 --- a/bbot/core/helpers/validators.py +++ b/bbot/core/helpers/validators.py @@ -207,7 +207,11 @@ def clean_url(url: str): return parsed -def collapse_urls(urls, threshold=10): +def collapse_urls(*args, **kwargs): + return list(_collapse_urls(*args, **kwargs)) + + +def _collapse_urls(urls, threshold=10): """ Collapses a list of URLs by deduping similar URLs based on a hashing mechanism. Useful for cleaning large lists of noisy URLs, such as those retrieved from wayback. diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 660111d3fb..08ac2817ac 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -357,7 +357,8 @@ async def _handle_batch(self): self.debug(f"Handling batch of {len(events):,} events") submitted = True async with self.scan._acatch(f"{self.name}.handle_batch()"): - await self.handle_batch(*events) + handle_batch_task = asyncio.create_task(self.handle_batch(*events)) + await handle_batch_task self.debug(f"Finished handling batch of {len(events):,} events") if finish: context = f"{self.name}.finish()" @@ -532,7 +533,8 @@ async def _setup(self): status = False self.debug(f"Setting up module {self.name}") try: - result = await self.setup() + setup_task = asyncio.create_task(self.setup()) + result = await setup_task if type(result) == tuple and len(result) == 2: status, msg = result else: @@ -599,13 +601,16 @@ async def _worker(self): if event.type == "FINISHED": context = f"{self.name}.finish()" async with self.scan._acatch(context), self._task_counter.count(context): - await self.finish() + finish_task = asyncio.create_task(self.finish()) + await finish_task else: context = f"{self.name}.handle_event({event})" self.scan.stats.event_consumed(event, self) self.debug(f"Handling {event}") async with self.scan._acatch(context), self._task_counter.count(context): - await self.handle_event(event) + task_name = f"{self.name}.handle_event({event})" + handle_event_task = asyncio.create_task(self.handle_event(event), name=task_name) + await handle_event_task self.debug(f"Finished handling {event}") else: self.debug(f"Not accepting {event} because {reason}") diff --git a/bbot/modules/bevigil.py b/bbot/modules/bevigil.py index 0b54d40f40..e6c9990dd5 100644 --- a/bbot/modules/bevigil.py +++ b/bbot/modules/bevigil.py @@ -34,7 +34,7 @@ async def handle_event(self, event): if self.urls: urls = await self.query(query, request_fn=self.request_urls, parse_fn=self.parse_urls) if urls: - for parsed_url in self.helpers.validators.collapse_urls(urls): + for parsed_url in await self.scan.run_in_executor(self.helpers.validators.collapse_urls, urls): self.emit_event(parsed_url.geturl(), "URL_UNVERIFIED", source=event) async def request_subdomains(self, query): diff --git a/bbot/modules/dnszonetransfer.py b/bbot/modules/dnszonetransfer.py index d950514b5d..1ec5bf5eb3 100644 --- a/bbot/modules/dnszonetransfer.py +++ b/bbot/modules/dnszonetransfer.py @@ -36,10 +36,7 @@ async def handle_event(self, event): break try: self.debug(f"Attempting zone transfer against {nameserver} for domain {domain}") - xfr_answer = await self.scan.run_in_executor( - dns.query.xfr, nameserver, domain, timeout=self.timeout, lifetime=self.timeout - ) - zone = dns.zone.from_xfr(xfr_answer) + zone = await self.scan.run_in_executor(self.zone_transfer, nameserver, domain) except Exception as e: self.debug(f"Error retrieving zone for {domain}: {e}") continue @@ -64,3 +61,8 @@ async def handle_event(self, event): self.emit_event(child_event) else: self.debug(f"No data returned by {nameserver} for domain {domain}") + + def zone_transfer(self, nameserver, domain): + xfr_answer = dns.query.xfr(nameserver, domain, timeout=self.timeout, lifetime=self.timeout) + zone = dns.zone.from_xfr(xfr_answer) + return zone diff --git a/bbot/modules/wayback.py b/bbot/modules/wayback.py index 92219d97c3..4bec112bff 100644 --- a/bbot/modules/wayback.py +++ b/bbot/modules/wayback.py @@ -56,8 +56,7 @@ async def query(self, query): dns_names = set() collapsed_urls = 0 start_time = datetime.now() - parsed_urls = await self.scan.run_in_executor_mp( - self.execute_callback, + parsed_urls = await self.scan.run_in_executor( self.helpers.validators.collapse_urls, urls, threshold=self.garbage_threshold, @@ -76,10 +75,3 @@ async def query(self, query): duration = self.helpers.human_timedelta(end_time - start_time) self.verbose(f"Collapsed {len(urls):,} -> {collapsed_urls:,} URLs in {duration}") return results - - @staticmethod - def execute_callback(callback, *args, **kwargs): - """ - This exists so that we can run our URL parsing logic in a separate process. - """ - return list(callback(*args, **kwargs)) diff --git a/bbot/scanner/manager.py b/bbot/scanner/manager.py index 5a661edf27..3cb84d3082 100644 --- a/bbot/scanner/manager.py +++ b/bbot/scanner/manager.py @@ -415,7 +415,8 @@ async def _worker_loop(self): except asyncio.queues.QueueEmpty: await asyncio.sleep(0.1) continue - await self.emit_event(event, **kwargs) + emit_event_task = asyncio.create_task(self.emit_event(event, **kwargs), name=f"emit_event({event})") + await emit_event_task except Exception: log.critical(traceback.format_exc()) diff --git a/bbot/scanner/scanner.py b/bbot/scanner/scanner.py index 0b900c2911..e192053b0a 100644 --- a/bbot/scanner/scanner.py +++ b/bbot/scanner/scanner.py @@ -259,7 +259,10 @@ def __init__( mp.set_start_method("spawn") except Exception: self.warning(f"Failed to set multiprocessing spawn method. This may negatively affect performance.") - self.process_pool = ProcessPoolExecutor() + # we spawn 1 fewer processes than cores + # this helps to avoid locking up the system or competing with the main python process for cpu time + num_processes = max(1, mp.cpu_count() - 1) + self.process_pool = ProcessPoolExecutor(max_workers=num_processes) self._stopping = False