diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py
index d3cf2c2ba4..74ec0b6961 100644
--- a/bbot/core/event/base.py
+++ b/bbot/core/event/base.py
@@ -1,24 +1,20 @@
-from collections import defaultdict
import io
import re
import uuid
import json
import base64
-import asyncio
import logging
import tarfile
-import binascii
import datetime
import ipaddress
import traceback
-import xml.etree.ElementTree as ET
-from copy import copy, deepcopy
from pathlib import Path
from typing import Optional
+from copy import copy, deepcopy
from contextlib import suppress
from radixtarget import RadixTarget
-from urllib.parse import urljoin, parse_qs, unquote, quote
+from urllib.parse import urljoin, parse_qs
from pydantic import BaseModel, field_validator
@@ -44,6 +40,7 @@
validators,
get_file_extension,
)
+from bbot.core.helpers.web.envelopes import BaseEnvelope
log = logging.getLogger("bbot.core.event")
@@ -593,6 +590,10 @@ def parent(self, parent):
elif not self._dummy:
log.warning(f"Tried to set invalid parent on {self}: (got: {parent})")
+ @property
+ def children(self):
+ return []
+
@property
def parent_id(self):
parent_id = getattr(self.get_parent(), "id", None)
@@ -648,15 +649,10 @@ def get_parents(self, omit=False, include_self=False):
return parents
def clone(self):
-
# Create a shallow copy of the event first
cloned_event = copy(self)
-
- # Handle attributes that need deep copying manually
- setattr(cloned_event, "envelopes", deepcopy(self.envelopes))
-
# Re-assign a new UUID
- cloned_event.uuid = uuid.uuid4()
+ cloned_event._uuid = uuid.uuid4()
return cloned_event
def _host(self):
@@ -1329,382 +1325,45 @@ class URL_HINT(URL_UNVERIFIED):
class WEB_PARAMETER(DictHostEvent):
-
@property
- def uuid(self):
- return self._uuid
-
- @uuid.setter
- def uuid(self, value):
- self._uuid = value
-
- class ParameterEnvelopes:
-
- @staticmethod
- def preprocess_base64(base64_str):
- return base64.b64decode(base64_str).decode()
-
- @staticmethod
- def postprocess_base64(string):
- return base64.b64encode(string.encode()).decode()
-
- @staticmethod
- def preprocess_hex(hex_str):
- return bytes.fromhex(hex_str).decode()
-
- @staticmethod
- def postprocess_hex(string):
- return string.encode().hex()
-
- @staticmethod
- def preprocess_urlencoded(url_encoded_str):
- return unquote(url_encoded_str)
-
- @staticmethod
- def postprocess_urlencoded(string):
- return quote(string)
-
- @staticmethod
- def is_ascii_printable(s):
- return all(32 <= ord(char) < 127 for char in s)
-
- # Converts XML ElementTree to a JSON-like dictionary
- def xml_to_dict(self, elem):
- """
- Convert XML ElementTree to a dictionary recursively.
- """
- d = {elem.tag: {} if elem.attrib else None}
- children = list(elem)
- if children:
- dd = defaultdict(list)
- for dc in map(self.xml_to_dict, children):
- for k, v in dc.items():
- dd[k].append(v)
- d = {elem.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}}
- if elem.attrib:
- d[elem.tag].update(("@" + k, v) for k, v in elem.attrib.items())
- if elem.text:
- text = elem.text.strip()
- if children or elem.attrib:
- if text:
- d[elem.tag]["#text"] = text
- else:
- d[elem.tag] = text
- return d
-
- def dict_to_xml(self, d):
- """
- Converts a dictionary to an XML string without adding an extra root node.
- Assumes the dictionary was originally an XML structure.
- """
- if not isinstance(d, dict) or len(d) != 1:
- raise ValueError("Expected a dictionary with a single root element.")
-
- # Get the root element directly from the dict keys
- root_tag = list(d.keys())[0]
- root_element = ET.Element(root_tag)
-
- # Recursive function to handle nested dicts
- def _build_tree(element, subdict):
- for key, value in subdict.items():
- if isinstance(value, dict):
- # Nested element
- sub_element = ET.SubElement(element, key)
- _build_tree(sub_element, value)
- else:
- # Leaf element
- sub_element = ET.SubElement(element, key)
- sub_element.text = str(value)
-
- # Start building the tree
- _build_tree(root_element, d[root_tag])
-
- return ET.tostring(root_element, encoding="utf-8").decode("utf-8")
-
- preprocess_map = {
- "base64": preprocess_base64,
- "hex": preprocess_hex,
- "url-encoded": preprocess_urlencoded,
- }
- postprocess_map = {
- "base64": postprocess_base64,
- "hex": postprocess_hex,
- "url-encoded": postprocess_urlencoded,
- }
-
- # Format-specific functions for isolating and updating parameters
- format_isolate_map = {
- "json": lambda self: self.isolate_parameter(),
- "xml": lambda self: self.isolate_parameter(),
- }
- format_update_map = {
- "json": lambda self, value: self.update_json_parameter(value),
- "xml": lambda self, value: self.update_xml_parameter(value), # Placeholder
- }
-
- def initialize_value(self, value=None):
- self.envelopes, end_format_dict = self.recurse_envelopes(value)
- if self.envelopes:
- log.debug(f"Discovered the following envelopes: [{','.join(self.envelopes)}]")
-
- if end_format_dict is not None:
- self.end_format_type = list(end_format_dict.keys())[0]
- log.debug(f"Identified the following end format: [{self.end_format_type}]")
- self.end_format_data = list(end_format_dict.values())[0]
- else:
- self.end_format_type = None
- self.end_format_data = None
- self.end_format_subparameter = None
-
- def remove_envelopes(self, value):
- """
- Remove envelopes from the value, processing each envelope in the order it was applied.
- If the final format is present, trigger the appropriate handler (e.g., for JSON).
- """
- # Apply the preprocess functions in the order the envelopes were applied
- for env in self.envelopes:
- func = self.preprocess_map.get(env)
- if func:
- # python3.9 compatibility hack
- if isinstance(func, staticmethod):
- func = func.__get__(None, self.__class__) # Unwrap staticmethod
- value = func(value)
-
- # Dynamically select the appropriate isolate function based on the final format
- isolate_func = self.format_isolate_map.get(self.end_format_type)
- if isolate_func:
- return isolate_func(self)
- return value
-
- def add_envelopes(self, value):
- """
- Add envelopes back to the value, processing in reverse order.
- If the final format is present, trigger the appropriate handler (e.g., for JSON).
- """
- # Dynamically select the appropriate update function based on the final format
- update_func = self.format_update_map.get(self.end_format_type)
- if update_func:
- # python3.9 compatibility hack
- if isinstance(update_func, staticmethod):
- update_func = update_func.__get__(None, self.__class__)
- value = update_func(self, value)
-
- # Apply the envelopes in reverse order
- for env in self.envelopes[::-1]:
- func = self.postprocess_map.get(env)
- if func:
- # python3.9 compatibility hack
- if isinstance(func, staticmethod):
- func = func.__get__(None, self.__class__)
- value = func(value)
- return value
-
- def recurse_envelopes(self, value, envelopes=None, end_format=None):
- if envelopes is None:
- envelopes = []
- log.debug(
- f"Starting envelope recurse with value: [{value}], current envelopes: [{', '.join(envelopes)}], current end format: {end_format}"
- )
-
- if value is None or value == "" or isinstance(value, int):
- return envelopes, end_format
-
- # Try URL decoding
- try:
- decoded_url = unquote(value)
- if decoded_url != value and self.is_ascii_printable(decoded_url):
- envelopes.append("url-encoded")
- envelopes, end_format_dict = self.recurse_envelopes(decoded_url, envelopes)
- return envelopes, end_format_dict
- except Exception:
- pass # Not valid URL encoding
-
- # Try base64 decoding
- try:
- decoded_base64 = base64.b64decode(value).decode()
- if self.is_ascii_printable(decoded_base64):
- envelopes.append("base64")
- envelopes, end_format_dict = self.recurse_envelopes(decoded_base64, envelopes)
- return envelopes, end_format_dict
- except (binascii.Error, UnicodeDecodeError, ValueError):
- pass # Not valid base64
-
- # Try hex decoding
- try:
- decoded_hex = bytes.fromhex(value).decode("utf-8")
- if self.is_ascii_printable(decoded_hex):
- envelopes.append("hex")
- envelopes, end_format_dict = self.recurse_envelopes(decoded_hex, envelopes)
- return envelopes, end_format_dict
- except (ValueError, UnicodeDecodeError):
- pass # Not valid hex
-
- # Try JSON parsing
- try:
- decoded_json = json.loads(value)
- if isinstance(decoded_json, dict):
- return envelopes, {"json": decoded_json}
- except json.JSONDecodeError:
- pass # Not valid JSON
+ def children(self):
+ # if we have any subparams, raise a new WEB_PARAMETER for each one
+ children = []
+ envelopes = getattr(self, "envelopes", None)
+ if envelopes is not None:
+ subparams = sorted(list(self.envelopes.get_subparams()))
+
+ if envelopes.selected_subparam is None:
+ current_subparam = subparams[0]
+ envelopes.selected_subparam = current_subparam[0]
+ if len(subparams) > 1:
+ for subparam, _ in subparams[1:]:
+ clone = self.clone()
+ clone.envelopes = deepcopy(envelopes)
+ clone.envelopes.selected_subparam = subparam
+ clone.parent = self
+ children.append(clone)
+ return children
- # Try XML parsing
+ def sanitize_data(self, data):
+ original_value = data.get("original_value", None)
+ if original_value is not None:
try:
- decoded_xml = ET.fromstring(value)
- # Pass 'decoded_xml' to 'xml_to_dict'
- xml_dict = self.xml_to_dict(decoded_xml) # Pass decoded XML as the 'elem' argument
- return envelopes, {"xml": xml_dict} # Store as JSON-like dict, not XML
- except ET.ParseError:
- pass # Not valid XML
-
- return envelopes, end_format
-
- def isolate_parameter(self):
- """
- Isolate the specified subparameter from the data structure (JSON/XML).
- The subparameter is accessed using dot notation for nested keys.
- """
- if self.end_format_data and self.end_format_subparameter:
- # Split the dot notation string into keys
- keys = self.end_format_subparameter.split(".")
-
- # Traverse the nested structure using the keys
- subparameter_value = self.end_format_data
- for key in keys:
- if isinstance(subparameter_value, dict):
- subparameter_value = subparameter_value.get(key)
- else:
- # If the structure is broken (not a dict), return None
- return None
-
- return subparameter_value
-
- return None
-
- def update_json_parameter(self, new_value):
- """
- Update the specified subparameter in the JSON structure and rebuild it.
- """
- # Work with a copy to avoid modifying the original `end_format_data`
- end_format_data_copy = deepcopy(self.end_format_data)
-
- if end_format_data_copy:
- end_format_data_copy[self.end_format_subparameter] = new_value
- return json.dumps(end_format_data_copy)
- return new_value
-
- def update_xml_parameter(self, new_value):
- """
- Convert the JSON-like structure back into an XML string after updating the specific parameter.
- """
- if self.end_format_data and self.end_format_subparameter:
- # Split the dot notation into keys
- keys = self.end_format_subparameter.split(".")
-
- # Traverse the nested dictionary using the keys to find the target subparameter
- current_data = self.end_format_data
- for key in keys[:-1]: # Traverse up to the second-to-last key
- current_data = current_data.get(key, {})
-
- # Update the target subparameter with the new value
- if isinstance(current_data, dict):
- current_data[keys[-1]] = new_value
-
- # Convert the JSON-like dict back to an XML string
- return self.dict_to_xml(self.end_format_data)
-
- return new_value
-
- def to_dict(self):
- return {
- "envelopes": self.envelopes,
- "end_format_type": self.end_format_type,
- "end_format_data": self.end_format_data,
- "end_format_subparameter": self.end_format_subparameter,
- }
-
- def __getstate__(self):
- return self.to_dict()
-
- def __str__(self):
- return f"ParameterEnvelopes(envelopes={self.envelopes}, end_format_type={self.end_format_type}, end_format_data={self.end_format_data}, end_format_subparameter={self.end_format_subparameter})"
-
- __repr__ = __str__
-
- @classmethod
- def from_dict(cls, data):
- instance = cls()
- instance.envelopes = data.get("envelopes", [])
- instance.end_format_type = data.get("end_format_type")
- instance.end_format_data = data.get("end_format_data")
- instance.end_format_subparameter = data.get("end_format_subparameter")
- return instance
-
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- if "original_value" in self.data.keys():
- parameterEnvelope_instance = self.ParameterEnvelopes()
- parameterEnvelope_instance.initialize_value(self.data["original_value"])
- setattr(self, "envelopes", parameterEnvelope_instance)
-
- envelopes = getattr(self, "envelopes", None)
- if (
- envelopes is not None
- and getattr(envelopes, "end_format_type", None) is not None
- and getattr(envelopes, "end_format_data", None)
- ):
- end_format_data = envelopes.end_format_data
-
- def extract_keys_with_values(data, parent_key=""):
- """
- Recursively extract all keys from nested dictionaries that have values (non-empty).
- Construct a path-like structure with dot notation (e.g., 'find.search').
- """
- keys = []
- if isinstance(data, dict):
- for key, value in data.items():
- # Construct the full key path using dot notation
- full_key = f"{parent_key}.{key}" if parent_key else key
-
- # Only add keys that have non-empty values
- if value:
- if isinstance(value, dict):
- # Recursively check nested dictionaries
- keys.extend(extract_keys_with_values(value, full_key))
- else:
- # Add the key if it has a non-empty value
- keys.append(full_key)
- return keys
-
- # Extract all keys that have non-empty values
- end_format_data_keys = extract_keys_with_values(end_format_data)
- # If there are keys, assign the first key to end_format_subparameter
- if end_format_data_keys:
-
- # Assign the first key to end_format_subparameter
- setattr(envelopes, "end_format_subparameter", end_format_data_keys[0])
- setattr(envelopes, "end_format_subparameter", end_format_data_keys[0])
-
- # Iterate through the remaining keys, starting from the second one
- for p in end_format_data_keys[1:]:
- log.debug(f"generating copy of event for subparameter {p} of type {envelopes.end_format_type}")
-
- # Make a copy of the current event data
- cloned_event = self.clone()
- cloned_envelopes = getattr(cloned_event, "envelopes")
- cloned_envelopes.end_format_subparameter = p
- asyncio.run_coroutine_threadsafe(
- self.module.emit_event(cloned_event), asyncio.get_event_loop()
- )
+ envelopes = BaseEnvelope.detect(original_value)
+ setattr(self, "envelopes", envelopes)
+ except ValueError as e:
+ log.verbose(f"Error detecting envelopes for {self}: {e}")
+ return data
def _data_id(self):
# dedupe by url:name:param_type
url = self.data.get("url", "")
name = self.data.get("name", "")
param_type = self.data.get("type", "")
- envelopes = getattr(self, "envelopes", None)
- subparameter = getattr(envelopes, "end_format_subparameter", "") if envelopes else ""
+ envelopes = getattr(self, "envelopes", "")
+ subparam = getattr(envelopes, "selected_subparam", "")
- return f"{url}:{name}:{param_type}:{subparameter}"
+ return f"{url}:{name}:{param_type}:{subparam}"
def _outgoing_dedup_hash(self, event):
return hash(
diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py
index 7c6c8a0738..ced61925ce 100644
--- a/bbot/core/helpers/misc.py
+++ b/bbot/core/helpers/misc.py
@@ -2867,3 +2867,15 @@ def clean_requirement(req_string):
dist = distribution("bbot")
return [clean_requirement(r) for r in dist.requires]
+
+
+def is_printable(s):
+ """
+ Check if a string is printable
+ """
+ if not isinstance(s, str):
+ raise ValueError(f"Expected a string, got {type(s)}")
+
+ # Exclude control characters that break display/printing
+ s = set(s)
+ return all(ord(c) >= 32 or c in "\t\n\r" for c in s)
diff --git a/bbot/core/helpers/web/client.py b/bbot/core/helpers/web/client.py
index 83154e5aec..49ddf532be 100644
--- a/bbot/core/helpers/web/client.py
+++ b/bbot/core/helpers/web/client.py
@@ -85,7 +85,6 @@ def __init__(self, *args, **kwargs):
self._cookies = DummyCookies()
def build_request(self, *args, **kwargs):
-
if args:
url = args[0]
kwargs["url"] = url
diff --git a/bbot/core/helpers/web/engine.py b/bbot/core/helpers/web/engine.py
index e0f63cb052..8ffdbe966f 100644
--- a/bbot/core/helpers/web/engine.py
+++ b/bbot/core/helpers/web/engine.py
@@ -50,7 +50,6 @@ def AsyncClient(self, *args, **kwargs):
return client
async def request(self, *args, **kwargs):
-
raise_error = kwargs.pop("raise_error", False)
# TODO: use this
cache_for = kwargs.pop("cache_for", None) # noqa
@@ -75,7 +74,6 @@ async def request(self, *args, **kwargs):
client_kwargs = {}
for k in list(kwargs):
if k in self.client_only_options:
-
v = kwargs.pop(k)
client_kwargs[k] = v
diff --git a/bbot/core/helpers/web/envelopes.py b/bbot/core/helpers/web/envelopes.py
new file mode 100644
index 0000000000..c000c5d1c0
--- /dev/null
+++ b/bbot/core/helpers/web/envelopes.py
@@ -0,0 +1,348 @@
+import json
+import base64
+import binascii
+import xmltodict
+from contextlib import suppress
+from urllib.parse import unquote, quote
+from xml.parsers.expat import ExpatError
+
+from bbot.core.helpers.misc import is_printable
+
+
+# TODO: This logic is perfect for extracting params. We should expand it outwards to include other higher-level envelopes:
+# - QueryStringEnvelope
+# - MultipartFormEnvelope
+# - HeaderEnvelope
+# - CookieEnvelope
+#
+# Once we start ingesting HTTP_REQUEST events, this will make them instantly fuzzable
+
+
+class EnvelopeChildTracker(type):
+ """
+ Keeps track of all the child envelope classes
+ """
+
+ children = []
+
+ def __new__(mcs, name, bases, class_dict):
+ # Create the class
+ cls = super().__new__(mcs, name, bases, class_dict)
+ # Don't register the base class itself
+ if bases and not name.startswith("Base"): # Only register if it has base classes (i.e., is a child)
+ EnvelopeChildTracker.children.append(cls)
+ EnvelopeChildTracker.children.sort(key=lambda x: x.priority)
+ return cls
+
+
+class BaseEnvelope(metaclass=EnvelopeChildTracker):
+ __slots__ = ["subparams", "selected_subparam", "singleton"]
+
+ # determines the order of the envelope detection
+ priority = 5
+ # whether the envelope is the final format, e.g. raw text/binary
+ end_format = False
+ ignore_exceptions = (Exception,)
+ envelope_classes = EnvelopeChildTracker.children
+ # transparent envelopes (i.e. TextEnvelope) are not counted as envelopes or included in the finding descriptions
+ transparent = False
+
+ def __init__(self, s):
+ unpacked_data = self.unpack(s)
+
+ if self.end_format:
+ inner_envelope = unpacked_data
+ else:
+ inner_envelope = self.detect(unpacked_data)
+
+ self.selected_subparam = None
+ # if we have subparams, our inner envelope will be a dictionary
+ if isinstance(inner_envelope, dict):
+ self.subparams = inner_envelope
+ self.singleton = False
+ # otherwise if we just have one value, we make a dictionary with a default key
+ else:
+ self.subparams = {"__default__": inner_envelope}
+ self.singleton = True
+
+ @property
+ def final_envelope(self):
+ try:
+ return self.unpacked_data(recursive=False).final_envelope
+ except AttributeError:
+ return self
+
+ @property
+ def friendly_name(self):
+ if self.friendly_name:
+ return self.friendly_name
+ else:
+ return self.name
+
+ def pack(self, data=None):
+ if data is None:
+ data = self.unpacked_data(recursive=False)
+ with suppress(AttributeError):
+ data = data.pack()
+ return self._pack(data)
+
+ def unpack(self, s):
+ return self._unpack(s)
+
+ def _pack(self, s):
+ """
+ Encodes the string using the class's unique encoder (adds the outer envelope)
+ """
+ raise NotImplementedError("Envelope.pack() must be implemented")
+
+ def _unpack(self, s):
+ """
+ Decodes the string using the class's unique encoder (removes the outer envelope)
+ """
+ raise NotImplementedError("Envelope.unpack() must be implemented")
+
+ def unpacked_data(self, recursive=True):
+ try:
+ unpacked = self.subparams["__default__"]
+ if recursive:
+ with suppress(AttributeError):
+ return unpacked.unpacked_data(recursive=recursive)
+ return unpacked
+ except KeyError:
+ return self.subparams
+
+ @classmethod
+ def detect(cls, s):
+ """
+ Detects the type of envelope used to encode the packed_data
+ """
+ if not isinstance(s, str):
+ raise ValueError(f"Invalid data passed to detect(): {s} ({type(s)})")
+ # if the value is empty, we just return the text envelope
+ if not s.strip():
+ return TextEnvelope(s)
+ for envelope_class in cls.envelope_classes:
+ with suppress(*envelope_class.ignore_exceptions):
+ envelope = envelope_class(s)
+ if envelope is not False:
+ return envelope
+ del envelope
+ raise Exception(f"No envelope detected for data: '{s}' ({type(s)})")
+
+ def get_subparams(self, key=None, data=None, recursive=True):
+ if data is None:
+ data = self.unpacked_data(recursive=recursive)
+ if key is None:
+ key = []
+
+ if isinstance(data, dict):
+ for k, v in data.items():
+ full_key = key + [k]
+ if isinstance(v, dict):
+ yield from self.get_subparams(full_key, v)
+ else:
+ yield full_key, v
+ else:
+ yield [], data
+
+ def get_subparam(self, key=None, recursive=True):
+ if key is None:
+ key = self.selected_subparam
+ envelope = self
+ if recursive:
+ envelope = self.final_envelope
+ data = envelope.unpacked_data(recursive=False)
+ if key is None:
+ if envelope.singleton:
+ key = []
+ else:
+ raise ValueError("No subparam selected")
+ else:
+ for segment in key:
+ data = data[segment]
+ return data
+
+ def set_subparam(self, key=None, value=None, recursive=True):
+ envelope = self
+ if recursive:
+ envelope = self.final_envelope
+
+ # if there's only one value to set, we can just set it directly
+ if envelope.singleton:
+ envelope.subparams["__default__"] = value
+ return
+
+ # if key isn't specified, use the selected subparam
+ if key is None:
+ key = self.selected_subparam
+ if key is None:
+ raise ValueError(f"{self} -> {envelope}: No subparam selected")
+
+ data = envelope.unpacked_data(recursive=False)
+ for segment in key[:-1]:
+ data = data[segment]
+ data[key[-1]] = value
+
+ @property
+ def name(self):
+ return self.__class__.__name__
+
+ @property
+ def num_envelopes(self):
+ num_envelopes = 0 if self.transparent else 1
+ if self.end_format:
+ return num_envelopes
+ for envelope in self.subparams.values():
+ with suppress(AttributeError):
+ num_envelopes += envelope.num_envelopes
+ return num_envelopes
+
+ @property
+ def summary(self):
+ if self.transparent:
+ return ""
+ self_string = f"{self.friendly_name}"
+ with suppress(AttributeError):
+ child_envelope = self.unpacked_data(recursive=False)
+ child_summary = child_envelope.summary
+ if child_summary:
+ self_string += f" -> {child_summary}"
+
+ if self.selected_subparam:
+ self_string += f" [{'.'.join(self.selected_subparam)}]"
+ return self_string
+
+ def to_dict(self):
+ return self.summary
+
+ def __str__(self):
+ return self.summary
+
+ __repr__ = __str__
+
+
+class HexEnvelope(BaseEnvelope):
+ """
+ Hexadecimal encoding
+ """
+
+ friendly_name = "Hexadecimal-Encoded"
+
+ ignore_exceptions = (ValueError, UnicodeDecodeError)
+
+ def _pack(self, s):
+ return s.encode().hex()
+
+ def _unpack(self, s):
+ return bytes.fromhex(s).decode()
+
+
+class B64Envelope(BaseEnvelope):
+ """
+ Base64 encoding
+ """
+
+ friendly_name = "Base64-Encoded"
+
+ ignore_exceptions = (binascii.Error, UnicodeDecodeError, ValueError)
+
+ def unpack(self, s):
+ # it's easy to have a small value that accidentally decodes to base64
+ if len(s) < 8 and not s.endswith("="):
+ raise ValueError("Data is too small to be sure")
+ return super().unpack(s)
+
+ def _pack(self, s):
+ return base64.b64encode(s.encode()).decode()
+
+ def _unpack(self, s):
+ return base64.b64decode(s).decode()
+
+
+class URLEnvelope(BaseEnvelope):
+ """
+ URL encoding
+ """
+
+ friendly_name = "URL-Encoded"
+
+ def unpack(self, s):
+ unpacked = super().unpack(s)
+ if unpacked == s:
+ raise Exception("Data is not URL-encoded")
+ return unpacked
+
+ def _pack(self, s):
+ return quote(s)
+
+ def _unpack(self, s):
+ return unquote(s)
+
+
+class TextEnvelope(BaseEnvelope):
+ """
+ Text encoding
+ """
+
+ end_format = True
+ # lowest priority means text is the ultimate fallback
+ priority = 10
+ transparent = True
+ ignore_exceptions = ()
+
+ def _pack(self, s):
+ return s
+
+ def _unpack(self, s):
+ if not is_printable(s):
+ raise ValueError(f"Non-printable data detected in TextEnvelope: '{s}' ({type(s)})")
+ return s
+
+
+# class BinaryEnvelope(BaseEnvelope):
+# """
+# Binary encoding
+# """
+# end_format = True
+
+# def pack(self, s):
+# return s
+
+# def unpack(self, s):
+# if is_printable(s):
+# raise Exception("Non-binary data detected in BinaryEnvelope")
+# return s
+
+
+class JSONEnvelope(BaseEnvelope):
+ """
+ JSON encoding
+ """
+
+ friendly_name = "JSON-formatted"
+ end_format = True
+ priority = 8
+ ignore_exceptions = (json.JSONDecodeError,)
+
+ def _pack(self, s):
+ return json.dumps(s)
+
+ def _unpack(self, s):
+ return json.loads(s)
+
+
+class XMLEnvelope(BaseEnvelope):
+ """
+ XML encoding
+ """
+
+ friendly_name = "XML-formatted"
+ end_format = True
+ priority = 9
+ ignore_exceptions = (ExpatError,)
+
+ def _pack(self, s):
+ return xmltodict.unparse(s)
+
+ def _unpack(self, s):
+ return xmltodict.parse(s)
diff --git a/bbot/modules/base.py b/bbot/modules/base.py
index 1fa151c33b..4ab2da1528 100644
--- a/bbot/modules/base.py
+++ b/bbot/modules/base.py
@@ -528,8 +528,9 @@ async def emit_event(self, *args, **kwargs):
if v is not None:
emit_kwargs[o] = v
event = self.make_event(*args, **event_kwargs)
- if event:
- await self.queue_outgoing_event(event, **emit_kwargs)
+ children = event.children
+ for e in [event] + children:
+ await self.queue_outgoing_event(e, **emit_kwargs)
return event
async def _events_waiting(self, batch_size=None):
diff --git a/bbot/modules/internal/dnsresolve.py b/bbot/modules/internal/dnsresolve.py
index c746b03451..5bb5c5bc40 100644
--- a/bbot/modules/internal/dnsresolve.py
+++ b/bbot/modules/internal/dnsresolve.py
@@ -83,9 +83,14 @@ async def handle_event(self, event, **kwargs):
event_data_changed = await self.handle_wildcard_event(main_host_event)
if event_data_changed:
# since data has changed, we check again whether it's a duplicate
- if event.type == "DNS_NAME" and self.scan.ingress_module.is_incoming_duplicate(event, add=True):
+ if event.type == "DNS_NAME" and self.scan.ingress_module.is_incoming_duplicate(
+ event, add=True
+ ):
if not event._graph_important:
- return False, "it's a DNS wildcard, and its module already emitted a similar wildcard event"
+ return (
+ False,
+ "it's a DNS wildcard, and its module already emitted a similar wildcard event",
+ )
else:
self.debug(
f"Event {event} was already emitted by its module, but it's graph-important so it gets a pass"
diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py
index a61d2f8ba8..1ba79018ca 100644
--- a/bbot/modules/internal/excavate.py
+++ b/bbot/modules/internal/excavate.py
@@ -460,10 +460,8 @@ def extract(self):
# check to see if the format is defined as JSON
if "content_type" in extracted_values.keys():
if extracted_values["content_type"] == "application/json":
-
# If we cant figure out the parameter names, there is no point in continuing
if "data" in extracted_values.keys():
-
if "url" in extracted_values.keys():
form_url = extracted_values["url"]
else:
@@ -481,8 +479,12 @@ def extract(self):
form_parameters[p] = None
for parameter_name in form_parameters:
- yield "BODYJSON", parameter_name, None, form_url, _exclude_key(
- form_parameters, parameter_name
+ yield (
+ "BODYJSON",
+ parameter_name,
+ None,
+ form_url,
+ _exclude_key(form_parameters, parameter_name),
)
class GetForm(ParameterExtractorRule):
@@ -503,7 +505,6 @@ class GetForm(ParameterExtractorRule):
def extract(self):
forms = self.extraction_regex.findall(str(self.result))
for form_action, form_content in forms:
-
if not form_action or form_action == "#":
form_action = None
@@ -514,7 +515,6 @@ def extract(self):
for form_content_regex_name, form_content_regex in self.form_content_regexes.items():
input_tags = form_content_regex.findall(form_content)
if input_tags:
-
if form_content_regex_name == "input_tag_novalue_regex":
form_parameters[input_tags[0]] = None
@@ -530,7 +530,7 @@ def extract(self):
self.output_type,
parameter_name,
original_value,
- form_action,
+ form_action,
_exclude_key(form_parameters, parameter_name),
)
@@ -762,8 +762,10 @@ async def process(self, yara_results, event, yara_rule_settings, discovery_conte
continue
if parsed_url.scheme in ["http", "https"]:
continue
+
def abort_if(e):
return e.scope_distance > 0
+
finding_data = {"host": str(host), "description": f"Non-HTTP URI: {parsed_url.geturl()}"}
await self.report(finding_data, event, yara_rule_settings, discovery_context, abort_if=abort_if)
protocol_data = {"protocol": parsed_url.scheme, "host": str(host)}
@@ -998,6 +1000,8 @@ async def setup(self):
return True
async def search(self, data, event, content_type, discovery_context="HTTP response"):
+ # TODO: replace this JSON/XML extraction with our lightfuzz envelope stuff
+
if not data:
return None
decoded_data = await self.helpers.re.recursive_decode(data)
@@ -1089,7 +1093,6 @@ async def handle_event(self, event):
# If parameter_extraction is enabled and we assigned custom headers, emit them as WEB_PARAMETER
if self.parameter_extraction is True:
-
custom_cookies = self.scan.web_config.get("http_cookies", {})
for custom_cookie_name, custom_cookie_value in custom_cookies.items():
description = f"HTTP Extracted Parameter [{custom_cookie_name}] (Custom Cookie)"
diff --git a/bbot/modules/lightfuzz.py b/bbot/modules/lightfuzz.py
index c61dbb4b25..ee3f04b20b 100644
--- a/bbot/modules/lightfuzz.py
+++ b/bbot/modules/lightfuzz.py
@@ -130,13 +130,10 @@ async def run_submodule(self, submodule, event):
event_data = {"host": str(event.host), "url": event.data["url"], "description": r["description"]}
envelopes = getattr(event, "envelopes", None)
- if envelopes and envelopes.envelopes:
- envelope_summary = f'[{"->".join(envelopes.envelopes)}]'
- if envelopes.end_format_type:
- envelope_summary += f" Format: [{envelopes.end_format_type}] with subparameter [{envelopes.end_format_subparameter}])"
-
+ envelope_summary = getattr(envelopes, "summary", None)
+ if envelope_summary:
# Append the envelope summary to the description
- event_data["description"] += f" Envelopes: {envelope_summary}"
+ event_data["description"] += f" Envelopes: [{envelope_summary}]"
if r["type"] == "VULNERABILITY":
event_data["severity"] = r["severity"]
@@ -147,10 +144,8 @@ async def run_submodule(self, submodule, event):
)
async def handle_event(self, event):
-
if event.type == "URL":
if self.config.get("force_common_headers", False) is False:
-
return False
for h in self.common_headers:
@@ -166,7 +161,6 @@ async def handle_event(self, event):
await self.emit_event(data, "WEB_PARAMETER", event)
elif event.type == "WEB_PARAMETER":
-
# check connectivity to url
connectivity_test = await self.helpers.request(event.data["url"], timeout=10)
@@ -199,5 +193,5 @@ async def finish(self):
async def filter_event(self, event):
if event.type == "WEB_PARAMETER" and self.disable_post and event.data["type"] == "POSTPARAM":
- return False, "POST parameter disabled in lilghtfuzz module"
+ return False, "POST parameter disabled in lightfuzz module"
return True
diff --git a/bbot/modules/lightfuzz_submodules/base.py b/bbot/modules/lightfuzz_submodules/base.py
index 313de8c16f..32d6d80336 100644
--- a/bbot/modules/lightfuzz_submodules/base.py
+++ b/bbot/modules/lightfuzz_submodules/base.py
@@ -19,7 +19,7 @@ def additional_params_process(self, additional_params, additional_params_populat
return new_additional_params
async def send_probe(self, probe):
- probe = self.probe_value_outgoing(probe)
+ probe = self.outgoing_probe_value(probe)
getparams = {self.event.data["name"]: probe}
url = self.lightfuzz.helpers.add_get_params(self.event.data["url"], getparams, encode=False).geturl()
self.lightfuzz.debug(f"lightfuzz sending probe with URL: {url}")
@@ -30,7 +30,7 @@ async def send_probe(self, probe):
def compare_baseline(
self, event_type, probe, cookies, additional_params_populate_empty=False, speculative_mode="GETPARAM"
):
- probe = self.probe_value_outgoing(probe)
+ probe = self.outgoing_probe_value(probe)
http_compare = None
if event_type == "SPECULATIVE":
@@ -104,8 +104,7 @@ async def compare_probe(
additional_params_override={},
speculative_mode="GETPARAM",
):
-
- probe = self.probe_value_outgoing(probe)
+ probe = self.outgoing_probe_value(probe)
additional_params = copy.deepcopy(self.event.data.get("additional_params", {}))
if additional_params_override:
for k, v in additional_params_override.items():
@@ -151,7 +150,7 @@ async def standard_probe(
speculative_mode="GETPARAM",
allow_redirects=False,
):
- probe = self.probe_value_outgoing(probe)
+ probe = self.outgoing_probe_value(probe)
if event_type == "SPECULATIVE":
event_type = speculative_mode
@@ -211,7 +210,6 @@ async def standard_probe(
def metadata(self):
-
metadata_string = f"Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]"
if self.event.data["original_value"] != "" and self.event.data["original_value"] is not None:
metadata_string += (
@@ -219,21 +217,29 @@ def metadata(self):
)
return metadata_string
- def probe_value_incoming(self, populate_empty=True):
- probe_value = self.event.data.get("original_value", "")
- if (probe_value is None or len(str(probe_value)) == 0) and populate_empty is True:
- probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
- self.lightfuzz.debug(f"probe_value_incoming (before modification): {probe_value}")
- envelopes_instance = getattr(self.event, "envelopes", None)
- probe_value = envelopes_instance.remove_envelopes(probe_value)
- self.lightfuzz.debug(f"probe_value_incoming (after modification): {probe_value}")
- if not isinstance(probe_value, str):
- probe_value = str(probe_value)
+ def incoming_probe_value(self, populate_empty=True):
+ envelopes = getattr(self.event, "envelopes", None)
+ probe_value = ""
+ if envelopes is not None:
+ probe_value = envelopes.get_subparam()
+ self.lightfuzz.debug(f"incoming_probe_value (after unpacking): {probe_value} with envelopes [{envelopes}]")
+ if not probe_value:
+ if populate_empty is True:
+ probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
+ else:
+ probe_value = ""
+ # if not isinstance(probe_value, str):
+ # raise ValueError(
+ # f"incoming_probe_value should always be a string (got {type(probe_value)} / {probe_value})"
+ # )
+ probe_value = str(probe_value)
return probe_value
- def probe_value_outgoing(self, outgoing_probe_value):
- self.lightfuzz.debug(f"probe_value_outgoing (before modification): {outgoing_probe_value}")
- envelopes_instance = getattr(self.event, "envelopes", None)
- outgoing_probe_value = envelopes_instance.add_envelopes(outgoing_probe_value)
- self.lightfuzz.debug(f"probe_value_outgoing (after modification): {outgoing_probe_value}")
+ def outgoing_probe_value(self, outgoing_probe_value):
+ self.lightfuzz.debug(f"outgoing_probe_value (before packing): {outgoing_probe_value} / {self.event}")
+ envelopes = getattr(self.event, "envelopes", None)
+ if envelopes is not None:
+ envelopes.set_subparam(value=outgoing_probe_value)
+ outgoing_probe_value = envelopes.pack()
+ self.lightfuzz.debug(f"outgoing_probe_value (after packing): {outgoing_probe_value} with envelopes [{envelopes}] / {self.event}")
return outgoing_probe_value
diff --git a/bbot/modules/lightfuzz_submodules/cmdi.py b/bbot/modules/lightfuzz_submodules/cmdi.py
index b9dbb27645..57acfdbb5b 100644
--- a/bbot/modules/lightfuzz_submodules/cmdi.py
+++ b/bbot/modules/lightfuzz_submodules/cmdi.py
@@ -5,11 +5,9 @@
class CmdILightfuzz(BaseLightfuzz):
-
async def fuzz(self):
-
cookies = self.event.data.get("assigned_cookies", {})
- probe_value = self.probe_value_incoming()
+ probe_value = self.incoming_probe_value()
canary = self.lightfuzz.helpers.rand_string(10, numeric_only=True)
http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies)
@@ -31,7 +29,6 @@ async def fuzz(self):
echo_probe = urllib.parse.quote(echo_probe.encode(), safe="")
cmdi_probe = await self.compare_probe(http_compare, self.event.data["type"], echo_probe, cookies)
if cmdi_probe[3]:
-
if canary in cmdi_probe[3].text and "echo" not in cmdi_probe[3].text:
self.lightfuzz.debug(f"canary [{canary}] found in response when sending probe [{p}]")
if p == "AAAA":
diff --git a/bbot/modules/lightfuzz_submodules/crypto.py b/bbot/modules/lightfuzz_submodules/crypto.py
index 4602713824..3ebb3bacd1 100644
--- a/bbot/modules/lightfuzz_submodules/crypto.py
+++ b/bbot/modules/lightfuzz_submodules/crypto.py
@@ -6,7 +6,6 @@
class CryptoLightfuzz(BaseLightfuzz):
-
@staticmethod
def is_hex(s):
try:
@@ -21,7 +20,6 @@ def is_base64(s):
if base64.b64encode(base64.b64decode(s)).decode() == s:
return True
except Exception:
-
return False
return False
@@ -75,7 +73,6 @@ def format_agnostic_encode(data, encoding, urlencode=False):
@staticmethod
def modify_string(input_string, action="truncate", position=None, extension_length=1):
-
if not isinstance(input_string, str):
input_string = str(input_string)
@@ -136,7 +133,7 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook
paddingblock = b"\x00" * block_size
datablock = original_data[-block_size:]
if possible_first_byte:
- baseline_byte = b"\xFF"
+ baseline_byte = b"\xff"
starting_pos = 0
else:
baseline_byte = b"\x00"
@@ -148,7 +145,6 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook
)
differ_count = 0
for i in range(starting_pos, starting_pos + 254):
-
byte = bytes([i])
oracle_probe = await self.compare_probe(
baseline,
@@ -176,7 +172,6 @@ async def padding_oracle(self, probe_value, cookies):
possible_block_sizes = self.possible_block_sizes(len(data))
for block_size in possible_block_sizes:
-
padding_oracle_result = await self.padding_oracle_execute(data, encoding, block_size, cookies)
if padding_oracle_result is None:
self.lightfuzz.debug(
@@ -198,7 +193,6 @@ async def padding_oracle(self, probe_value, cookies):
)
async def error_string_search(self, text_dict, baseline_text):
-
matching_techniques = set()
matching_strings = set()
@@ -238,8 +232,10 @@ def identify_hash_function(hash_bytes):
return hash_functions[hash_length]
async def fuzz(self):
+
cookies = self.event.data.get("assigned_cookies", {})
- probe_value = self.probe_value_incoming(populate_empty=False)
+ probe_value = self.incoming_probe_value(populate_empty=False)
+
if not probe_value:
self.lightfuzz.debug(
f"The Cryptography Probe Submodule requires original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]"
@@ -256,7 +252,7 @@ async def fuzz(self):
mutate_probe_value = self.modify_string(probe_value, action="mutate")
except ValueError as e:
self.lightfuzz.debug(
- f"Encountered error modifying value for parameter {self.event.data['name']}: {e} , aborting"
+ f"Encountered error modifying value for parameter [{self.event.data['name']}]: {e} , aborting"
)
return
@@ -311,7 +307,6 @@ async def fuzz(self):
if confirmed_techniques or (
"padding" in truncate_probe[3].text.lower() or "padding" in mutate_probe[3].text.lower()
):
-
# Padding Oracle Test
if possible_block_cipher:
diff --git a/bbot/modules/lightfuzz_submodules/path.py b/bbot/modules/lightfuzz_submodules/path.py
index b1c4d710e3..827af65f2f 100644
--- a/bbot/modules/lightfuzz_submodules/path.py
+++ b/bbot/modules/lightfuzz_submodules/path.py
@@ -6,10 +6,9 @@
class PathTraversalLightfuzz(BaseLightfuzz):
-
async def fuzz(self):
cookies = self.event.data.get("assigned_cookies", {})
- probe_value = self.probe_value_incoming(populate_empty=False)
+ probe_value = self.incoming_probe_value(populate_empty=False)
if not probe_value:
self.lightfuzz.debug(
f"Path Traversal detection requires original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]"
diff --git a/bbot/modules/lightfuzz_submodules/serial.py b/bbot/modules/lightfuzz_submodules/serial.py
index a45940d186..e6cf2da765 100644
--- a/bbot/modules/lightfuzz_submodules/serial.py
+++ b/bbot/modules/lightfuzz_submodules/serial.py
@@ -26,7 +26,7 @@ async def fuzz(self):
"java.io.optionaldataexception",
]
- probe_value = self.probe_value_incoming(populate_empty=False)
+ probe_value = self.incoming_probe_value(populate_empty=False)
if probe_value:
self.lightfuzz.debug(
f"The Serialization Submodule only operates when there if no original value, aborting [{self.event.data['type']}] [{self.event.data['name']}]"
diff --git a/bbot/modules/lightfuzz_submodules/sqli.py b/bbot/modules/lightfuzz_submodules/sqli.py
index cb264c7b8b..1f7d677cce 100644
--- a/bbot/modules/lightfuzz_submodules/sqli.py
+++ b/bbot/modules/lightfuzz_submodules/sqli.py
@@ -38,9 +38,8 @@ def evaluate_delay(self, mean_baseline, measured_delay):
return False
async def fuzz(self):
-
cookies = self.event.data.get("assigned_cookies", {})
- probe_value = self.probe_value_incoming(populate_empty=True)
+ probe_value = self.incoming_probe_value(populate_empty=True)
http_compare = self.compare_baseline(
self.event.data["type"], probe_value, cookies, additional_params_populate_empty=True
)
diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py
index 2eb67cd13d..329994c748 100644
--- a/bbot/test/test_step_1/test_helpers.py
+++ b/bbot/test/test_step_1/test_helpers.py
@@ -460,6 +460,13 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_httpserver):
s = "asdf {unused} {used}"
assert helpers.safe_format(s, used="fdsa") == "asdf {unused} fdsa"
+ # is_printable
+ assert helpers.is_printable("asdf") is True
+ assert helpers.is_printable(r"""~!@#$^&*()_+=-<>:"?,./;'[]\{}|""") is True
+ assert helpers.is_printable("ドメイン.テスト") is True
+ assert helpers.is_printable("4") is True
+ assert helpers.is_printable("asdf\x00") is False
+
# punycode
assert helpers.smart_encode_punycode("ドメイン.テスト") == "xn--eckwd4c7c.xn--zckzah"
assert helpers.smart_decode_punycode("xn--eckwd4c7c.xn--zckzah") == "ドメイン.テスト"
diff --git a/bbot/test/test_step_1/test_web_envelopes.py b/bbot/test/test_step_1/test_web_envelopes.py
new file mode 100644
index 0000000000..79da9e829e
--- /dev/null
+++ b/bbot/test/test_step_1/test_web_envelopes.py
@@ -0,0 +1,339 @@
+import pytest
+
+
+async def test_web_envelopes():
+ from bbot.core.helpers.web.envelopes import (
+ BaseEnvelope,
+ TextEnvelope,
+ HexEnvelope,
+ B64Envelope,
+ JSONEnvelope,
+ XMLEnvelope,
+ URLEnvelope,
+ )
+
+ # simple text
+ text_envelope = BaseEnvelope.detect("foo")
+ assert isinstance(text_envelope, TextEnvelope)
+ assert text_envelope.unpacked_data() == "foo"
+ assert text_envelope.subparams == {"__default__": "foo"}
+ expected_subparams = [([], "foo")]
+ assert list(text_envelope.get_subparams()) == expected_subparams
+ for subparam, value in expected_subparams:
+ assert text_envelope.get_subparam(subparam) == value
+ assert text_envelope.pack() == "foo"
+ assert text_envelope.num_envelopes == 0
+ assert text_envelope.get_subparam() == "foo"
+ text_envelope.set_subparam(value="bar")
+ assert text_envelope.get_subparam() == "bar"
+ assert text_envelope.unpacked_data() == "bar"
+
+ # simple binary
+ # binary_envelope = BaseEnvelope.detect("foo\x00")
+ # assert isinstance(binary_envelope, BinaryEnvelope)
+ # assert binary_envelope.unpacked_data == "foo\x00"
+ # assert binary_envelope.packed_data == "foo\x00"
+ # assert binary_envelope.subparams == {"__default__": "foo\x00"}
+
+ # text encoded as hex
+ hex_envelope = BaseEnvelope.detect("706172616d")
+ assert isinstance(hex_envelope, HexEnvelope)
+ assert hex_envelope.unpacked_data(recursive=True) == "param"
+ hex_inner_envelope = hex_envelope.unpacked_data(recursive=False)
+ assert isinstance(hex_inner_envelope, TextEnvelope)
+ assert hex_inner_envelope.unpacked_data(recursive=False) == "param"
+ assert hex_inner_envelope.unpacked_data(recursive=True) == "param"
+ assert list(hex_envelope.get_subparams(recursive=False)) == [([], hex_inner_envelope)]
+ assert list(hex_envelope.get_subparams(recursive=True)) == [([], "param")]
+ assert hex_inner_envelope.unpacked_data() == "param"
+ assert hex_inner_envelope.subparams == {"__default__": "param"}
+ expected_subparams = [([], "param")]
+ assert list(hex_inner_envelope.get_subparams()) == expected_subparams
+ for subparam, value in expected_subparams:
+ assert hex_inner_envelope.get_subparam(subparam) == value
+ assert hex_envelope.pack() == "706172616d"
+ assert hex_envelope.num_envelopes == 1
+ assert hex_envelope.get_subparam() == "param"
+ hex_envelope.set_subparam(value="asdf")
+ assert hex_envelope.get_subparam() == "asdf"
+ assert hex_envelope.unpacked_data() == "asdf"
+ assert hex_envelope.pack() == "61736466"
+
+ # text encoded as base64
+ base64_envelope = BaseEnvelope.detect("cGFyYW0=")
+ assert isinstance(base64_envelope, B64Envelope)
+ assert base64_envelope.unpacked_data() == "param"
+ base64_inner_envelope = base64_envelope.unpacked_data(recursive=False)
+ assert isinstance(base64_inner_envelope, TextEnvelope)
+ assert list(base64_envelope.get_subparams(recursive=False)) == [([], base64_inner_envelope)]
+ assert list(base64_envelope.get_subparams()) == [([], "param")]
+ assert base64_inner_envelope.pack() == "param"
+ assert base64_inner_envelope.unpacked_data() == "param"
+ assert base64_inner_envelope.subparams == {"__default__": "param"}
+ expected_subparams = [([], "param")]
+ assert list(base64_inner_envelope.get_subparams()) == expected_subparams
+ for subparam, value in expected_subparams:
+ assert base64_inner_envelope.get_subparam(subparam) == value
+ assert base64_envelope.num_envelopes == 1
+ base64_envelope.set_subparam(value="asdf")
+ assert base64_envelope.get_subparam() == "asdf"
+ assert base64_envelope.unpacked_data() == "asdf"
+ assert base64_envelope.pack() == "YXNkZg=="
+
+ # test inside hex inside base64
+ hex_envelope = BaseEnvelope.detect("634746795957303d")
+ assert isinstance(hex_envelope, HexEnvelope)
+ assert hex_envelope.get_subparam() == "param"
+ assert hex_envelope.unpacked_data() == "param"
+ base64_envelope = hex_envelope.unpacked_data(recursive=False)
+ assert isinstance(base64_envelope, B64Envelope)
+ assert base64_envelope.get_subparam() == "param"
+ assert base64_envelope.unpacked_data() == "param"
+ text_envelope = base64_envelope.unpacked_data(recursive=False)
+ assert isinstance(text_envelope, TextEnvelope)
+ assert text_envelope.get_subparam() == "param"
+ assert text_envelope.unpacked_data() == "param"
+ hex_envelope.set_subparam(value="asdf")
+ assert hex_envelope.get_subparam() == "asdf"
+ assert hex_envelope.unpacked_data() == "asdf"
+ assert text_envelope.get_subparam() == "asdf"
+ assert text_envelope.unpacked_data() == "asdf"
+ assert base64_envelope.get_subparam() == "asdf"
+ assert base64_envelope.unpacked_data() == "asdf"
+
+ # URL-encoded text
+ url_encoded_envelope = BaseEnvelope.detect("a%20b%20c")
+ assert isinstance(url_encoded_envelope, URLEnvelope)
+ assert url_encoded_envelope.pack() == "a%20b%20c"
+ assert url_encoded_envelope.unpacked_data() == "a b c"
+ url_inner_envelope = url_encoded_envelope.unpacked_data(recursive=False)
+ assert isinstance(url_inner_envelope, TextEnvelope)
+ assert url_inner_envelope.unpacked_data(recursive=False) == "a b c"
+ assert url_inner_envelope.unpacked_data(recursive=True) == "a b c"
+ assert list(url_encoded_envelope.get_subparams(recursive=False)) == [([], url_inner_envelope)]
+ assert list(url_encoded_envelope.get_subparams(recursive=True)) == [([], "a b c")]
+ assert url_inner_envelope.pack() == "a b c"
+ assert url_inner_envelope.unpacked_data() == "a b c"
+ assert url_inner_envelope.subparams == {"__default__": "a b c"}
+ expected_subparams = [([], "a b c")]
+ assert list(url_inner_envelope.get_subparams()) == expected_subparams
+ for subparam, value in expected_subparams:
+ assert url_inner_envelope.get_subparam(subparam) == value
+ assert url_encoded_envelope.num_envelopes == 1
+ url_encoded_envelope.set_subparam(value="a s d f")
+ assert url_encoded_envelope.get_subparam() == "a s d f"
+ assert url_encoded_envelope.unpacked_data() == "a s d f"
+ assert url_encoded_envelope.pack() == "a%20s%20d%20f"
+
+ # json
+ json_envelope = BaseEnvelope.detect('{"param1": "val1", "param2": {"param3": "val3"}}')
+ assert isinstance(json_envelope, JSONEnvelope)
+ assert json_envelope.pack() == '{"param1": "val1", "param2": {"param3": "val3"}}'
+ assert json_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}}
+ assert json_envelope.unpacked_data(recursive=False) == {"param1": "val1", "param2": {"param3": "val3"}}
+ assert json_envelope.unpacked_data(recursive=True) == {"param1": "val1", "param2": {"param3": "val3"}}
+ assert json_envelope.subparams == {"param1": "val1", "param2": {"param3": "val3"}}
+ expected_subparams = [
+ (["param1"], "val1"),
+ (["param2", "param3"], "val3"),
+ ]
+ assert list(json_envelope.get_subparams()) == expected_subparams
+ for subparam, value in expected_subparams:
+ assert json_envelope.get_subparam(subparam) == value
+ json_envelope.selected_subparam = ["param2", "param3"]
+ assert json_envelope.get_subparam() == "val3"
+ assert json_envelope.num_envelopes == 1
+
+ # xml
+ xml_envelope = BaseEnvelope.detect(
+ '
test
", status=200, @@ -1370,7 +1364,6 @@ async def setup_before_prep(self, module_test): ) def check(self, module_test, events): - found_first_cookie = False found_second_cookie = False found_third_cookie = False diff --git a/bbot/test/test_step_2/module_tests/test_module_hunt.py b/bbot/test/test_step_2/module_tests/test_module_hunt.py index 0ce8e93537..867a2565c6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_hunt.py +++ b/bbot/test/test_step_2/module_tests/test_module_hunt.py @@ -23,7 +23,6 @@ def check(self, module_test, events): class TestHunt_Multiple(TestHunt): - async def setup_after_prep(self, module_test): expect_args = {"method": "GET", "uri": "/"} respond_args = {"response_data": 'ping'} diff --git a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py index a74ea74f56..4ff3a478c2 100644 --- a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py +++ b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py @@ -34,7 +34,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def request_handler(self, request): - qs = str(request.query_string.decode()) if "filename=" in qs: value = qs.split("=")[1] @@ -52,11 +51,9 @@ def request_handler(self, request): return Response("file not found", status=500) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: - if e.type == "WEB_PARAMETER": if "HTTP Extracted Parameter [filename]" in e.data["description"]: web_parameter_emitted = True @@ -74,7 +71,6 @@ def check(self, module_test, events): # Path Traversal Absolute path class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): - etc_passwd = """ root:x:0:0:root:/root:/bin/bash daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin @@ -87,7 +83,6 @@ class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): """ async def setup_after_prep(self, module_test): - expect_args = {"method": "GET", "uri": "/images", "query_string": "filename=/etc/passwd"} respond_args = {"response_data": self.etc_passwd} module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) @@ -104,7 +99,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: @@ -156,7 +150,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False ssti_finding_emitted = False for e in events: @@ -189,7 +182,6 @@ class Test_Lightfuzz_xss(ModuleTestBase): } def request_handler(self, request): - qs = str(request.query_string.decode()) parameter_block = """ @@ -219,7 +211,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False xss_finding_emitted = False for e in events: @@ -238,12 +229,8 @@ def check(self, module_test, events): # Base64 Envelope XSS Detection class Test_Lightfuzz_envelope_base64(Test_Lightfuzz_xss): def request_handler(self, request): - qs = str(request.query_string.decode()) - print("****") - print(qs) - parameter_block = """