From f6c533ab7c696c575d357b7aeb8b885086c6a615 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 14 Aug 2024 17:37:31 -0400 Subject: [PATCH] better engine debugging --- bbot/core/engine.py | 70 ++++++++++++++++++--------------- bbot/core/helpers/dns/dns.py | 3 +- bbot/core/helpers/dns/engine.py | 4 +- bbot/core/helpers/web/engine.py | 4 +- bbot/core/helpers/web/web.py | 6 ++- bbot/defaults.yml | 5 +++ bbot/test/test.conf | 2 + 7 files changed, 56 insertions(+), 38 deletions(-) diff --git a/bbot/core/engine.py b/bbot/core/engine.py index 70652d456..06498d259 100644 --- a/bbot/core/engine.py +++ b/bbot/core/engine.py @@ -40,9 +40,10 @@ class EngineBase: ERROR_CLASS = BBOTEngineError - def __init__(self): + def __init__(self, debug=False): self._shutdown_status = False self.log = logging.getLogger(f"bbot.core.{self.__class__.__name__.lower()}") + self._debug = debug def pickle(self, obj): try: @@ -78,6 +79,10 @@ async def _infinite_retry(self, callback, *args, **kwargs): if max_retries is not None and retries > max_retries: raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}") + def debug(self, *args, **kwargs): + if self._debug: + self.log.debug(*args, **kwargs) + class EngineClient(EngineBase): """ @@ -114,9 +119,9 @@ class EngineClient(EngineBase): SERVER_CLASS = None - def __init__(self, **kwargs): - super().__init__() + def __init__(self, debug=False, **kwargs): self.name = f"EngineClient {self.__class__.__name__}" + super().__init__(debug=debug) self.process = None if self.SERVER_CLASS is None: raise ValueError(f"Must set EngineClient SERVER_CLASS, {self.SERVER_CLASS}") @@ -141,7 +146,7 @@ def check_error(self, message): async def run_and_return(self, command, *args, **kwargs): fn_str = f"{command}({args}, {kwargs})" - self.log.debug(f"{self.name}: executing run-and-return {fn_str}") + self.debug(f"{self.name}: executing run-and-return {fn_str}") if self._shutdown_status and not command == "_shutdown": self.log.verbose(f"{self.name} has been shut down and is not accepting new tasks") return @@ -150,7 +155,7 @@ async def run_and_return(self, command, *args, **kwargs): message = self.make_message(command, args=args, kwargs=kwargs) if message is error_sentinel: return - await self._infinite_retry(socket.send, message) + await socket.send(message) binary = await self._infinite_retry(socket.recv, _context=f"waiting for return value from {fn_str}") except BaseException: try: @@ -161,7 +166,7 @@ async def run_and_return(self, command, *args, **kwargs): raise # self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}") message = self.unpickle(binary) - self.log.debug(f"{self.name}: {fn_str} got return value: {message}") + self.debug(f"{self.name}: {fn_str} got return value: {message}") # error handling if self.check_error(message): return @@ -169,7 +174,7 @@ async def run_and_return(self, command, *args, **kwargs): async def run_and_yield(self, command, *args, **kwargs): fn_str = f"{command}({args}, {kwargs})" - self.log.debug(f"{self.name}: executing run-and-yield {fn_str}") + self.debug(f"{self.name}: executing run-and-yield {fn_str}") if self._shutdown_status: self.log.verbose("Engine has been shut down and is not accepting new tasks") return @@ -188,18 +193,18 @@ async def run_and_yield(self, command, *args, **kwargs): ) # self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}") message = self.unpickle(binary) - self.log.debug(f"{self.name} {command} got iteration: {message}") + self.debug(f"{self.name}: {fn_str} got iteration: {message}") # error handling if self.check_error(message) or self.check_stop(message): break yield message except (StopAsyncIteration, GeneratorExit) as e: exc_name = e.__class__.__name__ - self.log.debug(f"{self.name}.{command} got {exc_name}") + self.debug(f"{self.name}.{command} got {exc_name}") try: await self.send_cancel_message(socket, fn_str) except Exception: - self.log.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}") + self.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}") self.log.trace(traceback.format_exc()) break @@ -266,6 +271,7 @@ def start_server(self): # this allows us to more easily mock http, etc. if os.environ.get("BBOT_TESTING", "") == "True": kwargs["_loop"] = get_event_loop() + kwargs["debug"] = self._debug self.process = CORE.create_process( target=self.server_process, args=( @@ -305,7 +311,7 @@ async def new_socket(self): if self._server_process is None: self._server_process = self.start_server() while not self.socket_path.exists(): - self.log.debug(f"{self.name}: waiting for server process to start...") + self.debug(f"{self.name}: waiting for server process to start...") await asyncio.sleep(0.1) socket = self.context.socket(zmq.DEALER) socket.setsockopt(zmq.LINGER, 0) @@ -366,9 +372,9 @@ class EngineServer(EngineBase): CMDS = {} - def __init__(self, socket_path): - super().__init__() + def __init__(self, socket_path, debug=False): self.name = f"EngineServer {self.__class__.__name__}" + super().__init__(debug=debug) self.socket_path = socket_path self.client_id_var = contextvars.ContextVar("client_id", default=None) # task <--> client id mapping @@ -397,21 +403,21 @@ async def run_and_return(self, client_id, command_fn, *args, **kwargs): fn_str = f"{command_fn.__name__}({args}, {kwargs})" with self.client_id_context(client_id): try: - self.log.debug(f"{self.name} run-and-return {fn_str}") + self.debug(f"{self.name}: run-and-return {fn_str}") result = error_sentinel try: result = await command_fn(*args, **kwargs) except BaseException as e: if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)): error = f"Error in {self.name}.{fn_str}: {e}" - self.log.debug(error) + self.debug(error) trace = traceback.format_exc() - self.log.debug(trace) + self.debug(trace) result = {"_e": (error, trace)} finally: self.tasks.pop(client_id, None) if result is not error_sentinel: - self.log.debug(f"{self.name}: Sending response to {fn_str}: {result}") + self.debug(f"{self.name}: Sending response to {fn_str}: {result}") await self.send_socket_multipart(client_id, result) except BaseException as e: self.log.critical( @@ -419,27 +425,27 @@ async def run_and_return(self, client_id, command_fn, *args, **kwargs): ) self.log.critical(traceback.format_exc()) finally: - self.log.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})") + self.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})") async def run_and_yield(self, client_id, command_fn, *args, **kwargs): fn_str = f"{command_fn.__name__}({args}, {kwargs})" with self.client_id_context(client_id): try: - self.log.debug(f"{self.name} run-and-yield {fn_str}") + self.debug(f"{self.name}: run-and-yield {fn_str}") try: async for _ in command_fn(*args, **kwargs): - self.log.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}") + self.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}") await self.send_socket_multipart(client_id, _) except BaseException as e: if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)): error = f"Error in {self.name}.{fn_str}: {e}" trace = traceback.format_exc() - self.log.debug(error) - self.log.debug(trace) + self.debug(error) + self.debug(trace) result = {"_e": (error, trace)} await self.send_socket_multipart(client_id, result) finally: - self.log.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()") + self.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()") # _s == special signal that means StopIteration await self.send_socket_multipart(client_id, {"_s": None}) self.tasks.pop(client_id, None) @@ -449,7 +455,7 @@ async def run_and_yield(self, client_id, command_fn, *args, **kwargs): ) self.log.critical(traceback.format_exc()) finally: - self.log.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()") + self.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()") async def send_socket_multipart(self, client_id, message): try: @@ -464,7 +470,7 @@ def check_error(self, message): return True async def worker(self): - self.log.debug(f"{self.name}: starting worker") + self.debug(f"{self.name}: starting worker") try: while 1: client_id, binary = await self.socket.recv_multipart() @@ -480,14 +486,14 @@ async def worker(self): # -1 == cancel task if cmd == -1: - self.log.debug(f"{self.name} got cancel signal") + self.debug(f"{self.name} got cancel signal") await self.send_socket_multipart(client_id, {"m": "CANCEL_OK"}) await self.cancel_task(client_id) continue # -99 == shutdown task if cmd == -99: - self.log.debug(f"{self.name} got shutdown signal") + self.debug(f"{self.name} got shutdown signal") await self.send_socket_multipart(client_id, {"m": "SHUTDOWN_OK"}) await self._shutdown() return @@ -525,7 +531,7 @@ async def worker(self): self.log.error(f"{self.name}: error in EngineServer worker: {e}") self.log.trace(traceback.format_exc()) finally: - self.log.debug(f"{self.name}: finished worker()") + self.debug(f"{self.name}: finished worker()") async def _shutdown(self): if not self._shutdown_status: @@ -540,7 +546,7 @@ async def _shutdown(self): self.context.term() except Exception: self.log.trace(traceback.format_exc()) - self.log.debug(f"{self.name}: finished shutting down") + self.log.verbose(f"{self.name}: finished shutting down") def new_child_task(self, client_id, coro): task = asyncio.create_task(coro) @@ -573,11 +579,11 @@ async def cancel_task(self, client_id): if parent_task is None: return parent_task, _cmd, _args, _kwargs = parent_task - self.log.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})") + self.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})") parent_task.cancel() child_tasks = self.child_tasks.pop(client_id, set()) if child_tasks: - self.log.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}") + self.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}") for child_task in child_tasks: child_task.cancel() @@ -588,7 +594,7 @@ async def _cancel_task(self, task): try: await asyncio.wait_for(task, timeout=10) except (TimeoutError, asyncio.exceptions.TimeoutError): - self.log.debug(f"{self.name}: Timeout cancelling task") + self.log.trace(f"{self.name}: Timeout cancelling task: {task}") return except (KeyboardInterrupt, asyncio.CancelledError): return diff --git a/bbot/core/helpers/dns/dns.py b/bbot/core/helpers/dns/dns.py index 2f77ce081..07f562132 100644 --- a/bbot/core/helpers/dns/dns.py +++ b/bbot/core/helpers/dns/dns.py @@ -56,7 +56,8 @@ def __init__(self, parent_helper): self.parent_helper = parent_helper self.config = self.parent_helper.config self.dns_config = self.config.get("dns", {}) - super().__init__(server_kwargs={"config": self.config}) + engine_debug = self.config.get("engine", {}).get("debug", False) + super().__init__(server_kwargs={"config": self.config}, debug=engine_debug) # resolver self.timeout = self.dns_config.get("timeout", 5) diff --git a/bbot/core/helpers/dns/engine.py b/bbot/core/helpers/dns/engine.py index 6840d5506..d24c1f766 100644 --- a/bbot/core/helpers/dns/engine.py +++ b/bbot/core/helpers/dns/engine.py @@ -37,8 +37,8 @@ class DNSEngine(EngineServer): 99: "_mock_dns", } - def __init__(self, socket_path, config={}): - super().__init__(socket_path) + def __init__(self, socket_path, config={}, debug=False): + super().__init__(socket_path, debug=debug) self.config = config self.dns_config = self.config.get("dns", {}) diff --git a/bbot/core/helpers/web/engine.py b/bbot/core/helpers/web/engine.py index 30e037e6c..8f7984e2e 100644 --- a/bbot/core/helpers/web/engine.py +++ b/bbot/core/helpers/web/engine.py @@ -27,8 +27,8 @@ class HTTPEngine(EngineServer): "max_redirects", ) - def __init__(self, socket_path, target, config={}): - super().__init__(socket_path) + def __init__(self, socket_path, target, config={}, debug=False): + super().__init__(socket_path, debug=debug) self.target = target self.config = config self.web_config = self.config.get("web", {}) diff --git a/bbot/core/helpers/web/web.py b/bbot/core/helpers/web/web.py index 1e8ca3c61..c061a3d62 100644 --- a/bbot/core/helpers/web/web.py +++ b/bbot/core/helpers/web/web.py @@ -57,7 +57,11 @@ def __init__(self, parent_helper): self.web_spider_distance = self.web_config.get("spider_distance", 0) self.target = self.preset.target self.ssl_verify = self.config.get("ssl_verify", False) - super().__init__(server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.radix_only}) + engine_debug = self.config.get("engine", {}).get("debug", False) + super().__init__( + server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.radix_only}, + debug=engine_debug, + ) def AsyncClient(self, *args, **kwargs): from .client import BBOTAsyncClient diff --git a/bbot/defaults.yml b/bbot/defaults.yml index 62f178898..2ce8d4208 100644 --- a/bbot/defaults.yml +++ b/bbot/defaults.yml @@ -97,6 +97,11 @@ web: # Whether to verify SSL certificates ssl_verify: false +### ENGINE ### + +engine: + debug: false + # Tool dependencies deps: ffuf: diff --git a/bbot/test/test.conf b/bbot/test/test.conf index 8ae91bcf3..63914fe65 100644 --- a/bbot/test/test.conf +++ b/bbot/test/test.conf @@ -36,6 +36,8 @@ dns: - example.com - evilcorp.com - one +engine: + debug: true agent_url: ws://127.0.0.1:8765 agent_token: test speculate: false