Skip to content

Commit

Permalink
Merge branch 'dev' into dnsbrute-active
Browse files Browse the repository at this point in the history
  • Loading branch information
TheTechromancer authored Aug 16, 2024
2 parents a17966e + 207563c commit ae639ff
Show file tree
Hide file tree
Showing 16 changed files with 506 additions and 350 deletions.
70 changes: 38 additions & 32 deletions bbot/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ class EngineBase:

ERROR_CLASS = BBOTEngineError

def __init__(self):
def __init__(self, debug=False):
self._shutdown_status = False
self.log = logging.getLogger(f"bbot.core.{self.__class__.__name__.lower()}")
self._debug = debug

def pickle(self, obj):
try:
Expand Down Expand Up @@ -78,6 +79,10 @@ async def _infinite_retry(self, callback, *args, **kwargs):
if max_retries is not None and retries > max_retries:
raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}")

def debug(self, *args, **kwargs):
if self._debug:
self.log.debug(*args, **kwargs)


class EngineClient(EngineBase):
"""
Expand Down Expand Up @@ -114,9 +119,9 @@ class EngineClient(EngineBase):

SERVER_CLASS = None

def __init__(self, **kwargs):
super().__init__()
def __init__(self, debug=False, **kwargs):
self.name = f"EngineClient {self.__class__.__name__}"
super().__init__(debug=debug)
self.process = None
if self.SERVER_CLASS is None:
raise ValueError(f"Must set EngineClient SERVER_CLASS, {self.SERVER_CLASS}")
Expand All @@ -141,7 +146,7 @@ def check_error(self, message):

async def run_and_return(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-return {fn_str}")
self.debug(f"{self.name}: executing run-and-return {fn_str}")
if self._shutdown_status and not command == "_shutdown":
self.log.verbose(f"{self.name} has been shut down and is not accepting new tasks")
return
Expand All @@ -150,7 +155,7 @@ async def run_and_return(self, command, *args, **kwargs):
message = self.make_message(command, args=args, kwargs=kwargs)
if message is error_sentinel:
return
await self._infinite_retry(socket.send, message)
await socket.send(message)
binary = await self._infinite_retry(socket.recv, _context=f"waiting for return value from {fn_str}")
except BaseException:
try:
Expand All @@ -161,15 +166,15 @@ async def run_and_return(self, command, *args, **kwargs):
raise
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name}: {fn_str} got return value: {message}")
self.debug(f"{self.name}: {fn_str} got return value: {message}")
# error handling
if self.check_error(message):
return
return message

async def run_and_yield(self, command, *args, **kwargs):
fn_str = f"{command}({args}, {kwargs})"
self.log.debug(f"{self.name}: executing run-and-yield {fn_str}")
self.debug(f"{self.name}: executing run-and-yield {fn_str}")
if self._shutdown_status:
self.log.verbose("Engine has been shut down and is not accepting new tasks")
return
Expand All @@ -188,18 +193,18 @@ async def run_and_yield(self, command, *args, **kwargs):
)
# self.log.debug(f"{self.name}.{command}({kwargs}) got binary: {binary}")
message = self.unpickle(binary)
self.log.debug(f"{self.name} {command} got iteration: {message}")
self.debug(f"{self.name}: {fn_str} got iteration: {message}")
# error handling
if self.check_error(message) or self.check_stop(message):
break
yield message
except (StopAsyncIteration, GeneratorExit) as e:
exc_name = e.__class__.__name__
self.log.debug(f"{self.name}.{command} got {exc_name}")
self.debug(f"{self.name}.{command} got {exc_name}")
try:
await self.send_cancel_message(socket, fn_str)
except Exception:
self.log.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.debug(f"{self.name}.{command} failed to send cancel message after {exc_name}")
self.log.trace(traceback.format_exc())
break

Expand Down Expand Up @@ -266,6 +271,7 @@ def start_server(self):
# this allows us to more easily mock http, etc.
if os.environ.get("BBOT_TESTING", "") == "True":
kwargs["_loop"] = get_event_loop()
kwargs["debug"] = self._debug
self.process = CORE.create_process(
target=self.server_process,
args=(
Expand Down Expand Up @@ -305,7 +311,7 @@ async def new_socket(self):
if self._server_process is None:
self._server_process = self.start_server()
while not self.socket_path.exists():
self.log.debug(f"{self.name}: waiting for server process to start...")
self.debug(f"{self.name}: waiting for server process to start...")
await asyncio.sleep(0.1)
socket = self.context.socket(zmq.DEALER)
socket.setsockopt(zmq.LINGER, 0)
Expand Down Expand Up @@ -366,9 +372,9 @@ class EngineServer(EngineBase):

CMDS = {}

def __init__(self, socket_path):
super().__init__()
def __init__(self, socket_path, debug=False):
self.name = f"EngineServer {self.__class__.__name__}"
super().__init__(debug=debug)
self.socket_path = socket_path
self.client_id_var = contextvars.ContextVar("client_id", default=None)
# task <--> client id mapping
Expand Down Expand Up @@ -397,49 +403,49 @@ async def run_and_return(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-return {fn_str}")
self.debug(f"{self.name}: run-and-return {fn_str}")
result = error_sentinel
try:
result = await command_fn(*args, **kwargs)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
self.log.debug(error)
self.debug(error)
trace = traceback.format_exc()
self.log.debug(trace)
self.debug(trace)
result = {"_e": (error, trace)}
finally:
self.tasks.pop(client_id, None)
if result is not error_sentinel:
self.log.debug(f"{self.name}: Sending response to {fn_str}: {result}")
self.debug(f"{self.name}: Sending response to {fn_str}: {result}")
await self.send_socket_multipart(client_id, result)
except BaseException as e:
self.log.critical(
f"Unhandled exception in {self.name}.run_and_return({client_id}, {command_fn}, {args}, {kwargs}): {e}"
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")
self.debug(f"{self.name} finished run-and-return {command_fn.__name__}({args}, {kwargs})")

async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
fn_str = f"{command_fn.__name__}({args}, {kwargs})"
with self.client_id_context(client_id):
try:
self.log.debug(f"{self.name} run-and-yield {fn_str}")
self.debug(f"{self.name}: run-and-yield {fn_str}")
try:
async for _ in command_fn(*args, **kwargs):
self.log.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
self.debug(f"{self.name}: sending iteration for {command_fn.__name__}(): {_}")
await self.send_socket_multipart(client_id, _)
except BaseException as e:
if not in_exception_chain(e, (KeyboardInterrupt, asyncio.CancelledError)):
error = f"Error in {self.name}.{fn_str}: {e}"
trace = traceback.format_exc()
self.log.debug(error)
self.log.debug(trace)
self.debug(error)
self.debug(trace)
result = {"_e": (error, trace)}
await self.send_socket_multipart(client_id, result)
finally:
self.log.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
self.debug(f"{self.name} reached end of run-and-yield iteration for {command_fn.__name__}()")
# _s == special signal that means StopIteration
await self.send_socket_multipart(client_id, {"_s": None})
self.tasks.pop(client_id, None)
Expand All @@ -449,7 +455,7 @@ async def run_and_yield(self, client_id, command_fn, *args, **kwargs):
)
self.log.critical(traceback.format_exc())
finally:
self.log.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")
self.debug(f"{self.name} finished run-and-yield {command_fn.__name__}()")

async def send_socket_multipart(self, client_id, message):
try:
Expand All @@ -464,7 +470,7 @@ def check_error(self, message):
return True

async def worker(self):
self.log.debug(f"{self.name}: starting worker")
self.debug(f"{self.name}: starting worker")
try:
while 1:
client_id, binary = await self.socket.recv_multipart()
Expand All @@ -480,14 +486,14 @@ async def worker(self):

# -1 == cancel task
if cmd == -1:
self.log.debug(f"{self.name} got cancel signal")
self.debug(f"{self.name} got cancel signal")
await self.send_socket_multipart(client_id, {"m": "CANCEL_OK"})
await self.cancel_task(client_id)
continue

# -99 == shutdown task
if cmd == -99:
self.log.debug(f"{self.name} got shutdown signal")
self.debug(f"{self.name} got shutdown signal")
await self.send_socket_multipart(client_id, {"m": "SHUTDOWN_OK"})
await self._shutdown()
return
Expand Down Expand Up @@ -525,7 +531,7 @@ async def worker(self):
self.log.error(f"{self.name}: error in EngineServer worker: {e}")
self.log.trace(traceback.format_exc())
finally:
self.log.debug(f"{self.name}: finished worker()")
self.debug(f"{self.name}: finished worker()")

async def _shutdown(self):
if not self._shutdown_status:
Expand All @@ -540,7 +546,7 @@ async def _shutdown(self):
self.context.term()
except Exception:
self.log.trace(traceback.format_exc())
self.log.debug(f"{self.name}: finished shutting down")
self.log.verbose(f"{self.name}: finished shutting down")

def new_child_task(self, client_id, coro):
task = asyncio.create_task(coro)
Expand Down Expand Up @@ -573,11 +579,11 @@ async def cancel_task(self, client_id):
if parent_task is None:
return
parent_task, _cmd, _args, _kwargs = parent_task
self.log.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
self.debug(f"{self.name}: Cancelling client id {client_id} (task: {parent_task})")
parent_task.cancel()
child_tasks = self.child_tasks.pop(client_id, set())
if child_tasks:
self.log.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
self.debug(f"{self.name}: Cancelling {len(child_tasks):,} child tasks for client id {client_id}")
for child_task in child_tasks:
child_task.cancel()

Expand All @@ -588,7 +594,7 @@ async def _cancel_task(self, task):
try:
await asyncio.wait_for(task, timeout=10)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout cancelling task")
self.log.trace(f"{self.name}: Timeout cancelling task: {task}")
return
except (KeyboardInterrupt, asyncio.CancelledError):
return
Expand Down
33 changes: 28 additions & 5 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,15 @@ def host_original(self):
return self.host
return self._host_original

@property
def closest_host(self):
"""
Walk up the chain of parents events until we hit the first one with a host
"""
if self.host is not None or self.parent is None or self.parent is self:
return self.host
return self.parent.closest_host

@property
def port(self):
self.host
Expand Down Expand Up @@ -572,7 +581,7 @@ def get_parents(self, omit=False, include_self=False):
return parents

def _host(self):
return ""
return None

def _sanitize_data(self, data):
"""
Expand Down Expand Up @@ -923,6 +932,18 @@ def _host(self):
return make_ip_type(parsed.hostname)


class ClosestHostEvent(DictHostEvent):
# if a host isn't specified, this event type uses the host from the closest parent
# inherited by FINDING and VULNERABILITY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if "host" not in self.data:
closest_host = self.closest_host
if closest_host is None:
raise ValueError("No host was found in event parents. Host must be specified!")
self.data["host"] = str(closest_host)


class DictPathEvent(DictEvent):
_path_keywords = ["path", "filename"]

Expand Down Expand Up @@ -1300,7 +1321,7 @@ def redirect_location(self):
return location


class VULNERABILITY(DictHostEvent):
class VULNERABILITY(ClosestHostEvent):
_always_emit = True
_quick_emit = True
severity_colors = {
Expand All @@ -1316,10 +1337,11 @@ def sanitize_data(self, data):
return data

class _data_validator(BaseModel):
host: str
host: Optional[str] = None
severity: str
description: str
url: Optional[str] = None
path: Optional[str] = None
_validate_url = field_validator("url")(validators.validate_url)
_validate_host = field_validator("host")(validators.validate_host)
_validate_severity = field_validator("severity")(validators.validate_severity)
Expand All @@ -1328,14 +1350,15 @@ def _pretty_string(self):
return f'[{self.data["severity"]}] {self.data["description"]}'


class FINDING(DictHostEvent):
class FINDING(ClosestHostEvent):
_always_emit = True
_quick_emit = True

class _data_validator(BaseModel):
host: str
host: Optional[str] = None
description: str
url: Optional[str] = None
path: Optional[str] = None
_validate_url = field_validator("url")(validators.validate_url)
_validate_host = field_validator("host")(validators.validate_host)

Expand Down
3 changes: 2 additions & 1 deletion bbot/core/helpers/dns/dns.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ def __init__(self, parent_helper):
self.parent_helper = parent_helper
self.config = self.parent_helper.config
self.dns_config = self.config.get("dns", {})
super().__init__(server_kwargs={"config": self.config})
engine_debug = self.config.get("engine", {}).get("debug", False)
super().__init__(server_kwargs={"config": self.config}, debug=engine_debug)

# resolver
self.timeout = self.dns_config.get("timeout", 5)
Expand Down
4 changes: 2 additions & 2 deletions bbot/core/helpers/dns/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ class DNSEngine(EngineServer):
99: "_mock_dns",
}

def __init__(self, socket_path, config={}):
super().__init__(socket_path)
def __init__(self, socket_path, config={}, debug=False):
super().__init__(socket_path, debug=debug)

self.config = config
self.dns_config = self.config.get("dns", {})
Expand Down
Loading

0 comments on commit ae639ff

Please sign in to comment.