diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index d3cf2c2ba4..74ec0b6961 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -1,24 +1,20 @@ -from collections import defaultdict import io import re import uuid import json import base64 -import asyncio import logging import tarfile -import binascii import datetime import ipaddress import traceback -import xml.etree.ElementTree as ET -from copy import copy, deepcopy from pathlib import Path from typing import Optional +from copy import copy, deepcopy from contextlib import suppress from radixtarget import RadixTarget -from urllib.parse import urljoin, parse_qs, unquote, quote +from urllib.parse import urljoin, parse_qs from pydantic import BaseModel, field_validator @@ -44,6 +40,7 @@ validators, get_file_extension, ) +from bbot.core.helpers.web.envelopes import BaseEnvelope log = logging.getLogger("bbot.core.event") @@ -593,6 +590,10 @@ def parent(self, parent): elif not self._dummy: log.warning(f"Tried to set invalid parent on {self}: (got: {parent})") + @property + def children(self): + return [] + @property def parent_id(self): parent_id = getattr(self.get_parent(), "id", None) @@ -648,15 +649,10 @@ def get_parents(self, omit=False, include_self=False): return parents def clone(self): - # Create a shallow copy of the event first cloned_event = copy(self) - - # Handle attributes that need deep copying manually - setattr(cloned_event, "envelopes", deepcopy(self.envelopes)) - # Re-assign a new UUID - cloned_event.uuid = uuid.uuid4() + cloned_event._uuid = uuid.uuid4() return cloned_event def _host(self): @@ -1329,382 +1325,45 @@ class URL_HINT(URL_UNVERIFIED): class WEB_PARAMETER(DictHostEvent): - @property - def uuid(self): - return self._uuid - - @uuid.setter - def uuid(self, value): - self._uuid = value - - class ParameterEnvelopes: - - @staticmethod - def preprocess_base64(base64_str): - return base64.b64decode(base64_str).decode() - - @staticmethod - def 
postprocess_base64(string): - return base64.b64encode(string.encode()).decode() - - @staticmethod - def preprocess_hex(hex_str): - return bytes.fromhex(hex_str).decode() - - @staticmethod - def postprocess_hex(string): - return string.encode().hex() - - @staticmethod - def preprocess_urlencoded(url_encoded_str): - return unquote(url_encoded_str) - - @staticmethod - def postprocess_urlencoded(string): - return quote(string) - - @staticmethod - def is_ascii_printable(s): - return all(32 <= ord(char) < 127 for char in s) - - # Converts XML ElementTree to a JSON-like dictionary - def xml_to_dict(self, elem): - """ - Convert XML ElementTree to a dictionary recursively. - """ - d = {elem.tag: {} if elem.attrib else None} - children = list(elem) - if children: - dd = defaultdict(list) - for dc in map(self.xml_to_dict, children): - for k, v in dc.items(): - dd[k].append(v) - d = {elem.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}} - if elem.attrib: - d[elem.tag].update(("@" + k, v) for k, v in elem.attrib.items()) - if elem.text: - text = elem.text.strip() - if children or elem.attrib: - if text: - d[elem.tag]["#text"] = text - else: - d[elem.tag] = text - return d - - def dict_to_xml(self, d): - """ - Converts a dictionary to an XML string without adding an extra root node. - Assumes the dictionary was originally an XML structure. 
- """ - if not isinstance(d, dict) or len(d) != 1: - raise ValueError("Expected a dictionary with a single root element.") - - # Get the root element directly from the dict keys - root_tag = list(d.keys())[0] - root_element = ET.Element(root_tag) - - # Recursive function to handle nested dicts - def _build_tree(element, subdict): - for key, value in subdict.items(): - if isinstance(value, dict): - # Nested element - sub_element = ET.SubElement(element, key) - _build_tree(sub_element, value) - else: - # Leaf element - sub_element = ET.SubElement(element, key) - sub_element.text = str(value) - - # Start building the tree - _build_tree(root_element, d[root_tag]) - - return ET.tostring(root_element, encoding="utf-8").decode("utf-8") - - preprocess_map = { - "base64": preprocess_base64, - "hex": preprocess_hex, - "url-encoded": preprocess_urlencoded, - } - postprocess_map = { - "base64": postprocess_base64, - "hex": postprocess_hex, - "url-encoded": postprocess_urlencoded, - } - - # Format-specific functions for isolating and updating parameters - format_isolate_map = { - "json": lambda self: self.isolate_parameter(), - "xml": lambda self: self.isolate_parameter(), - } - format_update_map = { - "json": lambda self, value: self.update_json_parameter(value), - "xml": lambda self, value: self.update_xml_parameter(value), # Placeholder - } - - def initialize_value(self, value=None): - self.envelopes, end_format_dict = self.recurse_envelopes(value) - if self.envelopes: - log.debug(f"Discovered the following envelopes: [{','.join(self.envelopes)}]") - - if end_format_dict is not None: - self.end_format_type = list(end_format_dict.keys())[0] - log.debug(f"Identified the following end format: [{self.end_format_type}]") - self.end_format_data = list(end_format_dict.values())[0] - else: - self.end_format_type = None - self.end_format_data = None - self.end_format_subparameter = None - - def remove_envelopes(self, value): - """ - Remove envelopes from the value, processing each 
envelope in the order it was applied. - If the final format is present, trigger the appropriate handler (e.g., for JSON). - """ - # Apply the preprocess functions in the order the envelopes were applied - for env in self.envelopes: - func = self.preprocess_map.get(env) - if func: - # python3.9 compatibility hack - if isinstance(func, staticmethod): - func = func.__get__(None, self.__class__) # Unwrap staticmethod - value = func(value) - - # Dynamically select the appropriate isolate function based on the final format - isolate_func = self.format_isolate_map.get(self.end_format_type) - if isolate_func: - return isolate_func(self) - return value - - def add_envelopes(self, value): - """ - Add envelopes back to the value, processing in reverse order. - If the final format is present, trigger the appropriate handler (e.g., for JSON). - """ - # Dynamically select the appropriate update function based on the final format - update_func = self.format_update_map.get(self.end_format_type) - if update_func: - # python3.9 compatibility hack - if isinstance(update_func, staticmethod): - update_func = update_func.__get__(None, self.__class__) - value = update_func(self, value) - - # Apply the envelopes in reverse order - for env in self.envelopes[::-1]: - func = self.postprocess_map.get(env) - if func: - # python3.9 compatibility hack - if isinstance(func, staticmethod): - func = func.__get__(None, self.__class__) - value = func(value) - return value - - def recurse_envelopes(self, value, envelopes=None, end_format=None): - if envelopes is None: - envelopes = [] - log.debug( - f"Starting envelope recurse with value: [{value}], current envelopes: [{', '.join(envelopes)}], current end format: {end_format}" - ) - - if value is None or value == "" or isinstance(value, int): - return envelopes, end_format - - # Try URL decoding - try: - decoded_url = unquote(value) - if decoded_url != value and self.is_ascii_printable(decoded_url): - envelopes.append("url-encoded") - envelopes, 
end_format_dict = self.recurse_envelopes(decoded_url, envelopes) - return envelopes, end_format_dict - except Exception: - pass # Not valid URL encoding - - # Try base64 decoding - try: - decoded_base64 = base64.b64decode(value).decode() - if self.is_ascii_printable(decoded_base64): - envelopes.append("base64") - envelopes, end_format_dict = self.recurse_envelopes(decoded_base64, envelopes) - return envelopes, end_format_dict - except (binascii.Error, UnicodeDecodeError, ValueError): - pass # Not valid base64 - - # Try hex decoding - try: - decoded_hex = bytes.fromhex(value).decode("utf-8") - if self.is_ascii_printable(decoded_hex): - envelopes.append("hex") - envelopes, end_format_dict = self.recurse_envelopes(decoded_hex, envelopes) - return envelopes, end_format_dict - except (ValueError, UnicodeDecodeError): - pass # Not valid hex - - # Try JSON parsing - try: - decoded_json = json.loads(value) - if isinstance(decoded_json, dict): - return envelopes, {"json": decoded_json} - except json.JSONDecodeError: - pass # Not valid JSON + @property + def children(self): + # if we have any subparams, raise a new WEB_PARAMETER for each one + children = [] + envelopes = getattr(self, "envelopes", None) + if envelopes is not None: + subparams = sorted(envelopes.get_subparams()) + # an empty container such as "{}" yields no subparams, so there is nothing to select + if subparams and envelopes.selected_subparam is None: + envelopes.selected_subparam = subparams[0][0] + if len(subparams) > 1: + for subparam, _ in subparams[1:]: + clone = self.clone() + clone.envelopes = deepcopy(envelopes) + clone.envelopes.selected_subparam = subparam + clone.parent = self + children.append(clone) + return children - # Try XML parsing + def sanitize_data(self, data): + original_value = data.get("original_value", None) + if original_value is not None: try: - decoded_xml = ET.fromstring(value) - # Pass 'decoded_xml' to 'xml_to_dict' - xml_dict = self.xml_to_dict(decoded_xml) # Pass decoded XML as the 'elem' argument - return envelopes, {"xml": xml_dict} #
Store as JSON-like dict, not XML - except ET.ParseError: - pass # Not valid XML - - return envelopes, end_format - - def isolate_parameter(self): - """ - Isolate the specified subparameter from the data structure (JSON/XML). - The subparameter is accessed using dot notation for nested keys. - """ - if self.end_format_data and self.end_format_subparameter: - # Split the dot notation string into keys - keys = self.end_format_subparameter.split(".") - - # Traverse the nested structure using the keys - subparameter_value = self.end_format_data - for key in keys: - if isinstance(subparameter_value, dict): - subparameter_value = subparameter_value.get(key) - else: - # If the structure is broken (not a dict), return None - return None - - return subparameter_value - - return None - - def update_json_parameter(self, new_value): - """ - Update the specified subparameter in the JSON structure and rebuild it. - """ - # Work with a copy to avoid modifying the original `end_format_data` - end_format_data_copy = deepcopy(self.end_format_data) - - if end_format_data_copy: - end_format_data_copy[self.end_format_subparameter] = new_value - return json.dumps(end_format_data_copy) - return new_value - - def update_xml_parameter(self, new_value): - """ - Convert the JSON-like structure back into an XML string after updating the specific parameter. 
- """ - if self.end_format_data and self.end_format_subparameter: - # Split the dot notation into keys - keys = self.end_format_subparameter.split(".") - - # Traverse the nested dictionary using the keys to find the target subparameter - current_data = self.end_format_data - for key in keys[:-1]: # Traverse up to the second-to-last key - current_data = current_data.get(key, {}) - - # Update the target subparameter with the new value - if isinstance(current_data, dict): - current_data[keys[-1]] = new_value - - # Convert the JSON-like dict back to an XML string - return self.dict_to_xml(self.end_format_data) - - return new_value - - def to_dict(self): - return { - "envelopes": self.envelopes, - "end_format_type": self.end_format_type, - "end_format_data": self.end_format_data, - "end_format_subparameter": self.end_format_subparameter, - } - - def __getstate__(self): - return self.to_dict() - - def __str__(self): - return f"ParameterEnvelopes(envelopes={self.envelopes}, end_format_type={self.end_format_type}, end_format_data={self.end_format_data}, end_format_subparameter={self.end_format_subparameter})" - - __repr__ = __str__ - - @classmethod - def from_dict(cls, data): - instance = cls() - instance.envelopes = data.get("envelopes", []) - instance.end_format_type = data.get("end_format_type") - instance.end_format_data = data.get("end_format_data") - instance.end_format_subparameter = data.get("end_format_subparameter") - return instance - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if "original_value" in self.data.keys(): - parameterEnvelope_instance = self.ParameterEnvelopes() - parameterEnvelope_instance.initialize_value(self.data["original_value"]) - setattr(self, "envelopes", parameterEnvelope_instance) - - envelopes = getattr(self, "envelopes", None) - if ( - envelopes is not None - and getattr(envelopes, "end_format_type", None) is not None - and getattr(envelopes, "end_format_data", None) - ): - end_format_data = 
envelopes.end_format_data - - def extract_keys_with_values(data, parent_key=""): - """ - Recursively extract all keys from nested dictionaries that have values (non-empty). - Construct a path-like structure with dot notation (e.g., 'find.search'). - """ - keys = [] - if isinstance(data, dict): - for key, value in data.items(): - # Construct the full key path using dot notation - full_key = f"{parent_key}.{key}" if parent_key else key - - # Only add keys that have non-empty values - if value: - if isinstance(value, dict): - # Recursively check nested dictionaries - keys.extend(extract_keys_with_values(value, full_key)) - else: - # Add the key if it has a non-empty value - keys.append(full_key) - return keys - - # Extract all keys that have non-empty values - end_format_data_keys = extract_keys_with_values(end_format_data) - # If there are keys, assign the first key to end_format_subparameter - if end_format_data_keys: - - # Assign the first key to end_format_subparameter - setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) - setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) - - # Iterate through the remaining keys, starting from the second one - for p in end_format_data_keys[1:]: - log.debug(f"generating copy of event for subparameter {p} of type {envelopes.end_format_type}") - - # Make a copy of the current event data - cloned_event = self.clone() - cloned_envelopes = getattr(cloned_event, "envelopes") - cloned_envelopes.end_format_subparameter = p - asyncio.run_coroutine_threadsafe( - self.module.emit_event(cloned_event), asyncio.get_event_loop() - ) + envelopes = BaseEnvelope.detect(original_value) + setattr(self, "envelopes", envelopes) + except ValueError as e: + log.verbose(f"Error detecting envelopes for {self}: {e}") + return data def _data_id(self): # dedupe by url:name:param_type url = self.data.get("url", "") name = self.data.get("name", "") param_type = self.data.get("type", "") - envelopes = getattr(self, 
"envelopes", None) - subparameter = getattr(envelopes, "end_format_subparameter", "") if envelopes else "" + envelopes = getattr(self, "envelopes", "") + subparam = getattr(envelopes, "selected_subparam", "") - return f"{url}:{name}:{param_type}:{subparameter}" + return f"{url}:{name}:{param_type}:{subparam}" def _outgoing_dedup_hash(self, event): return hash( diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index 7c6c8a0738..ced61925ce 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -2867,3 +2867,15 @@ def clean_requirement(req_string): dist = distribution("bbot") return [clean_requirement(r) for r in dist.requires] + + +def is_printable(s): + """ + Check if a string is printable + """ + if not isinstance(s, str): + raise ValueError(f"Expected a string, got {type(s)}") + + # Exclude control characters that break display/printing + s = set(s) + return all(ord(c) >= 32 or c in "\t\n\r" for c in s) diff --git a/bbot/core/helpers/web/client.py b/bbot/core/helpers/web/client.py index 83154e5aec..49ddf532be 100644 --- a/bbot/core/helpers/web/client.py +++ b/bbot/core/helpers/web/client.py @@ -85,7 +85,6 @@ def __init__(self, *args, **kwargs): self._cookies = DummyCookies() def build_request(self, *args, **kwargs): - if args: url = args[0] kwargs["url"] = url diff --git a/bbot/core/helpers/web/engine.py b/bbot/core/helpers/web/engine.py index e0f63cb052..8ffdbe966f 100644 --- a/bbot/core/helpers/web/engine.py +++ b/bbot/core/helpers/web/engine.py @@ -50,7 +50,6 @@ def AsyncClient(self, *args, **kwargs): return client async def request(self, *args, **kwargs): - raise_error = kwargs.pop("raise_error", False) # TODO: use this cache_for = kwargs.pop("cache_for", None) # noqa @@ -75,7 +74,6 @@ async def request(self, *args, **kwargs): client_kwargs = {} for k in list(kwargs): if k in self.client_only_options: - v = kwargs.pop(k) client_kwargs[k] = v diff --git a/bbot/core/helpers/web/envelopes.py b/bbot/core/helpers/web/envelopes.py 
new file mode 100644 index 0000000000..c000c5d1c0 --- /dev/null +++ b/bbot/core/helpers/web/envelopes.py @@ -0,0 +1,348 @@ +import json +import base64 +import binascii +import xmltodict +from contextlib import suppress +from urllib.parse import unquote, quote +from xml.parsers.expat import ExpatError + +from bbot.core.helpers.misc import is_printable + + +# TODO: This logic is perfect for extracting params. We should expand it outwards to include other higher-level envelopes: +# - QueryStringEnvelope +# - MultipartFormEnvelope +# - HeaderEnvelope +# - CookieEnvelope +# +# Once we start ingesting HTTP_REQUEST events, this will make them instantly fuzzable + + +class EnvelopeChildTracker(type): + """ + Keeps track of all the child envelope classes + """ + + children = [] + + def __new__(mcs, name, bases, class_dict): + # Create the class + cls = super().__new__(mcs, name, bases, class_dict) + # Don't register the base class itself + if bases and not name.startswith("Base"): # Only register if it has base classes (i.e., is a child) + EnvelopeChildTracker.children.append(cls) + EnvelopeChildTracker.children.sort(key=lambda x: x.priority) + return cls + + +class BaseEnvelope(metaclass=EnvelopeChildTracker): + __slots__ = ["subparams", "selected_subparam", "singleton"] + + # determines the order of the envelope detection + priority = 5 + # whether the envelope is the final format, e.g. raw text/binary + end_format = False + ignore_exceptions = (Exception,) + envelope_classes = EnvelopeChildTracker.children + # transparent envelopes (i.e. 
TextEnvelope) are not counted as envelopes or included in the finding descriptions + transparent = False + + def __init__(self, s): + unpacked_data = self.unpack(s) + + if self.end_format: + inner_envelope = unpacked_data + else: + inner_envelope = self.detect(unpacked_data) + + self.selected_subparam = None + # if we have subparams, our inner envelope will be a dictionary + if isinstance(inner_envelope, dict): + self.subparams = inner_envelope + self.singleton = False + # otherwise if we just have one value, we make a dictionary with a default key + else: + self.subparams = {"__default__": inner_envelope} + self.singleton = True + + @property + def final_envelope(self): + try: + return self.unpacked_data(recursive=False).final_envelope + except AttributeError: + return self + + @property + def friendly_name(self): + # subclasses shadow this property with a plain class attribute (e.g. "Base64-Encoded"); + # reading self.friendly_name from inside the property itself would recurse forever, + # so the base implementation simply falls back to the class name + return self.name + + def pack(self, data=None): + if data is None: + data = self.unpacked_data(recursive=False) + with suppress(AttributeError): + data = data.pack() + return self._pack(data) + + def unpack(self, s): + return self._unpack(s) + + def _pack(self, s): + """ + Encodes the string using the class's unique encoder (adds the outer envelope) + """ + raise NotImplementedError("Envelope.pack() must be implemented") + + def _unpack(self, s): + """ + Decodes the string using the class's unique encoder (removes the outer envelope) + """ + raise NotImplementedError("Envelope.unpack() must be implemented") + + def unpacked_data(self, recursive=True): + try: + unpacked = self.subparams["__default__"] + if recursive: + with suppress(AttributeError): + return unpacked.unpacked_data(recursive=recursive) + return unpacked + except KeyError: + return self.subparams + + @classmethod + def detect(cls, s): + """ + Detects the type of envelope used to encode the packed_data + """ + if not isinstance(s, str): + raise ValueError(f"Invalid data passed to detect(): {s} ({type(s)})") + # if the value is
empty, we just return the text envelope + if not s.strip(): + return TextEnvelope(s) + for envelope_class in cls.envelope_classes: + with suppress(*envelope_class.ignore_exceptions): + envelope = envelope_class(s) + if envelope is not False: + return envelope + del envelope + raise Exception(f"No envelope detected for data: '{s}' ({type(s)})") + + def get_subparams(self, key=None, data=None, recursive=True): + if data is None: + data = self.unpacked_data(recursive=recursive) + if key is None: + key = [] + + if isinstance(data, dict): + for k, v in data.items(): + full_key = key + [k] + if isinstance(v, dict): + yield from self.get_subparams(full_key, v) + else: + yield full_key, v + else: + yield [], data + + def get_subparam(self, key=None, recursive=True): + if key is None: + key = self.selected_subparam + envelope = self + if recursive: + envelope = self.final_envelope + data = envelope.unpacked_data(recursive=False) + if key is None: + if envelope.singleton: + key = [] + else: + raise ValueError("No subparam selected") + else: + for segment in key: + data = data[segment] + return data + + def set_subparam(self, key=None, value=None, recursive=True): + envelope = self + if recursive: + envelope = self.final_envelope + + # if there's only one value to set, we can just set it directly + if envelope.singleton: + envelope.subparams["__default__"] = value + return + + # if key isn't specified, use the selected subparam + if key is None: + key = self.selected_subparam + if key is None: + raise ValueError(f"{self} -> {envelope}: No subparam selected") + + data = envelope.unpacked_data(recursive=False) + for segment in key[:-1]: + data = data[segment] + data[key[-1]] = value + + @property + def name(self): + return self.__class__.__name__ + + @property + def num_envelopes(self): + num_envelopes = 0 if self.transparent else 1 + if self.end_format: + return num_envelopes + for envelope in self.subparams.values(): + with suppress(AttributeError): + num_envelopes += 
envelope.num_envelopes + return num_envelopes + + @property + def summary(self): + if self.transparent: + return "" + self_string = f"{self.friendly_name}" + with suppress(AttributeError): + child_envelope = self.unpacked_data(recursive=False) + child_summary = child_envelope.summary + if child_summary: + self_string += f" -> {child_summary}" + + if self.selected_subparam: + self_string += f" [{'.'.join(self.selected_subparam)}]" + return self_string + + def to_dict(self): + return self.summary + + def __str__(self): + return self.summary + + __repr__ = __str__ + + +class HexEnvelope(BaseEnvelope): + """ + Hexadecimal encoding + """ + + friendly_name = "Hexadecimal-Encoded" + + ignore_exceptions = (ValueError, UnicodeDecodeError) + + def _pack(self, s): + return s.encode().hex() + + def _unpack(self, s): + return bytes.fromhex(s).decode() + + +class B64Envelope(BaseEnvelope): + """ + Base64 encoding + """ + + friendly_name = "Base64-Encoded" + + ignore_exceptions = (binascii.Error, UnicodeDecodeError, ValueError) + + def unpack(self, s): + # it's easy to have a small value that accidentally decodes to base64 + if len(s) < 8 and not s.endswith("="): + raise ValueError("Data is too small to be sure") + return super().unpack(s) + + def _pack(self, s): + return base64.b64encode(s.encode()).decode() + + def _unpack(self, s): + return base64.b64decode(s).decode() + + +class URLEnvelope(BaseEnvelope): + """ + URL encoding + """ + + friendly_name = "URL-Encoded" + + def unpack(self, s): + unpacked = super().unpack(s) + if unpacked == s: + raise Exception("Data is not URL-encoded") + return unpacked + + def _pack(self, s): + return quote(s) + + def _unpack(self, s): + return unquote(s) + + +class TextEnvelope(BaseEnvelope): + """ + Text encoding + """ + + end_format = True + # lowest priority means text is the ultimate fallback + priority = 10 + transparent = True + ignore_exceptions = () + + def _pack(self, s): + return s + + def _unpack(self, s): + if not is_printable(s): 
+ raise ValueError(f"Non-printable data detected in TextEnvelope: '{s}' ({type(s)})") + return s + + +# class BinaryEnvelope(BaseEnvelope): +# """ +# Binary encoding +# """ +# end_format = True + +# def pack(self, s): +# return s + +# def unpack(self, s): +# if is_printable(s): +# raise Exception("Non-binary data detected in BinaryEnvelope") +# return s + + +class JSONEnvelope(BaseEnvelope): + """ + JSON encoding + """ + + friendly_name = "JSON-formatted" + end_format = True + priority = 8 + ignore_exceptions = (json.JSONDecodeError,) + + def _pack(self, s): + return json.dumps(s) + + def _unpack(self, s): + return json.loads(s) + + +class XMLEnvelope(BaseEnvelope): + """ + XML encoding + """ + + friendly_name = "XML-formatted" + end_format = True + priority = 9 + ignore_exceptions = (ExpatError,) + + def _pack(self, s): + return xmltodict.unparse(s) + + def _unpack(self, s): + return xmltodict.parse(s) diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 1fa151c33b..4ab2da1528 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -528,8 +528,11 @@ async def emit_event(self, *args, **kwargs): if v is not None: emit_kwargs[o] = v event = self.make_event(*args, **event_kwargs) if event: - await self.queue_outgoing_event(event, **emit_kwargs) + # queue the event itself followed by any child events it spawns; + # the `if event` guard is kept because the previous code skipped falsy events + for e in [event] + event.children: + await self.queue_outgoing_event(e, **emit_kwargs) return event async def _events_waiting(self, batch_size=None): diff --git a/bbot/modules/internal/dnsresolve.py b/bbot/modules/internal/dnsresolve.py index c746b03451..5bb5c5bc40 100644 --- a/bbot/modules/internal/dnsresolve.py +++ b/bbot/modules/internal/dnsresolve.py @@ -83,9 +83,14 @@ async def handle_event(self, event, **kwargs): event_data_changed = await self.handle_wildcard_event(main_host_event) if event_data_changed: # since data has changed, we check again whether it's a duplicate - if event.type == "DNS_NAME" and self.scan.ingress_module.is_incoming_duplicate(event, add=True): + if
event.type == "DNS_NAME" and self.scan.ingress_module.is_incoming_duplicate( + event, add=True + ): if not event._graph_important: - return False, "it's a DNS wildcard, and its module already emitted a similar wildcard event" + return ( + False, + "it's a DNS wildcard, and its module already emitted a similar wildcard event", + ) else: self.debug( f"Event {event} was already emitted by its module, but it's graph-important so it gets a pass" diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index a61d2f8ba8..1ba79018ca 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -460,10 +460,8 @@ def extract(self): # check to see if the format is defined as JSON if "content_type" in extracted_values.keys(): if extracted_values["content_type"] == "application/json": - # If we cant figure out the parameter names, there is no point in continuing if "data" in extracted_values.keys(): - if "url" in extracted_values.keys(): form_url = extracted_values["url"] else: @@ -481,8 +479,12 @@ def extract(self): form_parameters[p] = None for parameter_name in form_parameters: - yield "BODYJSON", parameter_name, None, form_url, _exclude_key( - form_parameters, parameter_name + yield ( + "BODYJSON", + parameter_name, + None, + form_url, + _exclude_key(form_parameters, parameter_name), ) class GetForm(ParameterExtractorRule): @@ -503,7 +505,6 @@ class GetForm(ParameterExtractorRule): def extract(self): forms = self.extraction_regex.findall(str(self.result)) for form_action, form_content in forms: - if not form_action or form_action == "#": form_action = None @@ -514,7 +515,6 @@ def extract(self): for form_content_regex_name, form_content_regex in self.form_content_regexes.items(): input_tags = form_content_regex.findall(form_content) if input_tags: - if form_content_regex_name == "input_tag_novalue_regex": form_parameters[input_tags[0]] = None @@ -530,7 +530,7 @@ def extract(self): self.output_type, parameter_name, 
original_value, - form_action, + form_action, _exclude_key(form_parameters, parameter_name), ) @@ -762,8 +762,10 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_conte continue if parsed_url.scheme in ["http", "https"]: continue + def abort_if(e): return e.scope_distance > 0 + finding_data = {"host": str(host), "description": f"Non-HTTP URI: {parsed_url.geturl()}"} await self.report(finding_data, event, yara_rule_settings, discovery_context, abort_if=abort_if) protocol_data = {"protocol": parsed_url.scheme, "host": str(host)} @@ -998,6 +1000,8 @@ async def setup(self): return True async def search(self, data, event, content_type, discovery_context="HTTP response"): + # TODO: replace this JSON/XML extraction with our lightfuzz envelope stuff + if not data: return None decoded_data = await self.helpers.re.recursive_decode(data) @@ -1089,7 +1093,6 @@ async def handle_event(self, event): # If parameter_extraction is enabled and we assigned custom headers, emit them as WEB_PARAMETER if self.parameter_extraction is True: - custom_cookies = self.scan.web_config.get("http_cookies", {}) for custom_cookie_name, custom_cookie_value in custom_cookies.items(): description = f"HTTP Extracted Parameter [{custom_cookie_name}] (Custom Cookie)" diff --git a/bbot/modules/lightfuzz.py b/bbot/modules/lightfuzz.py index c61dbb4b25..ee3f04b20b 100644 --- a/bbot/modules/lightfuzz.py +++ b/bbot/modules/lightfuzz.py @@ -130,13 +130,10 @@ async def run_submodule(self, submodule, event): event_data = {"host": str(event.host), "url": event.data["url"], "description": r["description"]} envelopes = getattr(event, "envelopes", None) - if envelopes and envelopes.envelopes: - envelope_summary = f'[{"->".join(envelopes.envelopes)}]' - if envelopes.end_format_type: - envelope_summary += f" Format: [{envelopes.end_format_type}] with subparameter [{envelopes.end_format_subparameter}])" - + envelope_summary = getattr(envelopes, "summary", None) + if envelope_summary: # 
Append the envelope summary to the description - event_data["description"] += f" Envelopes: {envelope_summary}" + event_data["description"] += f" Envelopes: [{envelope_summary}]" if r["type"] == "VULNERABILITY": event_data["severity"] = r["severity"] @@ -147,10 +144,8 @@ async def run_submodule(self, submodule, event): ) async def handle_event(self, event): - if event.type == "URL": if self.config.get("force_common_headers", False) is False: - return False for h in self.common_headers: @@ -166,7 +161,6 @@ async def handle_event(self, event): await self.emit_event(data, "WEB_PARAMETER", event) elif event.type == "WEB_PARAMETER": - # check connectivity to url connectivity_test = await self.helpers.request(event.data["url"], timeout=10) @@ -199,5 +193,5 @@ async def finish(self): async def filter_event(self, event): if event.type == "WEB_PARAMETER" and self.disable_post and event.data["type"] == "POSTPARAM": - return False, "POST parameter disabled in lilghtfuzz module" + return False, "POST parameter disabled in lightfuzz module" return True diff --git a/bbot/modules/lightfuzz_submodules/base.py b/bbot/modules/lightfuzz_submodules/base.py index 313de8c16f..32d6d80336 100644 --- a/bbot/modules/lightfuzz_submodules/base.py +++ b/bbot/modules/lightfuzz_submodules/base.py @@ -19,7 +19,7 @@ def additional_params_process(self, additional_params, additional_params_populat return new_additional_params async def send_probe(self, probe): - probe = self.probe_value_outgoing(probe) + probe = self.outgoing_probe_value(probe) getparams = {self.event.data["name"]: probe} url = self.lightfuzz.helpers.add_get_params(self.event.data["url"], getparams, encode=False).geturl() self.lightfuzz.debug(f"lightfuzz sending probe with URL: {url}") @@ -30,7 +30,7 @@ async def send_probe(self, probe): def compare_baseline( self, event_type, probe, cookies, additional_params_populate_empty=False, speculative_mode="GETPARAM" ): - probe = self.probe_value_outgoing(probe) + probe = 
self.outgoing_probe_value(probe) http_compare = None if event_type == "SPECULATIVE": @@ -104,8 +104,7 @@ async def compare_probe( additional_params_override={}, speculative_mode="GETPARAM", ): - - probe = self.probe_value_outgoing(probe) + probe = self.outgoing_probe_value(probe) additional_params = copy.deepcopy(self.event.data.get("additional_params", {})) if additional_params_override: for k, v in additional_params_override.items(): @@ -151,7 +150,7 @@ async def standard_probe( speculative_mode="GETPARAM", allow_redirects=False, ): - probe = self.probe_value_outgoing(probe) + probe = self.outgoing_probe_value(probe) if event_type == "SPECULATIVE": event_type = speculative_mode @@ -211,7 +210,6 @@ async def standard_probe( def metadata(self): - metadata_string = f"Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]" if self.event.data["original_value"] != "" and self.event.data["original_value"] is not None: metadata_string += ( @@ -219,21 +217,29 @@ def metadata(self): ) return metadata_string - def probe_value_incoming(self, populate_empty=True): - probe_value = self.event.data.get("original_value", "") - if (probe_value is None or len(str(probe_value)) == 0) and populate_empty is True: - probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True) - self.lightfuzz.debug(f"probe_value_incoming (before modification): {probe_value}") - envelopes_instance = getattr(self.event, "envelopes", None) - probe_value = envelopes_instance.remove_envelopes(probe_value) - self.lightfuzz.debug(f"probe_value_incoming (after modification): {probe_value}") - if not isinstance(probe_value, str): - probe_value = str(probe_value) + def incoming_probe_value(self, populate_empty=True): + envelopes = getattr(self.event, "envelopes", None) + probe_value = "" + if envelopes is not None: + probe_value = envelopes.get_subparam() + self.lightfuzz.debug(f"incoming_probe_value (after unpacking): {probe_value} with envelopes [{envelopes}]") + if not 
probe_value: + if populate_empty is True: + probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True) + else: + probe_value = "" + # if not isinstance(probe_value, str): + # raise ValueError( + # f"incoming_probe_value should always be a string (got {type(probe_value)} / {probe_value})" + # ) + probe_value = str(probe_value) return probe_value - def probe_value_outgoing(self, outgoing_probe_value): - self.lightfuzz.debug(f"probe_value_outgoing (before modification): {outgoing_probe_value}") - envelopes_instance = getattr(self.event, "envelopes", None) - outgoing_probe_value = envelopes_instance.add_envelopes(outgoing_probe_value) - self.lightfuzz.debug(f"probe_value_outgoing (after modification): {outgoing_probe_value}") + def outgoing_probe_value(self, outgoing_probe_value): + self.lightfuzz.debug(f"outgoing_probe_value (before packing): {outgoing_probe_value} / {self.event}") + envelopes = getattr(self.event, "envelopes", None) + if envelopes is not None: + envelopes.set_subparam(value=outgoing_probe_value) + outgoing_probe_value = envelopes.pack() + self.lightfuzz.debug(f"outgoing_probe_value (after packing): {outgoing_probe_value} with envelopes [{envelopes}] / {self.event}") return outgoing_probe_value diff --git a/bbot/modules/lightfuzz_submodules/cmdi.py b/bbot/modules/lightfuzz_submodules/cmdi.py index b9dbb27645..57acfdbb5b 100644 --- a/bbot/modules/lightfuzz_submodules/cmdi.py +++ b/bbot/modules/lightfuzz_submodules/cmdi.py @@ -5,11 +5,9 @@ class CmdILightfuzz(BaseLightfuzz): - async def fuzz(self): - cookies = self.event.data.get("assigned_cookies", {}) - probe_value = self.probe_value_incoming() + probe_value = self.incoming_probe_value() canary = self.lightfuzz.helpers.rand_string(10, numeric_only=True) http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies) @@ -31,7 +29,6 @@ async def fuzz(self): echo_probe = urllib.parse.quote(echo_probe.encode(), safe="") cmdi_probe = await self.compare_probe(http_compare, 
self.event.data["type"], echo_probe, cookies) if cmdi_probe[3]: - if canary in cmdi_probe[3].text and "echo" not in cmdi_probe[3].text: self.lightfuzz.debug(f"canary [{canary}] found in response when sending probe [{p}]") if p == "AAAA": diff --git a/bbot/modules/lightfuzz_submodules/crypto.py b/bbot/modules/lightfuzz_submodules/crypto.py index 4602713824..3ebb3bacd1 100644 --- a/bbot/modules/lightfuzz_submodules/crypto.py +++ b/bbot/modules/lightfuzz_submodules/crypto.py @@ -6,7 +6,6 @@ class CryptoLightfuzz(BaseLightfuzz): - @staticmethod def is_hex(s): try: @@ -21,7 +20,6 @@ def is_base64(s): if base64.b64encode(base64.b64decode(s)).decode() == s: return True except Exception: - return False return False @@ -75,7 +73,6 @@ def format_agnostic_encode(data, encoding, urlencode=False): @staticmethod def modify_string(input_string, action="truncate", position=None, extension_length=1): - if not isinstance(input_string, str): input_string = str(input_string) @@ -136,7 +133,7 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook paddingblock = b"\x00" * block_size datablock = original_data[-block_size:] if possible_first_byte: - baseline_byte = b"\xFF" + baseline_byte = b"\xff" starting_pos = 0 else: baseline_byte = b"\x00" @@ -148,7 +145,6 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook ) differ_count = 0 for i in range(starting_pos, starting_pos + 254): - byte = bytes([i]) oracle_probe = await self.compare_probe( baseline, @@ -176,7 +172,6 @@ async def padding_oracle(self, probe_value, cookies): possible_block_sizes = self.possible_block_sizes(len(data)) for block_size in possible_block_sizes: - padding_oracle_result = await self.padding_oracle_execute(data, encoding, block_size, cookies) if padding_oracle_result is None: self.lightfuzz.debug( @@ -198,7 +193,6 @@ async def padding_oracle(self, probe_value, cookies): ) async def error_string_search(self, text_dict, baseline_text): - matching_techniques 
= set() matching_strings = set() @@ -238,8 +232,10 @@ def identify_hash_function(hash_bytes): return hash_functions[hash_length] async def fuzz(self): + cookies = self.event.data.get("assigned_cookies", {}) - probe_value = self.probe_value_incoming(populate_empty=False) + probe_value = self.incoming_probe_value(populate_empty=False) + if not probe_value: self.lightfuzz.debug( f"The Cryptography Probe Submodule requires original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]" @@ -256,7 +252,7 @@ async def fuzz(self): mutate_probe_value = self.modify_string(probe_value, action="mutate") except ValueError as e: self.lightfuzz.debug( - f"Encountered error modifying value for parameter {self.event.data['name']}: {e} , aborting" + f"Encountered error modifying value for parameter [{self.event.data['name']}]: {e} , aborting" ) return @@ -311,7 +307,6 @@ async def fuzz(self): if confirmed_techniques or ( "padding" in truncate_probe[3].text.lower() or "padding" in mutate_probe[3].text.lower() ): - # Padding Oracle Test if possible_block_cipher: diff --git a/bbot/modules/lightfuzz_submodules/path.py b/bbot/modules/lightfuzz_submodules/path.py index b1c4d710e3..827af65f2f 100644 --- a/bbot/modules/lightfuzz_submodules/path.py +++ b/bbot/modules/lightfuzz_submodules/path.py @@ -6,10 +6,9 @@ class PathTraversalLightfuzz(BaseLightfuzz): - async def fuzz(self): cookies = self.event.data.get("assigned_cookies", {}) - probe_value = self.probe_value_incoming(populate_empty=False) + probe_value = self.incoming_probe_value(populate_empty=False) if not probe_value: self.lightfuzz.debug( f"Path Traversal detection requires original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]" diff --git a/bbot/modules/lightfuzz_submodules/serial.py b/bbot/modules/lightfuzz_submodules/serial.py index a45940d186..e6cf2da765 100644 --- a/bbot/modules/lightfuzz_submodules/serial.py +++ b/bbot/modules/lightfuzz_submodules/serial.py @@ -26,7 +26,7 @@ async 
def fuzz(self): "java.io.optionaldataexception", ] - probe_value = self.probe_value_incoming(populate_empty=False) + probe_value = self.incoming_probe_value(populate_empty=False) if probe_value: self.lightfuzz.debug( f"The Serialization Submodule only operates when there if no original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]" diff --git a/bbot/modules/lightfuzz_submodules/sqli.py b/bbot/modules/lightfuzz_submodules/sqli.py index cb264c7b8b..1f7d677cce 100644 --- a/bbot/modules/lightfuzz_submodules/sqli.py +++ b/bbot/modules/lightfuzz_submodules/sqli.py @@ -38,9 +38,8 @@ def evaluate_delay(self, mean_baseline, measured_delay): return False async def fuzz(self): - cookies = self.event.data.get("assigned_cookies", {}) - probe_value = self.probe_value_incoming(populate_empty=True) + probe_value = self.incoming_probe_value(populate_empty=True) http_compare = self.compare_baseline( self.event.data["type"], probe_value, cookies, additional_params_populate_empty=True ) diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index 2eb67cd13d..329994c748 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -460,6 +460,13 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_httpserver): s = "asdf {unused} {used}" assert helpers.safe_format(s, used="fdsa") == "asdf {unused} fdsa" + # is_printable + assert helpers.is_printable("asdf") is True + assert helpers.is_printable(r"""~!@#$^&*()_+=-<>:"?,./;'[]\{}|""") is True + assert helpers.is_printable("ドメイン.テスト") is True + assert helpers.is_printable("4") is True + assert helpers.is_printable("asdf\x00") is False + # punycode assert helpers.smart_encode_punycode("ドメイン.テスト") == "xn--eckwd4c7c.xn--zckzah" assert helpers.smart_decode_punycode("xn--eckwd4c7c.xn--zckzah") == "ドメイン.テスト" diff --git a/bbot/test/test_step_1/test_web_envelopes.py b/bbot/test/test_step_1/test_web_envelopes.py new file mode 100644 index 
0000000000..79da9e829e --- /dev/null +++ b/bbot/test/test_step_1/test_web_envelopes.py @@ -0,0 +1,339 @@ +import pytest + + +async def test_web_envelopes(): + from bbot.core.helpers.web.envelopes import ( + BaseEnvelope, + TextEnvelope, + HexEnvelope, + B64Envelope, + JSONEnvelope, + XMLEnvelope, + URLEnvelope, + ) + + # simple text + text_envelope = BaseEnvelope.detect("foo") + assert isinstance(text_envelope, TextEnvelope) + assert text_envelope.unpacked_data() == "foo" + assert text_envelope.subparams == {"__default__": "foo"} + expected_subparams = [([], "foo")] + assert list(text_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert text_envelope.get_subparam(subparam) == value + assert text_envelope.pack() == "foo" + assert text_envelope.num_envelopes == 0 + assert text_envelope.get_subparam() == "foo" + text_envelope.set_subparam(value="bar") + assert text_envelope.get_subparam() == "bar" + assert text_envelope.unpacked_data() == "bar" + + # simple binary + # binary_envelope = BaseEnvelope.detect("foo\x00") + # assert isinstance(binary_envelope, BinaryEnvelope) + # assert binary_envelope.unpacked_data == "foo\x00" + # assert binary_envelope.packed_data == "foo\x00" + # assert binary_envelope.subparams == {"__default__": "foo\x00"} + + # text encoded as hex + hex_envelope = BaseEnvelope.detect("706172616d") + assert isinstance(hex_envelope, HexEnvelope) + assert hex_envelope.unpacked_data(recursive=True) == "param" + hex_inner_envelope = hex_envelope.unpacked_data(recursive=False) + assert isinstance(hex_inner_envelope, TextEnvelope) + assert hex_inner_envelope.unpacked_data(recursive=False) == "param" + assert hex_inner_envelope.unpacked_data(recursive=True) == "param" + assert list(hex_envelope.get_subparams(recursive=False)) == [([], hex_inner_envelope)] + assert list(hex_envelope.get_subparams(recursive=True)) == [([], "param")] + assert hex_inner_envelope.unpacked_data() == "param" + assert 
hex_inner_envelope.subparams == {"__default__": "param"} + expected_subparams = [([], "param")] + assert list(hex_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_inner_envelope.get_subparam(subparam) == value + assert hex_envelope.pack() == "706172616d" + assert hex_envelope.num_envelopes == 1 + assert hex_envelope.get_subparam() == "param" + hex_envelope.set_subparam(value="asdf") + assert hex_envelope.get_subparam() == "asdf" + assert hex_envelope.unpacked_data() == "asdf" + assert hex_envelope.pack() == "61736466" + + # text encoded as base64 + base64_envelope = BaseEnvelope.detect("cGFyYW0=") + assert isinstance(base64_envelope, B64Envelope) + assert base64_envelope.unpacked_data() == "param" + base64_inner_envelope = base64_envelope.unpacked_data(recursive=False) + assert isinstance(base64_inner_envelope, TextEnvelope) + assert list(base64_envelope.get_subparams(recursive=False)) == [([], base64_inner_envelope)] + assert list(base64_envelope.get_subparams()) == [([], "param")] + assert base64_inner_envelope.pack() == "param" + assert base64_inner_envelope.unpacked_data() == "param" + assert base64_inner_envelope.subparams == {"__default__": "param"} + expected_subparams = [([], "param")] + assert list(base64_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_inner_envelope.get_subparam(subparam) == value + assert base64_envelope.num_envelopes == 1 + base64_envelope.set_subparam(value="asdf") + assert base64_envelope.get_subparam() == "asdf" + assert base64_envelope.unpacked_data() == "asdf" + assert base64_envelope.pack() == "YXNkZg==" + + # test inside hex inside base64 + hex_envelope = BaseEnvelope.detect("634746795957303d") + assert isinstance(hex_envelope, HexEnvelope) + assert hex_envelope.get_subparam() == "param" + assert hex_envelope.unpacked_data() == "param" + base64_envelope = hex_envelope.unpacked_data(recursive=False) + 
assert isinstance(base64_envelope, B64Envelope) + assert base64_envelope.get_subparam() == "param" + assert base64_envelope.unpacked_data() == "param" + text_envelope = base64_envelope.unpacked_data(recursive=False) + assert isinstance(text_envelope, TextEnvelope) + assert text_envelope.get_subparam() == "param" + assert text_envelope.unpacked_data() == "param" + hex_envelope.set_subparam(value="asdf") + assert hex_envelope.get_subparam() == "asdf" + assert hex_envelope.unpacked_data() == "asdf" + assert text_envelope.get_subparam() == "asdf" + assert text_envelope.unpacked_data() == "asdf" + assert base64_envelope.get_subparam() == "asdf" + assert base64_envelope.unpacked_data() == "asdf" + + # URL-encoded text + url_encoded_envelope = BaseEnvelope.detect("a%20b%20c") + assert isinstance(url_encoded_envelope, URLEnvelope) + assert url_encoded_envelope.pack() == "a%20b%20c" + assert url_encoded_envelope.unpacked_data() == "a b c" + url_inner_envelope = url_encoded_envelope.unpacked_data(recursive=False) + assert isinstance(url_inner_envelope, TextEnvelope) + assert url_inner_envelope.unpacked_data(recursive=False) == "a b c" + assert url_inner_envelope.unpacked_data(recursive=True) == "a b c" + assert list(url_encoded_envelope.get_subparams(recursive=False)) == [([], url_inner_envelope)] + assert list(url_encoded_envelope.get_subparams(recursive=True)) == [([], "a b c")] + assert url_inner_envelope.pack() == "a b c" + assert url_inner_envelope.unpacked_data() == "a b c" + assert url_inner_envelope.subparams == {"__default__": "a b c"} + expected_subparams = [([], "a b c")] + assert list(url_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert url_inner_envelope.get_subparam(subparam) == value + assert url_encoded_envelope.num_envelopes == 1 + url_encoded_envelope.set_subparam(value="a s d f") + assert url_encoded_envelope.get_subparam() == "a s d f" + assert url_encoded_envelope.unpacked_data() == "a s d f" + 
assert url_encoded_envelope.pack() == "a%20s%20d%20f" + + # json + json_envelope = BaseEnvelope.detect('{"param1": "val1", "param2": {"param3": "val3"}}') + assert isinstance(json_envelope, JSONEnvelope) + assert json_envelope.pack() == '{"param1": "val1", "param2": {"param3": "val3"}}' + assert json_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.unpacked_data(recursive=False) == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.unpacked_data(recursive=True) == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.subparams == {"param1": "val1", "param2": {"param3": "val3"}} + expected_subparams = [ + (["param1"], "val1"), + (["param2", "param3"], "val3"), + ] + assert list(json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert json_envelope.get_subparam(subparam) == value + json_envelope.selected_subparam = ["param2", "param3"] + assert json_envelope.get_subparam() == "val3" + assert json_envelope.num_envelopes == 1 + + # xml + xml_envelope = BaseEnvelope.detect( + 'val1val3' + ) + assert isinstance(xml_envelope, XMLEnvelope) + assert ( + xml_envelope.pack() + == '\nval1val3' + ) + assert xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.unpacked_data(recursive=False) == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.unpacked_data(recursive=True) == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.subparams == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + expected_subparams = [ + (["root", "param1", "@attr"], "attr1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert 
list(xml_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert xml_envelope.get_subparam(subparam) == value + assert xml_envelope.num_envelopes == 1 + + # json inside base64 + base64_json_envelope = BaseEnvelope.detect("eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19") + assert isinstance(base64_json_envelope, B64Envelope) + assert base64_json_envelope.pack() == "eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19" + assert base64_json_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + base64_inner_envelope = base64_json_envelope.unpacked_data(recursive=False) + assert isinstance(base64_inner_envelope, JSONEnvelope) + assert base64_inner_envelope.pack() == '{"param1": "val1", "param2": {"param3": "val3"}}' + assert base64_inner_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + assert base64_inner_envelope.subparams == {"param1": "val1", "param2": {"param3": "val3"}} + expected_subparams = [ + (["param1"], "val1"), + (["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + assert base64_json_envelope.num_envelopes == 2 + with pytest.raises(ValueError): + assert base64_json_envelope.get_subparam() + base64_json_envelope.selected_subparam = ["param2", "param3"] + assert base64_json_envelope.get_subparam() == "val3" + + # xml inside url inside hex inside base64 + nested_xml_envelope = BaseEnvelope.detect( + 
"MjUzMzYzMjUzNzMyMjUzNjY2MjUzNjY2MjUzNzM0MjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMxMjUzMjMwMjUzNjMxMjUzNzM0MjUzNzM0MjUzNzMyMjUzMzY0MjUzMjMyMjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMxMjUzMjMyMjUzMzY1MjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMxMjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMxMjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMyMjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMzMjUzMzY1MjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMzMjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMzMjUzMzY1MjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMyMjUzMzY1MjUzMzYzMjUzMjY2MjUzNzMyMjUzNjY2MjUzNjY2MjUzNzM0MjUzMzY1" + ) + assert isinstance(nested_xml_envelope, B64Envelope) + assert nested_xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert ( + nested_xml_envelope.pack() + == "MjUzMzQzMjUzMzQ2Nzg2ZDZjMjUzMjMwNzY2NTcyNzM2OTZmNmUyNTMzNDQyNTMyMzIzMTJlMzAyNTMyMzIyNTMyMzA2NTZlNjM2ZjY0Njk2ZTY3MjUzMzQ0MjUzMjMyNzU3NDY2MmQzODI1MzIzMjI1MzM0NjI1MzM0NTI1MzA0MTI1MzM0MzcyNmY2Zjc0MjUzMzQ1MjUzMzQzNzA2MTcyNjE2ZDMxMjUzMjMwNjE3NDc0NzIyNTMzNDQyNTMyMzI3NjYxNmMzMTI1MzIzMjI1MzM0NTc2NjE2YzMxMjUzMzQzMmY3MDYxNzI2MTZkMzEyNTMzNDUyNTMzNDM3MDYxNzI2MTZkMzIyNTMzNDUyNTMzNDM3MDYxNzI2MTZkMzMyNTMzNDU3NjYxNmMzMzI1MzM0MzJmNzA2MTcyNjE2ZDMzMjUzMzQ1MjUzMzQzMmY3MDYxNzI2MTZkMzIyNTMzNDUyNTMzNDMyZjcyNmY2Zjc0MjUzMzQ1" + ) + inner_hex_envelope = nested_xml_envelope.unpacked_data(recursive=False) + assert isinstance(inner_hex_envelope, HexEnvelope) + assert ( + inner_hex_envelope.pack() + == "253343253346786d6c25323076657273696f6e253344253232312e30253232253230656e636f64696e672533442532327574662d38253232253346253345253041253343726f6f74253345253343706172616d312532306174747225334425323276616c3125323225334576616c312533432f706172616d31253345253343706172616d32253345253343706172616d3325334576616c332533432f706172616d332533452533432f706172616d322533452533432f726f6f74253345" + ) + 
inner_url_envelope = inner_hex_envelope.unpacked_data(recursive=False) + assert isinstance(inner_url_envelope, URLEnvelope) + assert ( + inner_url_envelope.pack() + == r"%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22utf-8%22%3F%3E%0A%3Croot%3E%3Cparam1%20attr%3D%22val1%22%3Eval1%3C/param1%3E%3Cparam2%3E%3Cparam3%3Eval3%3C/param3%3E%3C/param2%3E%3C/root%3E" + ) + inner_xml_envelope = inner_url_envelope.unpacked_data(recursive=False) + assert isinstance(inner_xml_envelope, XMLEnvelope) + assert ( + inner_xml_envelope.pack() + == '\nval1val3' + ) + assert inner_xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert inner_xml_envelope.subparams == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + expected_subparams = [ + (["root", "param1", "@attr"], "val1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(nested_xml_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert nested_xml_envelope.get_subparam(subparam) == value + assert nested_xml_envelope.num_envelopes == 4 + + # manipulating text inside hex + hex_envelope = BaseEnvelope.detect("706172616d") + expected_subparams = [([], "param")] + assert list(hex_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_envelope.get_subparam(subparam) == value + hex_envelope.set_subparam([], "asdf") + expected_subparams = [([], "asdf")] + assert list(hex_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_envelope.get_subparam(subparam) == value + assert hex_envelope.unpacked_data() == "asdf" + + # manipulating json inside base64 + base64_json_envelope = BaseEnvelope.detect("eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19") + expected_subparams = [ + (["param1"], "val1"), + 
(["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + base64_json_envelope.set_subparam(["param1"], {"asdf": [None], "fdsa": 1.0}) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + base64_json_envelope.set_subparam(["param2", "param3"], {"1234": [None], "4321": 1.0}) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2", "param3", "1234"], [None]), + (["param2", "param3", "4321"], 1.0), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + base64_json_envelope.set_subparam(["param2"], None) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2"], None), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + + # xml inside url inside base64 + xml_envelope = BaseEnvelope.detect( + "JTNDP3htbCUyMHZlcnNpb249JTIyMS4wJTIyJTIwZW5jb2Rpbmc9JTIydXRmLTglMjI/JTNFJTBBJTNDcm9vdCUzRSUzQ3BhcmFtMSUyMGF0dHI9JTIydmFsMSUyMiUzRXZhbDElM0MvcGFyYW0xJTNFJTNDcGFyYW0yJTNFJTNDcGFyYW0zJTNFdmFsMyUzQy9wYXJhbTMlM0UlM0MvcGFyYW0yJTNFJTNDL3Jvb3QlM0U=" + ) + assert ( + xml_envelope.pack() + == "JTNDJTNGeG1sJTIwdmVyc2lvbiUzRCUyMjEuMCUyMiUyMGVuY29kaW5nJTNEJTIydXRmLTglMjIlM0YlM0UlMEElM0Nyb290JTNFJTNDcGFyYW0xJTIwYXR0ciUzRCUyMnZhbDElMjIlM0V2YWwxJTNDL3BhcmFtMSUzRSUzQ3BhcmFtMiUzRSUzQ3BhcmFtMyUzRXZhbDMlM0MvcGFyYW0zJTNFJTNDL3BhcmFtMiUzRSUzQy9yb290JTNF" + ) + expected_subparams = [ + (["root", "param1", "@attr"], "val1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(xml_envelope.get_subparams()) == 
expected_subparams + xml_envelope.set_subparam(["root", "param1", "@attr"], "asdf") + expected_subparams = [ + (["root", "param1", "@attr"], "asdf"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + assert ( + xml_envelope.pack() + == "JTNDJTNGeG1sJTIwdmVyc2lvbiUzRCUyMjEuMCUyMiUyMGVuY29kaW5nJTNEJTIydXRmLTglMjIlM0YlM0UlMEElM0Nyb290JTNFJTNDcGFyYW0xJTIwYXR0ciUzRCUyMmFzZGYlMjIlM0V2YWwxJTNDL3BhcmFtMSUzRSUzQ3BhcmFtMiUzRSUzQ3BhcmFtMyUzRXZhbDMlM0MvcGFyYW0zJTNFJTNDL3BhcmFtMiUzRSUzQy9yb290JTNF" + ) + xml_envelope.set_subparam(["root", "param2", "param3"], {"1234": [None], "4321": 1.0}) + expected_subparams = [ + (["root", "param1", "@attr"], "asdf"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3", "1234"], [None]), + (["root", "param2", "param3", "4321"], 1.0), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + + # null + null_envelope = BaseEnvelope.detect("null") + assert isinstance(null_envelope, JSONEnvelope) + assert null_envelope.unpacked_data() is None + assert null_envelope.pack() == "null" + expected_subparams = [([], None)] + assert list(null_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert null_envelope.get_subparam(subparam) == value + + tiny_base64 = BaseEnvelope.detect("YWJi") + assert isinstance(tiny_base64, TextEnvelope) diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 67fb43ce18..3795260452 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -503,9 +503,7 @@ def check(self, module_test, events): found_select_noquotes = False for e in events: - if e.type == "WEB_PARAMETER": - if e.data["description"] == "HTTP Extracted Parameter [jqueryget] (GET jquery Submodule)": found_jquery_get = 
True if e.data["original_value"] == "value1": @@ -541,7 +539,10 @@ def check(self, module_test, events): if "fit" in e.data["additional_params"].keys(): found_htmltags_img = True - if e.data["description"] == "HTTP Extracted Parameter [blog-post-author-display] (POST Form Submodule)": + if ( + e.data["description"] + == "HTTP Extracted Parameter [blog-post-author-display] (POST Form Submodule)" + ): if e.data["original_value"] == "user.name": if "csrf" in e.data["additional_params"].keys(): found_select_noquotes = True @@ -558,10 +559,10 @@ def check(self, module_test, events): assert found_form_generic_original_value, "Did not extract Form (Generic) parameter original_value" assert found_htmltags_a, "Did not extract parameter(s) from a-tag" assert found_htmltags_img, "Did not extract parameter(s) from img-tag" - assert found_select_noquotes, "Did not extract parameter(s) from select-tag" + assert found_select_noquotes, "Did not extract parameter(s) from select-tag" -class TestExcavateParameterExtraction_postformnoaction(ModuleTestBase): +class TestExcavateParameterExtraction_postformnoaction(ModuleTestBase): targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -583,7 +584,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(respond_args=respond_args) def check(self, module_test, events): - excavate_getparam_extraction = False for e in events: if e.type == "WEB_PARAMETER": @@ -656,7 +656,6 @@ def check(self, module_test, events): class TestExcavateParameterExtraction_relativeurl(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -681,7 +680,6 @@ class TestExcavateParameterExtraction_relativeurl(ModuleTestBase): root_page_html = "Root page" async def setup_after_prep(self, module_test): - 
module_test.httpserver.expect_request("/").respond_with_data(self.primary_page_html) module_test.httpserver.expect_request("/secondary").respond_with_data(self.secondary_page_html) module_test.httpserver.expect_request("/root.html").respond_with_data(self.root_page_html) @@ -781,7 +779,6 @@ def check(self, module_test, events): class TestExcavateParameterExtraction_inputtagnovalue(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -1262,7 +1259,6 @@ class TestExcavate(ModuleTestBase): config_overrides = {"web": {"spider_distance": 1, "spider_depth": 1}} async def setup_before_prep(self, module_test): - response_data = """ ftp://ftp.test.notreal \\nhttps://www1.test.notreal @@ -1350,13 +1346,11 @@ def check(self, module_test, events): class TestExcavateHeaders_blacklist(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] modules_overrides = ["excavate", "httpx", "hunt"] config_overrides = {"web": {"spider_distance": 1, "spider_depth": 1}} async def setup_before_prep(self, module_test): - module_test.httpserver.expect_request("/").respond_with_data( "

test

", status=200, @@ -1370,7 +1364,6 @@ async def setup_before_prep(self, module_test): ) def check(self, module_test, events): - found_first_cookie = False found_second_cookie = False found_third_cookie = False diff --git a/bbot/test/test_step_2/module_tests/test_module_hunt.py b/bbot/test/test_step_2/module_tests/test_module_hunt.py index 0ce8e93537..867a2565c6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_hunt.py +++ b/bbot/test/test_step_2/module_tests/test_module_hunt.py @@ -23,7 +23,6 @@ def check(self, module_test, events): class TestHunt_Multiple(TestHunt): - async def setup_after_prep(self, module_test): expect_args = {"method": "GET", "uri": "/"} respond_args = {"response_data": 'ping'} diff --git a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py index a74ea74f56..4ff3a478c2 100644 --- a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py +++ b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py @@ -34,7 +34,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def request_handler(self, request): - qs = str(request.query_string.decode()) if "filename=" in qs: value = qs.split("=")[1] @@ -52,11 +51,9 @@ def request_handler(self, request): return Response("file not found", status=500) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: - if e.type == "WEB_PARAMETER": if "HTTP Extracted Parameter [filename]" in e.data["description"]: web_parameter_emitted = True @@ -74,7 +71,6 @@ def check(self, module_test, events): # Path Traversal Absolute path class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): - etc_passwd = """ root:x:0:0:root:/root:/bin/bash daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin @@ -87,7 +83,6 @@ class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): """ async def 
setup_after_prep(self, module_test): - expect_args = {"method": "GET", "uri": "/images", "query_string": "filename=/etc/passwd"} respond_args = {"response_data": self.etc_passwd} module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) @@ -104,7 +99,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: @@ -156,7 +150,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False ssti_finding_emitted = False for e in events: @@ -189,7 +182,6 @@ class Test_Lightfuzz_xss(ModuleTestBase): } def request_handler(self, request): - qs = str(request.query_string.decode()) parameter_block = """ @@ -219,7 +211,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False xss_finding_emitted = False for e in events: @@ -238,12 +229,8 @@ def check(self, module_test, events): # Base64 Envelope XSS Detection class Test_Lightfuzz_envelope_base64(Test_Lightfuzz_xss): def request_handler(self, request): - qs = str(request.query_string.decode()) - print("****") - print(qs) - parameter_block = """