Skip to content

Commit

Permalink
Merge branch 'dev' into securitytxt
Browse files Browse the repository at this point in the history
  • Loading branch information
colin-stubbs authored Aug 24, 2024
2 parents e9d2b74 + 16fc5fa commit beb48a8
Show file tree
Hide file tree
Showing 34 changed files with 1,127 additions and 600 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ Passive API sources plus a recursive DNS brute-force with target-specific subdom
```bash
# find subdomains of evilcorp.com
bbot -t evilcorp.com -p subdomain-enum

# passive sources only
bbot -t evilcorp.com -p subdomain-enum -rf passive
```

<!-- BBOT SUBDOMAIN-ENUM PRESET EXPANDABLE -->
Expand Down
72 changes: 39 additions & 33 deletions bbot/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class EngineBase:

ERROR_CLASS = BBOTEngineError

def __init__(self):
def __init__(self, debug=False):
self._shutdown_status = False
self.log = logging.getLogger(f"bbot.core.{self.__class__.__name__.lower()}")
self._debug = debug

def pickle(self, obj):
try:
Expand Down Expand Up @@ -73,11 +74,15 @@ async def _infinite_retry(self, callback, *args, **kwargs):
try:
return await asyncio.wait_for(callback(*args, **kwargs), timeout=interval)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds{context}, retrying...")
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds {context}, retrying...")
retries += 1
if max_retries is not None and retries > max_retries:
raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}")

def debug(self, *args, **kwargs):
if self._debug:
self.log.debug(*args, **kwargs)


class EngineClient(EngineBase):
"""
Expand Down Expand Up @@ -114,9 +119,9 @@ class EngineClient(EngineBase):

SERVER_CLASS = None

def __init__(self, **kwargs):
super().__init__()
def __init__(self, debug=False, **kwargs):
self.name = f"EngineClient {self.__class__.__name__}"
super().__init__(debug=debug)
self.process = None
if self.SERVER_CLASS is None:
raise ValueError(f"Must set EngineClient SERVER_CLASS, {self.SERVER_CLASS}")
Expand All @@ -141,7 +146,7 @@ def check_error(self, message):

async def run_and_return(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-return {fn_str}")
self.debug(f"{self.name}: executing run-and-return {fn_str}")
if self._shutdown_status and not command == "_shutdown":
self.log.verbose(f"{self.name} has been shut down and is not accepting new tasks")
return
Expand All @@ -150,7 +155,7 @@ async def run_and_return(self, command, *args, **kwargs):
message = self.make_message(command, args=args, kwargs=kwargs)
if message is error_sentinel:
return
await self._infinite_retry(socket.send, message)
await socket.send(message)
binary = await self._infinite_retry(socket.recv, _context=f"waiting for return value from {fn_str}")
except BaseException:
try:
Expand All @@ -161,15 +166,15 @@ async def run_and_return(self, command, *args, **kwargs):
raise
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name}: {fn_str} got return value: {message}")
self.debug(f"{self.name}: {fn_str} got return value: {message}")
# error handling
if self.check_error(message):
return
return message

async def run_and_yield(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-yield {fn_str}")
self.debug(f"{self.name}: executing run-and-yield {fn_str}")
if self._shutdown_status:
self.log.verbose("Engine has been shut down and is not accepting new tasks")
return
Expand All @@ -188,18 +193,18 @@ async def run_and_yield(self, command, *args, **kwargs):
)
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name} {command} got iteration: {message}")
self.debug(f"{self.name}: {fn_str} got iteration: {message}")
# error handling
if self.check_error(message) or self.check_stop(message):
break
yield message
except (StopAsyncIteration, GeneratorExit) as e:
exc_name = e.__class__.__name__
self.log.debug(f"{self.name}.{command} got {exc_name}")
self.debug(f"{self.name}.{command} got {exc_name}")
try:
await self.send_cancel_message(socket, fn_str)
except Exception:
self.log.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.log.trace(traceback.format_exc())
break

Expand Down Expand Up @@ -266,6 +271,7 @@ def start_server(self):
# this allows us to more easily mock http, etc.
if os.environ.get("BBOT_TESTING", "") == "True":
kwargs["_loop"] = get_event_loop()
kwargs["debug"] = self._debug
self.process = CORE.create_process(
target=self.server_process,
args=(
Expand Down Expand Up @@ -305,7 +311,7 @@ async def new_socket(self):
if self._server_process is None:
self._server_process = self.start_server()
while not self.socket_path.exists():
self.log.debug(f"{self.name}: waiting for server process to start...")
self.debug(f"{self.name}: waiting for server process to start...")
await asyncio.sleep(0.1)
socket = self.context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
Expand Down Expand Up @@ -366,9 +372,9 @@ class EngineServer(EngineBase):

CMDS = {}

def __init__(self, socket_path):
super().__init__()
def __init__(self, socket_path, debug=False):
self.name = f"EngineServer {self.__class__.__name__}"
super().__init__(debug=debug)
self.socket_path = socket_path
self.client_id_var = contextvars.ContextVar("client_id", default=None)
# task <--> client id mapping
Expand Down Expand Up @@ -397,49 +403,49 @@ async def run_and_return(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-return {fn_str}")
self.debug(f"{self.name}: run-and-return {fn_str}")
result = error_sentinel
try:
result = await command_fn(*args, **kwargs)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
self.log.debug(error)
self.debug(error)
trace = traceback.format_exc()
self.log.debug(trace)
self.debug(trace)
result = {"_e": (error, trace)}
finally:
self.tasks.pop(client_id, None)
if result is not error_sentinel:
self.log.debug(f"{self.name}: Sending response to {fn_str}: {result}")
self.debug(f"{self.name}: Sending response to {fn_str}: {result}")
await self.send_socket_multipart(client_id, result)
except BaseException as e:
self.log.critical(
f"Unhandled exception in {self.name}.run_and_return({client_id}, {command_fn}, {args}, {kwargs}): {e}"
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")
self.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")

async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-yield {fn_str}")
self.debug(f"{self.name}: run-and-yield {fn_str}")
try:
async for _ in command_fn(*args, **kwargs):
self.log.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
self.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
await self.send_socket_multipart(client_id, _)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
trace = traceback.format_exc()
self.log.debug(error)
self.log.debug(trace)
self.debug(error)
self.debug(trace)
result = {"_e": (error, trace)}
await self.send_socket_multipart(client_id, result)
finally:
self.log.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
self.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
# _s == special signal that means StopIteration
await self.send_socket_multipart(client_id, {"_s": None})
self.tasks.pop(client_id, None)
Expand All @@ -449,7 +455,7 @@ async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")
self.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")

async def send_socket_multipart(self, client_id, message):
try:
Expand All @@ -464,7 +470,7 @@ def check_error(self, message):
return True

async def worker(self):
self.log.debug(f"{self.name}: starting worker")
self.debug(f"{self.name}: starting worker")
try:
while 1:
client_id, binary = await self.socket.recv_multipart()
Expand All @@ -480,14 +486,14 @@ async def worker(self):

# -1 == cancel task
if cmd == -1:
self.log.debug(f"{self.name} got cancel signal")
self.debug(f"{self.name} got cancel signal")
await self.send_socket_multipart(client_id, {"m": "CANCEL_OK"})
await self.cancel_task(client_id)
continue

# -99 == shutdown task
if cmd == -99:
self.log.debug(f"{self.name} got shutdown signal")
self.debug(f"{self.name} got shutdown signal")
await self.send_socket_multipart(client_id, {"m": "SHUTDOWN_OK"})
await self._shutdown()
return
Expand Down Expand Up @@ -525,7 +531,7 @@ async def worker(self):
self.log.error(f"{self.name}: error in EngineServer worker: {e}")
self.log.trace(traceback.format_exc())
finally:
self.log.debug(f"{self.name}: finished worker()")
self.debug(f"{self.name}: finished worker()")

async def _shutdown(self):
if not self._shutdown_status:
Expand All @@ -540,7 +546,7 @@ async def _shutdown(self):
self.context.term()
except Exception:
self.log.trace(traceback.format_exc())
self.log.debug(f"{self.name}: finished shutting down")
self.log.verbose(f"{self.name}: finished shutting down")

def new_child_task(self, client_id, coro):
task = asyncio.create_task(coro)
Expand Down Expand Up @@ -573,11 +579,11 @@ async def cancel_task(self, client_id):
if parent_task is None:
return
parent_task, _cmd, _args, _kwargs = parent_task
self.log.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
self.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
parent_task.cancel()
child_tasks = self.child_tasks.pop(client_id, set())
if child_tasks:
self.log.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
self.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
for child_task in child_tasks:
child_task.cancel()

Expand All @@ -588,7 +594,7 @@ async def _cancel_task(self, task):
try:
await asyncio.wait_for(task, timeout=10)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout cancelling task")
self.log.trace(f"{self.name}: Timeout cancelling task: {task}")
return
except (KeyboardInterrupt, asyncio.CancelledError):
return
Expand Down
Loading

0 comments on commit beb48a8

Please sign in to comment.