Skip to content

Commit

Permalink
better engine debugging
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Aug 14, 2024
1 parent 1220598 commit f6c533a
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 38 deletions.
70 changes: 38 additions & 32 deletions bbot/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class EngineBase:

ERROR_CLASS = BBOTEngineError

def __init__(self):
def __init__(self, debug=False):
self._shutdown_status = False
self.log = logging.getLogger(f"bbot.core.{self.__class__.__name__.lower()}")
self._debug = debug

def pickle(self, obj):
try:
Expand Down Expand Up @@ -78,6 +79,10 @@ async def _infinite_retry(self, callback, *args, **kwargs):
if max_retries is not None and retries > max_retries:
raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}")

def debug(self, *args, **kwargs):
if self._debug:
self.log.debug(*args, **kwargs)


class EngineClient(EngineBase):
"""
Expand Down Expand Up @@ -114,9 +119,9 @@ class EngineClient(EngineBase):

SERVER_CLASS = None

def __init__(self, **kwargs):
super().__init__()
def __init__(self, debug=False, **kwargs):
self.name = f"EngineClient {self.__class__.__name__}"
super().__init__(debug=debug)
self.process = None
if self.SERVER_CLASS is None:
raise ValueError(f"Must set EngineClient SERVER_CLASS, {self.SERVER_CLASS}")
Expand All @@ -141,7 +146,7 @@ def check_error(self, message):

async def run_and_return(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-return {fn_str}")
self.debug(f"{self.name}: executing run-and-return {fn_str}")
if self._shutdown_status and not command == "_shutdown":
self.log.verbose(f"{self.name} has been shut down and is not accepting new tasks")
return
Expand All @@ -150,7 +155,7 @@ async def run_and_return(self, command, *args, **kwargs):
message = self.make_message(command, args=args, kwargs=kwargs)
if message is error_sentinel:
return
await self._infinite_retry(socket.send, message)
await socket.send(message)
binary = await self._infinite_retry(socket.recv, _context=f"waiting for return value from {fn_str}")
except BaseException:
try:
Expand All @@ -161,15 +166,15 @@ async def run_and_return(self, command, *args, **kwargs):
raise
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name}: {fn_str} got return value: {message}")
self.debug(f"{self.name}: {fn_str} got return value: {message}")
# error handling
if self.check_error(message):
return
return message

async def run_and_yield(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-yield {fn_str}")
self.debug(f"{self.name}: executing run-and-yield {fn_str}")
if self._shutdown_status:
self.log.verbose("Engine has been shut down and is not accepting new tasks")
return
Expand All @@ -188,18 +193,18 @@ async def run_and_yield(self, command, *args, **kwargs):
)
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name} {command} got iteration: {message}")
self.debug(f"{self.name}: {fn_str} got iteration: {message}")
# error handling
if self.check_error(message) or self.check_stop(message):
break
yield message
except (StopAsyncIteration, GeneratorExit) as e:
exc_name = e.__class__.__name__
self.log.debug(f"{self.name}.{command} got {exc_name}")
self.debug(f"{self.name}.{command} got {exc_name}")
try:
await self.send_cancel_message(socket, fn_str)
except Exception:
self.log.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.log.trace(traceback.format_exc())
break

Expand Down Expand Up @@ -266,6 +271,7 @@ def start_server(self):
# this allows us to more easily mock http, etc.
if os.environ.get("BBOT_TESTING", "") == "True":
kwargs["_loop"] = get_event_loop()
kwargs["debug"] = self._debug
self.process = CORE.create_process(
target=self.server_process,
args=(
Expand Down Expand Up @@ -305,7 +311,7 @@ async def new_socket(self):
if self._server_process is None:
self._server_process = self.start_server()
while not self.socket_path.exists():
self.log.debug(f"{self.name}: waiting for server process to start...")
self.debug(f"{self.name}: waiting for server process to start...")
await asyncio.sleep(0.1)
socket = self.context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
Expand Down Expand Up @@ -366,9 +372,9 @@ class EngineServer(EngineBase):

CMDS = {}

def __init__(self, socket_path):
super().__init__()
def __init__(self, socket_path, debug=False):
self.name = f"EngineServer {self.__class__.__name__}"
super().__init__(debug=debug)
self.socket_path = socket_path
self.client_id_var = contextvars.ContextVar("client_id", default=None)
# task <--> client id mapping
Expand Down Expand Up @@ -397,49 +403,49 @@ async def run_and_return(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-return {fn_str}")
self.debug(f"{self.name}: run-and-return {fn_str}")
result = error_sentinel
try:
result = await command_fn(*args, **kwargs)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
self.log.debug(error)
self.debug(error)
trace = traceback.format_exc()
self.log.debug(trace)
self.debug(trace)
result = {"_e": (error, trace)}
finally:
self.tasks.pop(client_id, None)
if result is not error_sentinel:
self.log.debug(f"{self.name}: Sending response to {fn_str}: {result}")
self.debug(f"{self.name}: Sending response to {fn_str}: {result}")
await self.send_socket_multipart(client_id, result)
except BaseException as e:
self.log.critical(
f"Unhandled exception in {self.name}.run_and_return({client_id}, {command_fn}, {args}, {kwargs}): {e}"
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")
self.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")

async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-yield {fn_str}")
self.debug(f"{self.name}: run-and-yield {fn_str}")
try:
async for _ in command_fn(*args, **kwargs):
self.log.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
self.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
await self.send_socket_multipart(client_id, _)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
trace = traceback.format_exc()
self.log.debug(error)
self.log.debug(trace)
self.debug(error)
self.debug(trace)
result = {"_e": (error, trace)}
await self.send_socket_multipart(client_id, result)
finally:
self.log.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
self.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
# _s == special signal that means StopIteration
await self.send_socket_multipart(client_id, {"_s": None})
self.tasks.pop(client_id, None)
Expand All @@ -449,7 +455,7 @@ async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")
self.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")

async def send_socket_multipart(self, client_id, message):
try:
Expand All @@ -464,7 +470,7 @@ def check_error(self, message):
return True

async def worker(self):
self.log.debug(f"{self.name}: starting worker")
self.debug(f"{self.name}: starting worker")
try:
while 1:
client_id, binary = await self.socket.recv_multipart()
Expand All @@ -480,14 +486,14 @@ async def worker(self):

# -1 == cancel task
if cmd == -1:
self.log.debug(f"{self.name} got cancel signal")
self.debug(f"{self.name} got cancel signal")
await self.send_socket_multipart(client_id, {"m": "CANCEL_OK"})
await self.cancel_task(client_id)
continue

# -99 == shutdown task
if cmd == -99:
self.log.debug(f"{self.name} got shutdown signal")
self.debug(f"{self.name} got shutdown signal")
await self.send_socket_multipart(client_id, {"m": "SHUTDOWN_OK"})
await self._shutdown()
return
Expand Down Expand Up @@ -525,7 +531,7 @@ async def worker(self):
self.log.error(f"{self.name}: error in EngineServer worker: {e}")
self.log.trace(traceback.format_exc())
finally:
self.log.debug(f"{self.name}: finished worker()")
self.debug(f"{self.name}: finished worker()")

async def _shutdown(self):
if not self._shutdown_status:
Expand All @@ -540,7 +546,7 @@ async def _shutdown(self):
self.context.term()
except Exception:
self.log.trace(traceback.format_exc())
self.log.debug(f"{self.name}: finished shutting down")
self.log.verbose(f"{self.name}: finished shutting down")

def new_child_task(self, client_id, coro):
task = asyncio.create_task(coro)
Expand Down Expand Up @@ -573,11 +579,11 @@ async def cancel_task(self, client_id):
if parent_task is None:
return
parent_task, _cmd, _args, _kwargs = parent_task
self.log.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
self.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
parent_task.cancel()
child_tasks = self.child_tasks.pop(client_id, set())
if child_tasks:
self.log.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
self.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
for child_task in child_tasks:
child_task.cancel()

Expand All @@ -588,7 +594,7 @@ async def _cancel_task(self, task):
try:
await asyncio.wait_for(task, timeout=10)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout cancelling task")
self.log.trace(f"{self.name}: Timeout cancelling task: {task}")
return
except (KeyboardInterrupt, asyncio.CancelledError):
return
Expand Down
3 changes: 2 additions & 1 deletion bbot/core/helpers/dns/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(self, parent_helper):
self.parent_helper = parent_helper
self.config = self.parent_helper.config
self.dns_config = self.config.get("dns", {})
super().__init__(server_kwargs={"config": self.config})
engine_debug = self.config.get("engine", {}).get("debug", False)
super().__init__(server_kwargs={"config": self.config}, debug=engine_debug)

# resolver
self.timeout = self.dns_config.get("timeout", 5)
Expand Down
4 changes: 2 additions & 2 deletions bbot/core/helpers/dns/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class DNSEngine(EngineServer):
99: "_mock_dns",
}

def __init__(self, socket_path, config={}):
super().__init__(socket_path)
def __init__(self, socket_path, config={}, debug=False):
super().__init__(socket_path, debug=debug)

self.config = config
self.dns_config = self.config.get("dns", {})
Expand Down
4 changes: 2 additions & 2 deletions bbot/core/helpers/web/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class HTTPEngine(EngineServer):
"max_redirects",
)

def __init__(self, socket_path, target, config={}):
super().__init__(socket_path)
def __init__(self, socket_path, target, config={}, debug=False):
super().__init__(socket_path, debug=debug)
self.target = target
self.config = config
self.web_config = self.config.get("web", {})
Expand Down
6 changes: 5 additions & 1 deletion bbot/core/helpers/web/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ def __init__(self, parent_helper):
self.web_spider_distance = self.web_config.get("spider_distance", 0)
self.target = self.preset.target
self.ssl_verify = self.config.get("ssl_verify", False)
super().__init__(server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.radix_only})
engine_debug = self.config.get("engine", {}).get("debug", False)
super().__init__(
server_kwargs={"config": self.config, "target": self.parent_helper.preset.target.radix_only},
debug=engine_debug,
)

def AsyncClient(self, *args, **kwargs):
from .client import BBOTAsyncClient
Expand Down
5 changes: 5 additions & 0 deletions bbot/defaults.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ web:
# Whether to verify SSL certificates
ssl_verify: false

### ENGINE ###

engine:
debug: false

# Tool dependencies
deps:
ffuf:
Expand Down
2 changes: 2 additions & 0 deletions bbot/test/test.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ dns:
- example.com
- evilcorp.com
- one
engine:
debug: true
agent_url: ws://127.0.0.1:8765
agent_token: test
speculate: false
Expand Down

0 comments on commit f6c533a

Please sign in to comment.