From 0fef49ca5c3f10bdcb0f845dee3b8d34b7e275a8 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 27 Nov 2024 18:09:13 -0500 Subject: [PATCH 01/13] tests passing --- bbot/core/event/base.py | 407 ++---------------- bbot/core/helpers/misc.py | 12 + bbot/core/helpers/web/client.py | 1 - bbot/core/helpers/web/engine.py | 2 - bbot/core/helpers/web/envelopes.py | 352 +++++++++++++++ bbot/modules/base.py | 5 +- bbot/modules/internal/cloudcheck.py | 4 +- bbot/modules/internal/dnsresolve.py | 4 +- bbot/modules/internal/excavate.py | 23 +- bbot/modules/lightfuzz.py | 12 +- bbot/modules/lightfuzz_submodules/base.py | 38 +- bbot/modules/lightfuzz_submodules/cmdi.py | 3 - bbot/modules/lightfuzz_submodules/crypto.py | 9 +- bbot/modules/lightfuzz_submodules/path.py | 1 - bbot/modules/lightfuzz_submodules/sqli.py | 1 - bbot/modules/report/asn.py | 9 +- bbot/scanner/preset/args.py | 4 +- bbot/scanner/preset/preset.py | 2 +- bbot/test/test_step_1/test__module__tests.py | 1 - bbot/test/test_step_1/test_bbot_fastapi.py | 3 - bbot/test/test_step_1/test_bloom_filter.py | 1 - bbot/test/test_step_1/test_dns.py | 3 - bbot/test/test_step_1/test_engine.py | 2 - bbot/test/test_step_1/test_events.py | 2 - bbot/test/test_step_1/test_helpers.py | 1 - bbot/test/test_step_1/test_presets.py | 7 +- bbot/test/test_step_1/test_target.py | 1 - bbot/test/test_step_1/test_web.py | 2 - bbot/test/test_step_1/test_web_envelopes.py | 339 +++++++++++++++ .../module_tests/test_module_baddns_direct.py | 6 +- .../module_tests/test_module_excavate.py | 11 - .../module_tests/test_module_gowitness.py | 4 +- .../module_tests/test_module_hunt.py | 1 - .../module_tests/test_module_lightfuzz.py | 322 ++++++-------- .../module_tests/test_module_ntlm.py | 3 +- .../test_module_reflected_parameters.py | 2 - .../module_tests/test_module_speculate.py | 4 +- 37 files changed, 930 insertions(+), 674 deletions(-) create mode 100644 bbot/core/helpers/web/envelopes.py create mode 100644 bbot/test/test_step_1/test_web_envelopes.py diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index d3cf2c2ba4..fc2a35ea2d 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -1,24 +1,20 @@ -from collections import defaultdict import io import re import uuid import json import base64 -import asyncio import logging import tarfile -import binascii import datetime import ipaddress import traceback -import xml.etree.ElementTree as ET -from copy import copy, deepcopy from pathlib import Path from typing import Optional +from copy import copy, deepcopy from contextlib import suppress from radixtarget import RadixTarget -from urllib.parse import urljoin, parse_qs, unquote, quote +from urllib.parse import urljoin, parse_qs from pydantic import BaseModel, field_validator @@ -44,6 +40,7 @@ validators, get_file_extension, ) +from bbot.core.helpers.web.envelopes import BaseEnvelope log = logging.getLogger("bbot.core.event") @@ -593,6 +590,10 @@ def parent(self, parent): elif not self._dummy: log.warning(f"Tried to set invalid parent on {self}: (got: {parent})") + @property + def children(self): + return [] + @property def parent_id(self): parent_id = getattr(self.get_parent(), "id", None) @@ -648,15 +649,10 @@ def get_parents(self, omit=False, include_self=False): return parents def clone(self): - # Create a shallow copy of the event first cloned_event = copy(self) - - # Handle attributes that need deep copying manually - setattr(cloned_event, "envelopes", deepcopy(self.envelopes)) - # Re-assign a new UUID - cloned_event.uuid = uuid.uuid4() + cloned_event._uuid = uuid.uuid4() return cloned_event def _host(self): @@ -1329,382 +1325,39 @@ class URL_HINT(URL_UNVERIFIED): class WEB_PARAMETER(DictHostEvent): - @property - def uuid(self): - return self._uuid - - @uuid.setter - def uuid(self, value): - self._uuid = value - - class ParameterEnvelopes: - - @staticmethod - def preprocess_base64(base64_str): - return base64.b64decode(base64_str).decode() - - @staticmethod - def postprocess_base64(string): - return base64.b64encode(string.encode()).decode() - - @staticmethod - def preprocess_hex(hex_str): - return bytes.fromhex(hex_str).decode() - - @staticmethod - def postprocess_hex(string): - return string.encode().hex() - - @staticmethod - def preprocess_urlencoded(url_encoded_str): - return unquote(url_encoded_str) - - @staticmethod - def postprocess_urlencoded(string): - return quote(string) - - @staticmethod - def is_ascii_printable(s): - return all(32 <= ord(char) < 127 for char in s) - - # Converts XML ElementTree to a JSON-like dictionary - def xml_to_dict(self, elem): - """ - Convert XML ElementTree to a dictionary recursively. - """ - d = {elem.tag: {} if elem.attrib else None} - children = list(elem) - if children: - dd = defaultdict(list) - for dc in map(self.xml_to_dict, children): - for k, v in dc.items(): - dd[k].append(v) - d = {elem.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}} - if elem.attrib: - d[elem.tag].update(("@" + k, v) for k, v in elem.attrib.items()) - if elem.text: - text = elem.text.strip() - if children or elem.attrib: - if text: - d[elem.tag]["#text"] = text - else: - d[elem.tag] = text - return d - - def dict_to_xml(self, d): - """ - Converts a dictionary to an XML string without adding an extra root node. - Assumes the dictionary was originally an XML structure. - """ - if not isinstance(d, dict) or len(d) != 1: - raise ValueError("Expected a dictionary with a single root element.") - - # Get the root element directly from the dict keys - root_tag = list(d.keys())[0] - root_element = ET.Element(root_tag) - - # Recursive function to handle nested dicts - def _build_tree(element, subdict): - for key, value in subdict.items(): - if isinstance(value, dict): - # Nested element - sub_element = ET.SubElement(element, key) - _build_tree(sub_element, value) - else: - # Leaf element - sub_element = ET.SubElement(element, key) - sub_element.text = str(value) - - # Start building the tree - _build_tree(root_element, d[root_tag]) - - return ET.tostring(root_element, encoding="utf-8").decode("utf-8") - - preprocess_map = { - "base64": preprocess_base64, - "hex": preprocess_hex, - "url-encoded": preprocess_urlencoded, - } - postprocess_map = { - "base64": postprocess_base64, - "hex": postprocess_hex, - "url-encoded": postprocess_urlencoded, - } - - # Format-specific functions for isolating and updating parameters - format_isolate_map = { - "json": lambda self: self.isolate_parameter(), - "xml": lambda self: self.isolate_parameter(), - } - format_update_map = { - "json": lambda self, value: self.update_json_parameter(value), - "xml": lambda self, value: self.update_xml_parameter(value), # Placeholder - } - - def initialize_value(self, value=None): - self.envelopes, end_format_dict = self.recurse_envelopes(value) - if self.envelopes: - log.debug(f"Discovered the following envelopes: [{','.join(self.envelopes)}]") - - if end_format_dict is not None: - self.end_format_type = list(end_format_dict.keys())[0] - log.debug(f"Identified the following end format: [{self.end_format_type}]") - self.end_format_data = list(end_format_dict.values())[0] - else: - self.end_format_type = None - self.end_format_data = None - self.end_format_subparameter = None - - def remove_envelopes(self, value): - """ - Remove envelopes from the value, processing each envelope in the order it was applied. - If the final format is present, trigger the appropriate handler (e.g., for JSON). - """ - # Apply the preprocess functions in the order the envelopes were applied - for env in self.envelopes: - func = self.preprocess_map.get(env) - if func: - # python3.9 compatibility hack - if isinstance(func, staticmethod): - func = func.__get__(None, self.__class__) # Unwrap staticmethod - value = func(value) - - # Dynamically select the appropriate isolate function based on the final format - isolate_func = self.format_isolate_map.get(self.end_format_type) - if isolate_func: - return isolate_func(self) - return value - - def add_envelopes(self, value): - """ - Add envelopes back to the value, processing in reverse order. - If the final format is present, trigger the appropriate handler (e.g., for JSON). - """ - # Dynamically select the appropriate update function based on the final format - update_func = self.format_update_map.get(self.end_format_type) - if update_func: - # python3.9 compatibility hack - if isinstance(update_func, staticmethod): - update_func = update_func.__get__(None, self.__class__) - value = update_func(self, value) - - # Apply the envelopes in reverse order - for env in self.envelopes[::-1]: - func = self.postprocess_map.get(env) - if func: - # python3.9 compatibility hack - if isinstance(func, staticmethod): - func = func.__get__(None, self.__class__) - value = func(value) - return value - - def recurse_envelopes(self, value, envelopes=None, end_format=None): - if envelopes is None: - envelopes = [] - log.debug( - f"Starting envelope recurse with value: [{value}], current envelopes: [{', '.join(envelopes)}], current end format: {end_format}" - ) - - if value is None or value == "" or isinstance(value, int): - return envelopes, end_format - - # Try URL decoding - try: - decoded_url = unquote(value) - if decoded_url != value and self.is_ascii_printable(decoded_url): - envelopes.append("url-encoded") - envelopes, end_format_dict = self.recurse_envelopes(decoded_url, envelopes) - return envelopes, end_format_dict - except Exception: - pass # Not valid URL encoding - - # Try base64 decoding - try: - decoded_base64 = base64.b64decode(value).decode() - if self.is_ascii_printable(decoded_base64): - envelopes.append("base64") - envelopes, end_format_dict = self.recurse_envelopes(decoded_base64, envelopes) - return envelopes, end_format_dict - except (binascii.Error, UnicodeDecodeError, ValueError): - pass # Not valid base64 - - # Try hex decoding - try: - decoded_hex = bytes.fromhex(value).decode("utf-8") - if self.is_ascii_printable(decoded_hex): - envelopes.append("hex") - envelopes, end_format_dict = self.recurse_envelopes(decoded_hex, envelopes) - return envelopes, end_format_dict - except (ValueError, UnicodeDecodeError): - pass # Not valid hex - - # Try JSON parsing - try: - decoded_json = json.loads(value) - if isinstance(decoded_json, dict): - return envelopes, {"json": decoded_json} - except json.JSONDecodeError: - pass # Not valid JSON - - # Try XML parsing - try: - decoded_xml = ET.fromstring(value) - # Pass 'decoded_xml' to 'xml_to_dict' - xml_dict = self.xml_to_dict(decoded_xml) # Pass decoded XML as the 'elem' argument - return envelopes, {"xml": xml_dict} # Store as JSON-like dict, not XML - except ET.ParseError: - pass # Not valid XML - - return envelopes, end_format - - def isolate_parameter(self): - """ - Isolate the specified subparameter from the data structure (JSON/XML). - The subparameter is accessed using dot notation for nested keys. - """ - if self.end_format_data and self.end_format_subparameter: - # Split the dot notation string into keys - keys = self.end_format_subparameter.split(".") - - # Traverse the nested structure using the keys - subparameter_value = self.end_format_data - for key in keys: - if isinstance(subparameter_value, dict): - subparameter_value = subparameter_value.get(key) - else: - # If the structure is broken (not a dict), return None - return None - - return subparameter_value - - return None - - def update_json_parameter(self, new_value): - """ - Update the specified subparameter in the JSON structure and rebuild it. - """ - # Work with a copy to avoid modifying the original `end_format_data` - end_format_data_copy = deepcopy(self.end_format_data) - - if end_format_data_copy: - end_format_data_copy[self.end_format_subparameter] = new_value - return json.dumps(end_format_data_copy) - return new_value - - def update_xml_parameter(self, new_value): - """ - Convert the JSON-like structure back into an XML string after updating the specific parameter. - """ - if self.end_format_data and self.end_format_subparameter: - # Split the dot notation into keys - keys = self.end_format_subparameter.split(".") - - # Traverse the nested dictionary using the keys to find the target subparameter - current_data = self.end_format_data - for key in keys[:-1]: # Traverse up to the second-to-last key - current_data = current_data.get(key, {}) - - # Update the target subparameter with the new value - if isinstance(current_data, dict): - current_data[keys[-1]] = new_value - - # Convert the JSON-like dict back to an XML string - return self.dict_to_xml(self.end_format_data) - - return new_value - - def to_dict(self): - return { - "envelopes": self.envelopes, - "end_format_type": self.end_format_type, - "end_format_data": self.end_format_data, - "end_format_subparameter": self.end_format_subparameter, - } - - def __getstate__(self): - return self.to_dict() - - def __str__(self): - return f"ParameterEnvelopes(envelopes={self.envelopes}, end_format_type={self.end_format_type}, end_format_data={self.end_format_data}, end_format_subparameter={self.end_format_subparameter})" - - __repr__ = __str__ - - @classmethod - def from_dict(cls, data): - instance = cls() - instance.envelopes = data.get("envelopes", []) - instance.end_format_type = data.get("end_format_type") - instance.end_format_data = data.get("end_format_data") - instance.end_format_subparameter = data.get("end_format_subparameter") - return instance + def children(self): + # if we have any subparams, raise a new WEB_PARAMETER for each one + children = [] + envelopes = getattr(self, "envelopes", None) + if envelopes is not None: + subparams = list(self.envelopes.get_subparams()) + + if envelopes.selected_subparam is None: + for subparam, _ in subparams: + clone = self.clone() + clone.envelopes = deepcopy(envelopes) + clone.envelopes.selected_subparam = subparam + clone.parent = self + children.append(clone) + return children def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - if "original_value" in self.data.keys(): - parameterEnvelope_instance = self.ParameterEnvelopes() - parameterEnvelope_instance.initialize_value(self.data["original_value"]) - setattr(self, "envelopes", parameterEnvelope_instance) - - envelopes = getattr(self, "envelopes", None) - if ( - envelopes is not None - and getattr(envelopes, "end_format_type", None) is not None - and getattr(envelopes, "end_format_data", None) - ): - end_format_data = envelopes.end_format_data - - def extract_keys_with_values(data, parent_key=""): - """ - Recursively extract all keys from nested dictionaries that have values (non-empty). - Construct a path-like structure with dot notation (e.g., 'find.search'). - """ - keys = [] - if isinstance(data, dict): - for key, value in data.items(): - # Construct the full key path using dot notation - full_key = f"{parent_key}.{key}" if parent_key else key - - # Only add keys that have non-empty values - if value: - if isinstance(value, dict): - # Recursively check nested dictionaries - keys.extend(extract_keys_with_values(value, full_key)) - else: - # Add the key if it has a non-empty value - keys.append(full_key) - return keys - - # Extract all keys that have non-empty values - end_format_data_keys = extract_keys_with_values(end_format_data) - # If there are keys, assign the first key to end_format_subparameter - if end_format_data_keys: - - # Assign the first key to end_format_subparameter - setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) - setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) - - # Iterate through the remaining keys, starting from the second one - for p in end_format_data_keys[1:]: - log.debug(f"generating copy of event for subparameter {p} of type {envelopes.end_format_type}") - - # Make a copy of the current event data - cloned_event = self.clone() - cloned_envelopes = getattr(cloned_event, "envelopes") - cloned_envelopes.end_format_subparameter = p - asyncio.run_coroutine_threadsafe( - self.module.emit_event(cloned_event), asyncio.get_event_loop() - ) + original_value = self.data.get("original_value", None) + if original_value is not None: + envelopes = BaseEnvelope.detect(original_value) + setattr(self, "envelopes", envelopes) def _data_id(self): # dedupe by url:name:param_type url = self.data.get("url", "") name = self.data.get("name", "") param_type = self.data.get("type", "") - envelopes = getattr(self, "envelopes", None) - subparameter = getattr(envelopes, "end_format_subparameter", "") if envelopes else "" + envelopes = getattr(self, "envelopes", "") + subparam = getattr(envelopes, "selected_subparam", "") - return f"{url}:{name}:{param_type}:{subparameter}" + return f"{url}:{name}:{param_type}:{subparam}" def _outgoing_dedup_hash(self, event): return hash( diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index 7c6c8a0738..73bd45f305 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -2867,3 +2867,15 @@ def clean_requirement(req_string): dist = distribution("bbot") return [clean_requirement(r) for r in dist.requires] + + +printable_chars = set(string.printable) + + +def is_printable(s): + """ + Check if a string is printable + """ + if not isinstance(s, str): + raise ValueError(f"Expected a string, got {type(s)}") + return set(s) <= printable_chars diff --git a/bbot/core/helpers/web/client.py b/bbot/core/helpers/web/client.py index 83154e5aec..49ddf532be 100644 --- a/bbot/core/helpers/web/client.py +++ b/bbot/core/helpers/web/client.py @@ -85,7 +85,6 @@ def __init__(self, *args, **kwargs): self._cookies = DummyCookies() def build_request(self, *args, **kwargs): - if args: url = args[0] kwargs["url"] = url diff --git a/bbot/core/helpers/web/engine.py b/bbot/core/helpers/web/engine.py index e0f63cb052..8ffdbe966f 100644 --- a/bbot/core/helpers/web/engine.py +++ b/bbot/core/helpers/web/engine.py @@ -50,7 +50,6 @@ def AsyncClient(self, *args, **kwargs): return client async def request(self, *args, **kwargs): - raise_error = kwargs.pop("raise_error", False) # TODO: use this cache_for = kwargs.pop("cache_for", None) # noqa @@ -75,7 +74,6 @@ async def request(self, *args, **kwargs): client_kwargs = {} for k in list(kwargs): if k in self.client_only_options: - v = kwargs.pop(k) client_kwargs[k] = v diff --git a/bbot/core/helpers/web/envelopes.py b/bbot/core/helpers/web/envelopes.py new file mode 100644 index 0000000000..e84be51ba5 --- /dev/null +++ b/bbot/core/helpers/web/envelopes.py @@ -0,0 +1,352 @@ +import json +import base64 +import binascii +import xmltodict +from contextlib import suppress +from urllib.parse import unquote, quote +from xml.parsers.expat import ExpatError + +from bbot.core.helpers.misc import is_printable + + +# TODO: This logic is perfect for extracting params. We should expand it outwards to other higher-level envelopes: +# - QueryStringEnvelope +# - MultipartFormEnvelope +# - HeaderEnvelope +# - CookieEnvelope +# +# Once we start ingesting HTTP_REQUEST events, this will make them instantly fuzzable + + +class EnvelopeChildTracker(type): + """ + Keeps track of all the child envelope classes + """ + + children = [] + + def __new__(mcs, name, bases, class_dict): + # Create the class + cls = super().__new__(mcs, name, bases, class_dict) + # Don't register the base class itself + if bases and not name.startswith("Base"): # Only register if it has base classes (i.e., is a child) + EnvelopeChildTracker.children.append(cls) + EnvelopeChildTracker.children.sort(key=lambda x: x.priority) + return cls + + +class BaseEnvelope(metaclass=EnvelopeChildTracker): + __slots__ = ["subparams", "selected_subparam", "singleton"] + + # determines the order of the envelope detection + priority = 5 + # whether the envelope is the final format, e.g. raw text/binary + end_format = False + ignore_exceptions = (Exception,) + envelope_classes = EnvelopeChildTracker.children + # transparent envelopes (i.e. TextEnvelope) are not counted as envelopes or included in the finding descriptions + transparent = False + + def __init__(self, s): + unpacked_data = self.unpack(s) + + if self.end_format: + inner_envelope = unpacked_data + else: + inner_envelope = self.detect(unpacked_data) + + self.selected_subparam = None + # if we have subparams, our inner envelope will be a dictionary + if isinstance(inner_envelope, dict): + self.subparams = inner_envelope + self.singleton = False + # otherwise if we just have one value, we make a dictionary with a default key + else: + self.subparams = {"__default__": inner_envelope} + self.singleton = True + + @property + def final_envelope(self): + try: + return self.unpacked_data(recursive=False).final_envelope + except AttributeError: + return self + + def pack(self, data=None): + if data is None: + data = self.unpacked_data(recursive=False) + with suppress(AttributeError): + data = data.pack() + return self._pack(data) + + def unpack(self, s): + return self._unpack(s) + + def _pack(self, s): + """ + Encodes the string using the class's unique encoder (adds the outer envelope) + """ + raise NotImplementedError("Envelope.pack() must be implemented") + + def _unpack(self, s): + """ + Decodes the string using the class's unique encoder (removes the outer envelope) + """ + raise NotImplementedError("Envelope.unpack() must be implemented") + + def unpacked_data(self, recursive=True): + try: + unpacked = self.subparams["__default__"] + if recursive: + with suppress(AttributeError): + return unpacked.unpacked_data(recursive=recursive) + return unpacked + except KeyError: + return self.subparams + + @classmethod + def detect(cls, s): + """ + Detects the type of envelope used to encode the packed_data + """ + if not isinstance(s, str): + raise ValueError(f"Invalid data passed to detect(): {s} ({type(s)})") + # if the value is empty, we just return the text envelope + if not s.strip(): + return TextEnvelope(s) + for envelope_class in cls.envelope_classes: + with suppress(*envelope_class.ignore_exceptions): + envelope = envelope_class(s) + if envelope is not False: + return envelope + del envelope + raise Exception("No envelope detected") + + def get_subparams(self, key=None, data=None, recursive=True): + if data is None: + data = self.unpacked_data(recursive=recursive) + if key is None: + key = [] + + if isinstance(data, dict): + for k, v in data.items(): + full_key = key + [k] + if isinstance(v, dict): + yield from self.get_subparams(full_key, v) + else: + yield full_key, v + else: + yield [], data + + def get_subparam(self, key=None, recursive=True): + if key is None: + key = self.selected_subparam + envelope = self + if recursive: + envelope = self.final_envelope + data = envelope.unpacked_data(recursive=False) + if key is None: + if envelope.singleton: + key = [] + else: + raise ValueError("No subparam selected") + else: + for segment in key: + data = data[segment] + return data + + def set_subparam(self, key=None, value=None, recursive=True): + envelope = self + if recursive: + envelope = self.final_envelope + + # if there's only one value to set, we can just set it directly + if envelope.singleton: + envelope.subparams["__default__"] = value + return + + # if key isn't specified, use the selected subparam + if key is None: + key = self.selected_subparam + if key is None: + raise ValueError(f"{self} -> {envelope}: No subparam selected") + + data = envelope.unpacked_data(recursive=False) + for segment in key[:-1]: + data = data[segment] + data[key[-1]] = value + + @property + def name(self): + return self.__class__.__name__ + + @property + def num_envelopes(self): + num_envelopes = 0 if self.transparent else 1 + if self.end_format: + return num_envelopes + for envelope in self.subparams.values(): + with suppress(AttributeError): + num_envelopes += envelope.num_envelopes + return num_envelopes + + @property + def summary(self): + if self.transparent: + return "" + self_string = f"{self.name}" + if self.selected_subparam: + self_string += f" [{self.selected_subparam}]" + with suppress(AttributeError): + child_envelope = self.unpacked_data(recursive=False) + child_summary = child_envelope.summary + if child_summary: + self_string += f" -> {child_summary}" + return self_string + + +class HexEnvelope(BaseEnvelope): + """ + Hexadecimal encoding + """ + + ignore_exceptions = (ValueError, UnicodeDecodeError) + + def _pack(self, s): + return s.encode().hex() + + def _unpack(self, s): + return bytes.fromhex(s).decode() + + +class B64Envelope(BaseEnvelope): + """ + Base64 encoding + """ + + ignore_exceptions = (binascii.Error, UnicodeDecodeError, ValueError) + + def unpack(self, s): + # it's easy to have a small value that accidentally decodes to base64 + if len(s) < 8 and not s.endswith("="): + raise ValueError("Data is too small to be sure") + return super().unpack(s) + + def _pack(self, s): + return base64.b64encode(s.encode()).decode() + + def _unpack(self, s): + return base64.b64decode(s).decode() + + +class URLEnvelope(BaseEnvelope): + """ + URL encoding + """ + + def unpack(self, s): + unpacked = super().unpack(s) + if unpacked == s: + raise Exception("Data is not URL-encoded") + return unpacked + + def _pack(self, s): + return quote(s) + + def _unpack(self, s): + return unquote(s) + + +class TextEnvelope(BaseEnvelope): + """ + Text encoding + """ + + end_format = True + # lowest priority means text is the ultimate fallback + priority = 10 + transparent = True + + def _pack(self, s): + return s + + def _unpack(self, s): + if not is_printable(s): + raise Exception("Non-printable data detected in TextEnvelope") + return s + + +# class BinaryEnvelope(BaseEnvelope): +# """ +# Binary encoding +# """ +# end_format = True + +# def pack(self, s): +# return s + +# def unpack(self, s): +# if is_printable(s): +# raise Exception("Non-binary data detected in BinaryEnvelope") +# return s + + +class BaseSubparamEnvelope(BaseEnvelope): + """ + An envelope that contains subparameters + """ + + end_format = True + + # def get_subparam(self, dot_key=None): + # if dot_key is None: + # dot_key = self.subparam + # data = self.unpacked_data + # for key in dot_key.split("."): + # data = data[key] + # return data + + # def values(self): + # """ + # Returns the values of the unpacked data + + # { + # "key1": "value1", + # "key2": "value2" + # } + # """ + # if isinstance(self.unpacked_data, dict): + # return self.unpacked_data.values() + # return [self.unpacked_data] + + +class JSONEnvelope(BaseEnvelope): + """ + JSON encoding + """ + + end_format = True + priority = 8 + ignore_exceptions = (json.JSONDecodeError,) + + def _pack(self, s): + return json.dumps(s) + + def _unpack(self, s): + return json.loads(s) + + +class XMLEnvelope(BaseEnvelope): + """ + XML encoding + """ + + end_format = True + priority = 9 + ignore_exceptions = (ExpatError,) + + def _pack(self, s): + return xmltodict.unparse(s) + + def _unpack(self, s): + return xmltodict.parse(s) diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 1fa151c33b..4ab2da1528 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -528,8 +528,9 @@ async def emit_event(self, *args, **kwargs): if v is not None: emit_kwargs[o] = v event = self.make_event(*args, **event_kwargs) - if event: - await self.queue_outgoing_event(event, **emit_kwargs) + children = event.children + for e in [event] + children: + await self.queue_outgoing_event(e, **emit_kwargs) return event async def _events_waiting(self, batch_size=None): diff --git a/bbot/modules/internal/cloudcheck.py b/bbot/modules/internal/cloudcheck.py index 685d67f9d7..86b6130d71 100644 --- a/bbot/modules/internal/cloudcheck.py +++ b/bbot/modules/internal/cloudcheck.py @@ -57,7 +57,9 @@ async def handle_event(self, event, **kwargs): for provider in self.helpers.cloud.providers.values(): provider_name = provider.name.lower() base_kwargs = { - "parent": event, "tags": [f"{provider.provider_type}-{provider_name}"], "_provider": provider_name + "parent": event, + "tags": [f"{provider.provider_type}-{provider_name}"], + "_provider": provider_name, } # loop through the provider's regex signatures, if any for event_type, sigs in provider.signatures.items(): diff --git a/bbot/modules/internal/dnsresolve.py b/bbot/modules/internal/dnsresolve.py index 15facec564..bdca0ea5c3 100644 --- a/bbot/modules/internal/dnsresolve.py +++ b/bbot/modules/internal/dnsresolve.py @@ -306,9 +306,7 @@ def get_dns_parent(self, event): @property def emit_raw_records(self): if self._emit_raw_records is None: - watching_raw_records = any( - "RAW_DNS_RECORD" in m.get_watched_events() for m in self.scan.modules.values() - ) + watching_raw_records = any("RAW_DNS_RECORD" in m.get_watched_events() for m in self.scan.modules.values()) omitted_event_types = self.scan.config.get("omit_event_types", []) omit_raw_records = "RAW_DNS_RECORD" in omitted_event_types self._emit_raw_records = watching_raw_records or not omit_raw_records diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index d9da92a088..78b19c8575 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -460,10 +460,8 @@ def extract(self): # check to see if the format is defined as JSON if "content_type" in extracted_values.keys(): if extracted_values["content_type"] == "application/json": - # If we cant figure out the parameter names, there is no point in continuing if "data" in extracted_values.keys(): - if "url" in extracted_values.keys(): form_url = extracted_values["url"] else: @@ -481,8 +479,12 @@ def extract(self): form_parameters[p] = None for parameter_name in form_parameters: - yield "BODYJSON", parameter_name, None, form_url, _exclude_key( - form_parameters, parameter_name + yield ( + "BODYJSON", + parameter_name, + None, + form_url, + _exclude_key(form_parameters, parameter_name), ) class GetForm(ParameterExtractorRule): @@ -501,7 +503,6 @@ class GetForm(ParameterExtractorRule): def extract(self): forms = self.extraction_regex.findall(str(self.result)) for form_action, form_content in forms: - if not form_action or form_action == "#": form_action = None @@ -512,7 +513,6 @@ def extract(self): for form_content_regex_name, form_content_regex in self.form_content_regexes.items(): input_tags = form_content_regex.findall(form_content) if input_tags: - if form_content_regex_name == "input_tag_novalue_regex": form_parameters[input_tags[0]] = None @@ -525,11 +525,11 @@ def extract(self): for parameter_name, original_value in form_parameters.items(): yield ( - self.output_type, - parameter_name, - original_value, + self.output_type, + parameter_name, + original_value, form_action, - _exclude_key(form_parameters, parameter_name), + _exclude_key(form_parameters, parameter_name), ) class GetForm2(GetForm): @@ -993,6 +993,8 @@ async def setup(self): return True async def search(self, data, event, content_type, discovery_context="HTTP response"): + # TODO: replace this JSON/XML extraction with our lightfuzz envelope stuff + if not data: return None decoded_data = await self.helpers.re.recursive_decode(data) @@ -1084,7 +1086,6 @@ async def handle_event(self, event): # If parameter_extraction is enabled and we assigned custom headers, emit them as WEB_PARAMETER if self.parameter_extraction == True: - custom_cookies = self.scan.web_config.get("http_cookies", {}) for custom_cookie_name, custom_cookie_value in custom_cookies.items(): description = f"HTTP Extracted Parameter [{custom_cookie_name}] (Custom Cookie)" diff --git a/bbot/modules/lightfuzz.py b/bbot/modules/lightfuzz.py index b1fcd19b27..a28564ab83 100644 --- a/bbot/modules/lightfuzz.py +++ b/bbot/modules/lightfuzz.py @@ -130,11 +130,8 @@ async def run_submodule(self, submodule, event): event_data = {"host": str(event.host), "url": event.data["url"], "description": r["description"]} envelopes = getattr(event, "envelopes", None) - if envelopes and envelopes.envelopes: - envelope_summary = f'[{"->".join(envelopes.envelopes)}]' - if envelopes.end_format_type: - envelope_summary += f" Format: [{envelopes.end_format_type}] with subparameter [{envelopes.end_format_subparameter}])" - + envelope_summary = getattr(envelopes, "summary", None) + if envelope_summary: # Append the envelope summary to the description event_data["description"] += f" Envelopes: {envelope_summary}" @@ -147,10 +144,8 @@ async def run_submodule(self, submodule, event): ) async def handle_event(self, event): - if event.type == "URL": if self.config.get("force_common_headers", False) == False: - return False for h in self.common_headers: @@ -166,7 +161,6 @@ async def handle_event(self, event): await self.emit_event(data, "WEB_PARAMETER", event) elif event.type == "WEB_PARAMETER": - # check connectivity to url connectivity_test = await self.helpers.request(event.data["url"], timeout=10) @@ -199,5 +193,5 @@ async def finish(self): async def filter_event(self, event): if event.type == "WEB_PARAMETER" and self.disable_post and event.data["type"] == "POSTPARAM": - return False, "POST parameter disabled in lilghtfuzz module" + return False, "POST parameter disabled in lightfuzz module" return True diff --git a/bbot/modules/lightfuzz_submodules/base.py b/bbot/modules/lightfuzz_submodules/base.py index efe8f8e76b..e031d1742d 100644 --- a/bbot/modules/lightfuzz_submodules/base.py +++ b/bbot/modules/lightfuzz_submodules/base.py @@ -104,7 +104,6 @@ async def compare_probe( additional_params_override={}, speculative_mode="GETPARAM", ): - probe = self.probe_value_outgoing(probe) additional_params = copy.deepcopy(self.event.data.get("additional_params", {})) if additional_params_override: @@ -150,7 +149,6 @@ async def standard_probe( additional_params_populate_empty=False, speculative_mode="GETPARAM", ): - probe = self.probe_value_outgoing(probe) if event_type == "SPECULATIVE": @@ -209,7 +207,6 @@ async def standard_probe( ) def metadata(self): - metadata_string = f"Parameter: [{self.event.data['name']}] Parameter Type: [{self.event.data['type']}]" if self.event.data["original_value"] != "" and self.event.data["original_value"] != None: metadata_string += ( @@ -218,20 +215,31 @@ def metadata(self): return metadata_string def probe_value_incoming(self, populate_empty=True): - probe_value = self.event.data.get("original_value", "") - if (probe_value is None or len(str(probe_value)) == 0) and populate_empty == True: - probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True) - self.lightfuzz.debug(f"probe_value_incoming (before modification): {probe_value}") - envelopes_instance = getattr(self.event, "envelopes", None) - probe_value = envelopes_instance.remove_envelopes(probe_value) - self.lightfuzz.debug(f"probe_value_incoming (after modification): {probe_value}") + envelopes = getattr(self.event, "envelopes", None) + probe_value = "" + if envelopes is not None: + probe_value = envelopes.get_subparam() + if not probe_value: + if populate_empty is True: + probe_value = self.lightfuzz.helpers.rand_string(10, numeric_only=True) + else: + probe_value = "" + self.lightfuzz.debug(f"probe_value_incoming (before unpacking): {probe_value}") + if envelopes is not None: + envelopes.set_subparam(value=probe_value) + probe_value = envelopes.pack() + self.lightfuzz.debug(f"probe_value_incoming (after unpacking): {probe_value}") if not isinstance(probe_value, str): - probe_value = str(probe_value) + raise ValueError( + f"probe_value_incoming should always be a string (got {type(probe_value)} / {probe_value})" + ) return probe_value def probe_value_outgoing(self, outgoing_probe_value): - self.lightfuzz.debug(f"probe_value_outgoing (before modification): {outgoing_probe_value}") - envelopes_instance = getattr(self.event, "envelopes", None) - outgoing_probe_value = envelopes_instance.add_envelopes(outgoing_probe_value) - self.lightfuzz.debug(f"probe_value_outgoing (after modification): {outgoing_probe_value}") + self.lightfuzz.debug(f"probe_value_outgoing (before packing): {outgoing_probe_value} / {self.event}") + envelopes = getattr(self.event, "envelopes", None) + if envelopes is not None: + envelopes.set_subparam(value=outgoing_probe_value) + outgoing_probe_value = envelopes.pack() + self.lightfuzz.debug(f"probe_value_outgoing (after packing): {outgoing_probe_value} / {self.event}") return outgoing_probe_value diff --git a/bbot/modules/lightfuzz_submodules/cmdi.py b/bbot/modules/lightfuzz_submodules/cmdi.py index b9dbb27645..450697a8d1 100644 --- a/bbot/modules/lightfuzz_submodules/cmdi.py +++ b/bbot/modules/lightfuzz_submodules/cmdi.py @@ -5,9 +5,7 @@ class CmdILightfuzz(BaseLightfuzz): - async def fuzz(self): - cookies = self.event.data.get("assigned_cookies", {}) probe_value = self.probe_value_incoming() @@ -31,7 +29,6 @@ async def fuzz(self): echo_probe = urllib.parse.quote(echo_probe.encode(), safe="") cmdi_probe = await self.compare_probe(http_compare, self.event.data["type"], echo_probe, cookies) if cmdi_probe[3]: - if canary in cmdi_probe[3].text and "echo" not in cmdi_probe[3].text: self.lightfuzz.debug(f"canary [{canary}] found in response when sending probe [{p}]") if p == "AAAA": diff --git a/bbot/modules/lightfuzz_submodules/crypto.py b/bbot/modules/lightfuzz_submodules/crypto.py index 5635774bf3..b2d44e5aac 100644 --- a/bbot/modules/lightfuzz_submodules/crypto.py +++ b/bbot/modules/lightfuzz_submodules/crypto.py @@ -6,7 +6,6 @@ class CryptoLightfuzz(BaseLightfuzz): - @staticmethod def is_hex(s): try: @@ -21,7 +20,6 @@ def is_base64(s): if base64.b64encode(base64.b64decode(s)).decode() == s: return True except Exception: - return False return False @@ -75,7 +73,6 @@ def format_agnostic_encode(data, encoding, urlencode=False): @staticmethod def modify_string(input_string, action="truncate", position=None, extension_length=1): - if not isinstance(input_string, str): input_string = str(input_string) @@ -136,7 +133,7 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook paddingblock = b"\x00" * block_size datablock = original_data[-block_size:] if possible_first_byte: - baseline_byte = b"\xFF" + baseline_byte = b"\xff" starting_pos = 0 else: baseline_byte = b"\x00" @@ -148,7 +145,6 @@ async def padding_oracle_execute(self, original_data, encoding, block_size, cook ) differ_count = 0 for i in range(starting_pos, starting_pos + 254): - byte = bytes([i]) oracle_probe = await self.compare_probe( baseline, @@ -176,7 +172,6 @@ async def padding_oracle(self, probe_value, cookies): possible_block_sizes = self.possible_block_sizes(len(data)) for block_size in possible_block_sizes: - padding_oracle_result = await self.padding_oracle_execute(data, encoding, block_size, cookies) if padding_oracle_result == None: self.lightfuzz.debug( @@ -198,7 +193,6 @@ async def padding_oracle(self, probe_value, cookies): ) async def error_string_search(self, text_dict, baseline_text): - matching_techniques = set() matching_strings = set() @@ -311,7 +305,6 @@ async def fuzz(self): if confirmed_techniques or ( "padding" in truncate_probe[3].text.lower() or "padding" in mutate_probe[3].text.lower() ): - # Padding Oracle Test if possible_block_cipher: diff --git a/bbot/modules/lightfuzz_submodules/path.py b/bbot/modules/lightfuzz_submodules/path.py index c4043e1089..9ac9fa5c8d 100644 --- a/bbot/modules/lightfuzz_submodules/path.py +++ b/bbot/modules/lightfuzz_submodules/path.py @@ -6,7 +6,6 @@ class PathTraversalLightfuzz(BaseLightfuzz): - async def fuzz(self): cookies = self.event.data.get("assigned_cookies", {}) probe_value = self.probe_value_incoming(populate_empty=False) diff --git a/bbot/modules/lightfuzz_submodules/sqli.py b/bbot/modules/lightfuzz_submodules/sqli.py index 62853964f7..68fc9a1181 100644 --- a/bbot/modules/lightfuzz_submodules/sqli.py +++ b/bbot/modules/lightfuzz_submodules/sqli.py @@ -38,7 +38,6 @@ def evaluate_delay(self, mean_baseline, measured_delay): return False async def fuzz(self): - cookies = self.event.data.get("assigned_cookies", {}) probe_value = self.probe_value_incoming(populate_empty=True) http_compare = self.compare_baseline( diff --git a/bbot/modules/report/asn.py b/bbot/modules/report/asn.py index 771e4b4f7f..3b3c488d15 100644 --- a/bbot/modules/report/asn.py +++ b/bbot/modules/report/asn.py @@ -207,7 +207,14 @@ async def get_asn_bgpview(self, ip): return False asns_tried.add(asn) asns.append( - {"asn": asn, "subnet": subnet, "name": name, "description": description, "country": country, "emails": emails} + { + "asn": asn, + "subnet": subnet, + "name": name, + "description": description, + "country": country, + "emails": emails, + } ) if not asns: self.debug(f'No results for "{ip}"') diff --git a/bbot/scanner/preset/args.py b/bbot/scanner/preset/args.py index 8b3cb988ef..88219000ec 100644 --- a/bbot/scanner/preset/args.py +++ b/bbot/scanner/preset/args.py @@ -178,7 +178,9 @@ def preset_from_args(self): def create_parser(self, *args, **kwargs): kwargs.update( { - "description": "Bighuge BLS OSINT Tool", "formatter_class": argparse.RawTextHelpFormatter, "epilog": self.epilog + "description": "Bighuge BLS OSINT Tool", + "formatter_class": argparse.RawTextHelpFormatter, + "epilog": self.epilog, } ) p = argparse.ArgumentParser(*args, **kwargs) diff --git a/bbot/scanner/preset/preset.py b/bbot/scanner/preset/preset.py index 9e67f2c803..b275cc1f72 100644 --- a/bbot/scanner/preset/preset.py +++ b/bbot/scanner/preset/preset.py @@ -967,7 +967,7 @@ def presets_table(self, include_modules=True): header = ["Preset", "Category", "Description", "# Modules"] if include_modules: header.append("Modules") - for (loaded_preset, category, preset_path, original_file) in self.all_presets.values(): + for loaded_preset, category, preset_path, original_file in self.all_presets.values(): loaded_preset = loaded_preset.bake() num_modules = f"{len(loaded_preset.scan_modules):,}" row = [loaded_preset.name, category, loaded_preset.description, num_modules] diff --git a/bbot/test/test_step_1/test__module__tests.py b/bbot/test/test_step_1/test__module__tests.py index 791e58f58a..e50f67a910 100644 --- a/bbot/test/test_step_1/test__module__tests.py +++ b/bbot/test/test_step_1/test__module__tests.py @@ -15,7 +15,6 @@ def test__module__tests(): - preset = Preset() # make sure each module has a .py file diff --git a/bbot/test/test_step_1/test_bbot_fastapi.py b/bbot/test/test_step_1/test_bbot_fastapi.py index add7ad099a..1136963a3d 100644 --- a/bbot/test/test_step_1/test_bbot_fastapi.py +++ b/bbot/test/test_step_1/test_bbot_fastapi.py @@ -17,7 +17,6 @@ def run_bbot_multiprocess(queue): def test_bbot_multiprocess(bbot_httpserver): - bbot_httpserver.expect_request("/").respond_with_data("test@blacklanternsecurity.com") queue = multiprocessing.Queue() @@ -32,12 +31,10 @@ def test_bbot_multiprocess(bbot_httpserver): def test_bbot_fastapi(bbot_httpserver): - bbot_httpserver.expect_request("/").respond_with_data("test@blacklanternsecurity.com") fastapi_process = start_fastapi_server() try: - # wait for the server to start with a timeout of 60 seconds start_time = time.time() while True: diff --git a/bbot/test/test_step_1/test_bloom_filter.py b/bbot/test/test_step_1/test_bloom_filter.py index 22ec4db323..f954bfbc6e 100644 --- a/bbot/test/test_step_1/test_bloom_filter.py +++ b/bbot/test/test_step_1/test_bloom_filter.py @@ -6,7 +6,6 @@ @pytest.mark.asyncio async def test_bloom_filter(): - def generate_random_strings(n, length=10): """Generate a list of n random strings.""" return ["".join(random.choices(string.ascii_letters + string.digits, k=length)) for _ in range(n)] diff --git a/bbot/test/test_step_1/test_dns.py b/bbot/test/test_step_1/test_dns.py index dbbfe68d65..c032b44e48 100644 --- a/bbot/test/test_step_1/test_dns.py +++ b/bbot/test/test_step_1/test_dns.py @@ -185,7 +185,6 @@ async def test_dns_resolution(bbot_scanner): @pytest.mark.asyncio async def test_wildcards(bbot_scanner): - scan = bbot_scanner("1.1.1.1") helpers = scan.helpers @@ -634,7 +633,6 @@ def custom_lookup(query, rdtype): @pytest.mark.asyncio async def test_wildcard_deduplication(bbot_scanner): - custom_lookup = """ def custom_lookup(query, rdtype): if rdtype == "TXT" and query.strip(".").endswith("evilcorp.com"): @@ -670,7 +668,6 @@ async def handle_event(self, event): @pytest.mark.asyncio async def test_dns_raw_records(bbot_scanner): - from bbot.modules.base import BaseModule class DummyModule(BaseModule): diff --git a/bbot/test/test_step_1/test_engine.py b/bbot/test/test_step_1/test_engine.py index dbb21246f2..653c3dcd6c 100644 --- a/bbot/test/test_step_1/test_engine.py +++ b/bbot/test/test_step_1/test_engine.py @@ -14,7 +14,6 @@ async def test_engine(): return_errored = False class TestEngineServer(EngineServer): - CMDS = { 0: "return_thing", 1: "yield_stuff", @@ -54,7 +53,6 @@ async def yield_stuff(self, n): raise class TestEngineClient(EngineClient): - SERVER_CLASS = TestEngineServer async def return_thing(self, n): diff --git a/bbot/test/test_step_1/test_events.py b/bbot/test/test_step_1/test_events.py index 39be4d704b..195f08ea89 100644 --- a/bbot/test/test_step_1/test_events.py +++ b/bbot/test/test_step_1/test_events.py @@ -9,7 +9,6 @@ @pytest.mark.asyncio async def test_events(events, helpers): - scan = Scanner() await scan._prep() @@ -617,7 +616,6 @@ async def test_events(events, helpers): @pytest.mark.asyncio async def test_event_discovery_context(): - from bbot.modules.base import BaseModule scan = Scanner("evilcorp.com") diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index 16b0dc9ec5..2eb67cd13d 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -857,7 +857,6 @@ def test_liststring_invalidfnchars(helpers): # test parameter validation @pytest.mark.asyncio async def test_parameter_validation(helpers): - getparam_valid_params = { "name", "age", diff --git a/bbot/test/test_step_1/test_presets.py b/bbot/test/test_step_1/test_presets.py index 73fdcf23a5..5b1564f12c 100644 --- a/bbot/test/test_step_1/test_presets.py +++ b/bbot/test/test_step_1/test_presets.py @@ -16,7 +16,7 @@ def test_preset_descriptions(): # ensure very preset has a description preset = Preset() - for (loaded_preset, category, preset_path, original_filename) in preset.all_presets.values(): + for loaded_preset, category, preset_path, original_filename in preset.all_presets.values(): assert ( loaded_preset.description ), f'Preset "{loaded_preset.name}" at {original_filename} does not have a description.' @@ -68,7 +68,6 @@ def test_core(): def test_preset_yaml(clean_default_config): - import yaml preset1 = Preset( @@ -171,7 +170,6 @@ def test_preset_cache(): def test_preset_scope(): - # test target merging scan = Scanner("1.2.3.4", preset=Preset.from_dict({"target": ["evilcorp.com"]})) assert {str(h) for h in scan.preset.target.seeds.hosts} == {"1.2.3.4/32", "evilcorp.com"} @@ -378,7 +376,6 @@ def test_preset_scope(): @pytest.mark.asyncio async def test_preset_logging(): - scan = Scanner() # test individual verbosity levels @@ -711,7 +708,6 @@ class TestModule5(BaseModule): def test_preset_include(): - # test recursive preset inclusion custom_preset_dir_1 = bbot_test_dir / "custom_preset_dir" @@ -883,7 +879,6 @@ def test_preset_module_disablement(clean_default_config): def test_preset_require_exclude(): - def get_module_flags(p): for m in p.scan_modules: preloaded = p.preloaded_module(m) diff --git a/bbot/test/test_step_1/test_target.py b/bbot/test/test_step_1/test_target.py index 3c9a9832b5..8f2a6bf91f 100644 --- a/bbot/test/test_step_1/test_target.py +++ b/bbot/test/test_step_1/test_target.py @@ -337,7 +337,6 @@ async def test_target(bbot_scanner): @pytest.mark.asyncio async def test_blacklist_regex(bbot_scanner, bbot_httpserver): - from bbot.scanner.target import ScanBlacklist blacklist = ScanBlacklist("evilcorp.com") diff --git a/bbot/test/test_step_1/test_web.py b/bbot/test/test_step_1/test_web.py index 372f1eee6a..4d324abced 100644 --- a/bbot/test/test_step_1/test_web.py +++ b/bbot/test/test_step_1/test_web.py @@ -6,7 +6,6 @@ @pytest.mark.asyncio async def test_web_engine(bbot_scanner, bbot_httpserver, httpx_mock): - from werkzeug.wrappers import Response def server_handler(request): @@ -134,7 +133,6 @@ def server_handler(request): @pytest.mark.asyncio async def test_web_helpers(bbot_scanner, bbot_httpserver, httpx_mock): - # json conversion scan = bbot_scanner("evilcorp.com") url = "http://www.evilcorp.com/json_test?a=b" diff --git a/bbot/test/test_step_1/test_web_envelopes.py b/bbot/test/test_step_1/test_web_envelopes.py new file mode 100644 index 0000000000..a5904db141 --- /dev/null +++ b/bbot/test/test_step_1/test_web_envelopes.py @@ -0,0 +1,339 @@ +import pytest + + +async def test_web_envelopes(): + from bbot.core.helpers.web.envelopes import ( + BaseEnvelope, + TextEnvelope, + HexEnvelope, + B64Envelope, + JSONEnvelope, + XMLEnvelope, + URLEnvelope, + ) + + # simple text + text_envelope = BaseEnvelope.detect("foo") + assert isinstance(text_envelope, TextEnvelope) + assert text_envelope.unpacked_data() == "foo" + assert text_envelope.subparams == {"__default__": "foo"} + expected_subparams = [([], "foo")] + assert list(text_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert text_envelope.get_subparam(subparam) == value + assert text_envelope.pack() == "foo" + assert text_envelope.num_envelopes == 0 + assert text_envelope.get_subparam() == "foo" + text_envelope.set_subparam(value="bar") + assert text_envelope.get_subparam() == "bar" + assert text_envelope.unpacked_data() == "bar" + + # simple binary + # binary_envelope = BaseEnvelope.detect("foo\x00") + # assert isinstance(binary_envelope, BinaryEnvelope) + # assert binary_envelope.unpacked_data == "foo\x00" + # assert binary_envelope.packed_data == "foo\x00" + # assert binary_envelope.subparams == {"__default__": "foo\x00"} + + # text encoded as hex + hex_envelope = BaseEnvelope.detect("706172616d") + assert isinstance(hex_envelope, HexEnvelope) + assert hex_envelope.unpacked_data(recursive=True) == "param" + hex_inner_envelope = hex_envelope.unpacked_data(recursive=False) + assert isinstance(hex_inner_envelope, TextEnvelope) + assert hex_inner_envelope.unpacked_data(recursive=False) == "param" + assert hex_inner_envelope.unpacked_data(recursive=True) == "param" + assert list(hex_envelope.get_subparams(recursive=False)) == [([], hex_inner_envelope)] + assert list(hex_envelope.get_subparams(recursive=True)) == [([], "param")] + assert hex_inner_envelope.unpacked_data() == "param" + assert hex_inner_envelope.subparams == {"__default__": "param"} + expected_subparams = [([], "param")] + assert list(hex_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_inner_envelope.get_subparam(subparam) == value + assert hex_envelope.pack() == "706172616d" + assert hex_envelope.num_envelopes == 1 + assert hex_envelope.get_subparam() == "param" + hex_envelope.set_subparam(value="asdf") + assert hex_envelope.get_subparam() == "asdf" + assert hex_envelope.unpacked_data() == "asdf" + assert hex_envelope.pack() == "61736466" + + # text encoded as base64 + base64_envelope = BaseEnvelope.detect("cGFyYW0=") + assert isinstance(base64_envelope, B64Envelope) + assert base64_envelope.unpacked_data() == "param" + base64_inner_envelope = base64_envelope.unpacked_data(recursive=False) + assert isinstance(base64_inner_envelope, TextEnvelope) + assert list(base64_envelope.get_subparams(recursive=False)) == [([], base64_inner_envelope)] + assert list(base64_envelope.get_subparams()) == [([], "param")] + assert base64_inner_envelope.pack() == "param" + assert base64_inner_envelope.unpacked_data() == "param" + assert base64_inner_envelope.subparams == {"__default__": "param"} + expected_subparams = [([], "param")] + assert list(base64_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_inner_envelope.get_subparam(subparam) == value + assert base64_envelope.num_envelopes == 1 + base64_envelope.set_subparam(value="asdf") + assert base64_envelope.get_subparam() == "asdf" + assert base64_envelope.unpacked_data() == "asdf" + assert base64_envelope.pack() == "YXNkZg==" + + # test inside hex inside base64 + hex_envelope = BaseEnvelope.detect("634746795957303d") + assert isinstance(hex_envelope, HexEnvelope) + assert hex_envelope.get_subparam() == "param" + assert hex_envelope.unpacked_data() == "param" + base64_envelope = hex_envelope.unpacked_data(recursive=False) + assert isinstance(base64_envelope, B64Envelope) + assert base64_envelope.get_subparam() == "param" + assert base64_envelope.unpacked_data() == "param" + text_envelope = base64_envelope.unpacked_data(recursive=False) + assert isinstance(text_envelope, TextEnvelope) + assert text_envelope.get_subparam() == "param" + assert text_envelope.unpacked_data() == "param" + hex_envelope.set_subparam(value="asdf") + assert hex_envelope.get_subparam() == "asdf" + assert hex_envelope.unpacked_data() == "asdf" + assert text_envelope.get_subparam() == "asdf" + assert text_envelope.unpacked_data() == "asdf" + assert base64_envelope.get_subparam() == "asdf" + assert base64_envelope.unpacked_data() == "asdf" + + # URL-encoded text + url_encoded_envelope = BaseEnvelope.detect("a%20b%20c") + assert isinstance(url_encoded_envelope, URLEnvelope) + assert url_encoded_envelope.pack() == "a%20b%20c" + assert url_encoded_envelope.unpacked_data() == "a b c" + url_inner_envelope = url_encoded_envelope.unpacked_data(recursive=False) + assert isinstance(url_inner_envelope, TextEnvelope) + assert url_inner_envelope.unpacked_data(recursive=False) == "a b c" + assert url_inner_envelope.unpacked_data(recursive=True) == "a b c" + assert list(url_encoded_envelope.get_subparams(recursive=False)) == [([], url_inner_envelope)] + assert list(url_encoded_envelope.get_subparams(recursive=True)) == [([], "a b c")] + assert url_inner_envelope.pack() == "a b c" + assert url_inner_envelope.unpacked_data() == "a b c" + assert url_inner_envelope.subparams == {"__default__": "a b c"} + expected_subparams = [([], "a b c")] + assert list(url_inner_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert url_inner_envelope.get_subparam(subparam) == value + assert url_encoded_envelope.num_envelopes == 1 + url_encoded_envelope.set_subparam(value="a s d f") + assert url_encoded_envelope.get_subparam() == "a s d f" + assert url_encoded_envelope.unpacked_data() == "a s d f" + assert url_encoded_envelope.pack() == "a%20s%20d%20f" + + # json + json_envelope = BaseEnvelope.detect('{"param1": "val1", "param2": {"param3": "val3"}}') + assert isinstance(json_envelope, JSONEnvelope) + assert json_envelope.pack() == '{"param1": "val1", "param2": {"param3": "val3"}}' + assert json_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.unpacked_data(recursive=False) == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.unpacked_data(recursive=True) == {"param1": "val1", "param2": {"param3": "val3"}} + assert json_envelope.subparams == {"param1": "val1", "param2": {"param3": "val3"}} + expected_subparams = [ + (["param1"], "val1"), + (["param2", "param3"], "val3"), + ] + assert list(json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert json_envelope.get_subparam(subparam) == value + json_envelope.selected_subparam = ["param2", "param3"] + assert json_envelope.get_subparam() == "val3" + assert json_envelope.num_envelopes == 1 + + # xml + xml_envelope = BaseEnvelope.detect( + 'val1val3' + ) + assert isinstance(xml_envelope, XMLEnvelope) + assert ( + xml_envelope.pack() + == '\nval1val3' + ) + assert xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.unpacked_data(recursive=False) == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.unpacked_data(recursive=True) == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert xml_envelope.subparams == { + "root": {"param1": {"@attr": "attr1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + expected_subparams = [ + (["root", "param1", "@attr"], "attr1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert xml_envelope.get_subparam(subparam) == value + assert xml_envelope.num_envelopes == 1 + + # json inside base64 + base64_json_envelope = BaseEnvelope.detect("eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19") + assert isinstance(base64_json_envelope, B64Envelope) + assert base64_json_envelope.pack() == "eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19" + assert base64_json_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + base64_inner_envelope = base64_json_envelope.unpacked_data(recursive=False) + assert isinstance(base64_inner_envelope, JSONEnvelope) + assert base64_inner_envelope.pack() == '{"param1": "val1", "param2": {"param3": "val3"}}' + assert base64_inner_envelope.unpacked_data() == {"param1": "val1", "param2": {"param3": "val3"}} + assert base64_inner_envelope.subparams == {"param1": "val1", "param2": {"param3": "val3"}} + expected_subparams = [ + (["param1"], "val1"), + (["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + assert base64_json_envelope.num_envelopes == 2 + with pytest.raises(ValueError): + assert base64_json_envelope.get_subparam() + base64_json_envelope.selected_subparam = ["param2", "param3"] + assert base64_json_envelope.get_subparam() == "val3" + + # xml inside url inside hex inside base64 + nested_xml_envelope = BaseEnvelope.detect( + "MjUzMzYzMjUzNzMyMjUzNjY2MjUzNjY2MjUzNzM0MjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMxMjUzMjMwMjUzNjMxMjUzNzM0MjUzNzM0MjUzNzMyMjUzMzY0MjUzMjMyMjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMxMjUzMjMyMjUzMzY1MjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMxMjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMxMjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMyMjUzMzY1MjUzMzYzMjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMzMjUzMzY1MjUzNzM2MjUzNjMxMjUzNjYzMjUzMzMzMjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMzMjUzMzY1MjUzMzYzMjUzMjY2MjUzNzMwMjUzNjMxMjUzNzMyMjUzNjMxMjUzNjY0MjUzMzMyMjUzMzY1MjUzMzYzMjUzMjY2MjUzNzMyMjUzNjY2MjUzNjY2MjUzNzM0MjUzMzY1" + ) + assert isinstance(nested_xml_envelope, B64Envelope) + assert nested_xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert ( + nested_xml_envelope.pack() + == "MjUzMzQzMjUzMzQ2Nzg2ZDZjMjUzMjMwNzY2NTcyNzM2OTZmNmUyNTMzNDQyNTMyMzIzMTJlMzAyNTMyMzIyNTMyMzA2NTZlNjM2ZjY0Njk2ZTY3MjUzMzQ0MjUzMjMyNzU3NDY2MmQzODI1MzIzMjI1MzM0NjI1MzM0NTI1MzA0MTI1MzM0MzcyNmY2Zjc0MjUzMzQ1MjUzMzQzNzA2MTcyNjE2ZDMxMjUzMjMwNjE3NDc0NzIyNTMzNDQyNTMyMzI3NjYxNmMzMTI1MzIzMjI1MzM0NTc2NjE2YzMxMjUzMzQzMmY3MDYxNzI2MTZkMzEyNTMzNDUyNTMzNDM3MDYxNzI2MTZkMzIyNTMzNDUyNTMzNDM3MDYxNzI2MTZkMzMyNTMzNDU3NjYxNmMzMzI1MzM0MzJmNzA2MTcyNjE2ZDMzMjUzMzQ1MjUzMzQzMmY3MDYxNzI2MTZkMzIyNTMzNDUyNTMzNDMyZjcyNmY2Zjc0MjUzMzQ1" + ) + inner_hex_envelope = nested_xml_envelope.unpacked_data(recursive=False) + assert isinstance(inner_hex_envelope, HexEnvelope) + assert ( + inner_hex_envelope.pack() + == "253343253346786d6c25323076657273696f6e253344253232312e30253232253230656e636f64696e672533442532327574662d38253232253346253345253041253343726f6f74253345253343706172616d312532306174747225334425323276616c3125323225334576616c312533432f706172616d31253345253343706172616d32253345253343706172616d3325334576616c332533432f706172616d332533452533432f706172616d322533452533432f726f6f74253345" + ) + inner_url_envelope = inner_hex_envelope.unpacked_data(recursive=False) + assert isinstance(inner_url_envelope, URLEnvelope) + assert ( + inner_url_envelope.pack() + == r"%3C%3Fxml%20version%3D%221.0%22%20encoding%3D%22utf-8%22%3F%3E%0A%3Croot%3E%3Cparam1%20attr%3D%22val1%22%3Eval1%3C/param1%3E%3Cparam2%3E%3Cparam3%3Eval3%3C/param3%3E%3C/param2%3E%3C/root%3E" + ) + inner_xml_envelope = inner_url_envelope.unpacked_data(recursive=False) + assert isinstance(inner_xml_envelope, XMLEnvelope) + assert ( + inner_xml_envelope.pack() + == '\nval1val3' + ) + assert inner_xml_envelope.unpacked_data() == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + assert inner_xml_envelope.subparams == { + "root": {"param1": {"@attr": "val1", "#text": "val1"}, "param2": {"param3": "val3"}} + } + expected_subparams = [ + (["root", "param1", "@attr"], "val1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(nested_xml_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert nested_xml_envelope.get_subparam(subparam) == value + assert nested_xml_envelope.num_envelopes == 4 + + # manipulating text inside hex + hex_envelope = BaseEnvelope.detect("706172616d") + expected_subparams = [([], "param")] + assert list(hex_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_envelope.get_subparam(subparam) == value + hex_envelope.set_subparam([], "asdf") + expected_subparams = [([], "asdf")] + assert list(hex_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert hex_envelope.get_subparam(subparam) == value + assert hex_envelope.unpacked_data() == "asdf" + + # manipulating json inside base64 + base64_json_envelope = BaseEnvelope.detect("eyJwYXJhbTEiOiAidmFsMSIsICJwYXJhbTIiOiB7InBhcmFtMyI6ICJ2YWwzIn19") + expected_subparams = [ + (["param1"], "val1"), + (["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + base64_json_envelope.set_subparam(["param1"], {"asdf": [None], "fdsa": 1.0}) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2", "param3"], "val3"), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert base64_json_envelope.get_subparam(subparam) == value + base64_json_envelope.set_subparam(["param2", "param3"], {"1234": [None], "4321": 1.0}) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2", "param3", "1234"], [None]), + (["param2", "param3", "4321"], 1.0), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + base64_json_envelope.set_subparam(["param2"], None) + expected_subparams = [ + (["param1", "asdf"], [None]), + (["param1", "fdsa"], 1.0), + (["param2"], None), + ] + assert list(base64_json_envelope.get_subparams()) == expected_subparams + + # xml inside url inside base64 + xml_envelope = BaseEnvelope.detect( + "JTNDP3htbCUyMHZlcnNpb249JTIyMS4wJTIyJTIwZW5jb2Rpbmc9JTIydXRmLTglMjI/JTNFJTBBJTNDcm9vdCUzRSUzQ3BhcmFtMSUyMGF0dHI9JTIydmFsMSUyMiUzRXZhbDElM0MvcGFyYW0xJTNFJTNDcGFyYW0yJTNFJTNDcGFyYW0zJTNFdmFsMyUzQy9wYXJhbTMlM0UlM0MvcGFyYW0yJTNFJTNDL3Jvb3QlM0U=" + ) + assert ( + xml_envelope.pack() + == "JTNDJTNGeG1sJTIwdmVyc2lvbiUzRCUyMjEuMCUyMiUyMGVuY29kaW5nJTNEJTIydXRmLTglMjIlM0YlM0UlMEElM0Nyb290JTNFJTNDcGFyYW0xJTIwYXR0ciUzRCUyMnZhbDElMjIlM0V2YWwxJTNDL3BhcmFtMSUzRSUzQ3BhcmFtMiUzRSUzQ3BhcmFtMyUzRXZhbDMlM0MvcGFyYW0zJTNFJTNDL3BhcmFtMiUzRSUzQy9yb290JTNF" + ) + expected_subparams = [ + (["root", "param1", "@attr"], "val1"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + xml_envelope.set_subparam(["root", "param1", "@attr"], "asdf") + expected_subparams = [ + (["root", "param1", "@attr"], "asdf"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3"], "val3"), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + assert ( + xml_envelope.pack() + == "JTNDJTNGeG1sJTIwdmVyc2lvbiUzRCUyMjEuMCUyMiUyMGVuY29kaW5nJTNEJTIydXRmLTglMjIlM0YlM0UlMEElM0Nyb290JTNFJTNDcGFyYW0xJTIwYXR0ciUzRCUyMmFzZGYlMjIlM0V2YWwxJTNDL3BhcmFtMSUzRSUzQ3BhcmFtMiUzRSUzQ3BhcmFtMyUzRXZhbDMlM0MvcGFyYW0zJTNFJTNDL3BhcmFtMiUzRSUzQy9yb290JTNF" + ) + xml_envelope.set_subparam(["root", "param2", "param3"], {"1234": [None], "4321": 1.0}) + expected_subparams = [ + (["root", "param1", "@attr"], "asdf"), + (["root", "param1", "#text"], "val1"), + (["root", "param2", "param3", "1234"], [None]), + (["root", "param2", "param3", "4321"], 1.0), + ] + assert list(xml_envelope.get_subparams()) == expected_subparams + + # null + null_envelope = BaseEnvelope.detect("null") + assert isinstance(null_envelope, JSONEnvelope) + assert null_envelope.unpacked_data() == None + assert null_envelope.pack() == "null" + expected_subparams = [([], None)] + assert list(null_envelope.get_subparams()) == expected_subparams + for subparam, value in expected_subparams: + assert null_envelope.get_subparam(subparam) == value + + tiny_base64 = BaseEnvelope.detect("YWJi") + assert isinstance(tiny_base64, TextEnvelope) diff --git a/bbot/test/test_step_2/module_tests/test_module_baddns_direct.py b/bbot/test/test_step_2/module_tests/test_module_baddns_direct.py index 77a86153c7..b2b49717c8 100644 --- a/bbot/test/test_step_2/module_tests/test_module_baddns_direct.py +++ b/bbot/test/test_step_2/module_tests/test_module_baddns_direct.py @@ -55,8 +55,8 @@ def set_target(self, target): def check(self, module_test, events): assert any( e.type == "FINDING" - and "Possible [AWS Bucket Takeover Detection] via direct BadDNS analysis. Indicator: [[Words: The specified bucket does not exist | Condition: and | Part: body] Matchers-Condition: and] Trigger: [self] baddns Module: [CNAME]" - in e.data["description"] - for e in events + and "Possible [AWS Bucket Takeover Detection] via direct BadDNS analysis. Indicator: [[Words: The specified bucket does not exist | Condition: and | Part: body] Matchers-Condition: and] Trigger: [self] baddns Module: [CNAME]" + in e.data["description"] + for e in events ), "Failed to emit FINDING" assert any("baddns-cname" in e.tags for e in events), "Failed to add baddns tag" diff --git a/bbot/test/test_step_2/module_tests/test_module_excavate.py b/bbot/test/test_step_2/module_tests/test_module_excavate.py index 915ca7d58a..b0c6b723a8 100644 --- a/bbot/test/test_step_2/module_tests/test_module_excavate.py +++ b/bbot/test/test_step_2/module_tests/test_module_excavate.py @@ -499,9 +499,7 @@ def check(self, module_test, events): found_htmltags_img = False for e in events: - if e.type == "WEB_PARAMETER": - if e.data["description"] == "HTTP Extracted Parameter [jqueryget] (GET jquery Submodule)": found_jquery_get = True if e.data["original_value"] == "value1": @@ -552,7 +550,6 @@ def check(self, module_test, events): class TestExcavateParameterExtraction_postformnoaction(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -574,7 +571,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(respond_args=respond_args) def check(self, module_test, events): - excavate_getparam_extraction = False for e in events: if e.type == "WEB_PARAMETER": @@ -606,7 +602,6 @@ def check(self, module_test, events): class TestExcavateParameterExtraction_relativeurl(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -631,7 +626,6 @@ class TestExcavateParameterExtraction_relativeurl(ModuleTestBase): root_page_html = "Root page" async def setup_after_prep(self, module_test): - module_test.httpserver.expect_request("/").respond_with_data(self.primary_page_html) module_test.httpserver.expect_request("/secondary").respond_with_data(self.secondary_page_html) module_test.httpserver.expect_request("/root.html").respond_with_data(self.root_page_html) @@ -731,7 +725,6 @@ def check(self, module_test, events): class TestExcavateParameterExtraction_inputtagnovalue(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] # hunt is added as parameter extraction is only activated by one or more modules that consume WEB_PARAMETER @@ -1212,7 +1205,6 @@ class TestExcavate(ModuleTestBase): config_overrides = {"web": {"spider_distance": 1, "spider_depth": 1}} async def setup_before_prep(self, module_test): - response_data = """ ftp://ftp.test.notreal \\nhttps://www1.test.notreal @@ -1300,13 +1292,11 @@ def check(self, module_test, events): class TestExcavateHeaders_blacklist(ModuleTestBase): - targets = ["http://127.0.0.1:8888/"] modules_overrides = ["excavate", "httpx", "hunt"] config_overrides = {"web": {"spider_distance": 1, "spider_depth": 1}} async def setup_before_prep(self, module_test): - module_test.httpserver.expect_request("/").respond_with_data( "

test

", status=200, @@ -1320,7 +1310,6 @@ async def setup_before_prep(self, module_test): ) def check(self, module_test, events): - found_first_cookie = False found_second_cookie = False found_third_cookie = False diff --git a/bbot/test/test_step_2/module_tests/test_module_gowitness.py b/bbot/test/test_step_2/module_tests/test_module_gowitness.py index 2d6dc2cd8f..6090fbb1d6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_gowitness.py +++ b/bbot/test/test_step_2/module_tests/test_module_gowitness.py @@ -101,6 +101,4 @@ class TestGoWitnessWithBlob(TestGowitness): def check(self, module_test, events): webscreenshots = [e for e in events if e.type == "WEBSCREENSHOT"] assert webscreenshots, "failed to raise WEBSCREENSHOT events" - assert all( - "blob" in e.data and e.data["blob"] for e in webscreenshots - ), "blob not found in WEBSCREENSHOT data" + assert all("blob" in e.data and e.data["blob"] for e in webscreenshots), "blob not found in WEBSCREENSHOT data" diff --git a/bbot/test/test_step_2/module_tests/test_module_hunt.py b/bbot/test/test_step_2/module_tests/test_module_hunt.py index 0ce8e93537..867a2565c6 100644 --- a/bbot/test/test_step_2/module_tests/test_module_hunt.py +++ b/bbot/test/test_step_2/module_tests/test_module_hunt.py @@ -23,7 +23,6 @@ def check(self, module_test, events): class TestHunt_Multiple(TestHunt): - async def setup_after_prep(self, module_test): expect_args = {"method": "GET", "uri": "/"} respond_args = {"response_data": 'ping'} diff --git a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py index 72d607c0c7..b54a1d277a 100644 --- a/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py +++ b/bbot/test/test_step_2/module_tests/test_module_lightfuzz.py @@ -34,7 +34,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def request_handler(self, request): - qs = str(request.query_string.decode()) if "filename=" in qs: value = qs.split("=")[1] @@ -52,11 +51,9 @@ def request_handler(self, request): return Response("file not found", status=500) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: - if e.type == "WEB_PARAMETER": if "HTTP Extracted Parameter [filename]" in e.data["description"]: web_parameter_emitted = True @@ -74,7 +71,6 @@ def check(self, module_test, events): # Path Traversal Absolute path class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): - etc_passwd = """ root:x:0:0:root:/root:/bin/bash daemon:x:1:1:daemon:/usr/sbin:/usr/sbin/nologin @@ -87,7 +83,6 @@ class Test_Lightfuzz_path_absolute(Test_Lightfuzz_path_singledot): """ async def setup_after_prep(self, module_test): - expect_args = {"method": "GET", "uri": "/images", "query_string": "filename=/etc/passwd"} respond_args = {"response_data": self.etc_passwd} module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) @@ -104,7 +99,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests(expect_args=expect_args, respond_args=respond_args) def check(self, module_test, events): - web_parameter_emitted = False pathtraversal_finding_emitted = False for e in events: @@ -156,7 +150,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False ssti_finding_emitted = False for e in events: @@ -189,7 +182,6 @@ class Test_Lightfuzz_xss(ModuleTestBase): } def request_handler(self, request): - qs = str(request.query_string.decode()) parameter_block = """ @@ -219,7 +211,6 @@ async def setup_after_prep(self, module_test): module_test.set_expect_requests_handler(expect_args=expect_args, request_handler=self.request_handler) def check(self, module_test, events): - web_parameter_emitted = False xss_finding_emitted = False for e in events: @@ -238,12 +229,8 @@ def check(self, module_test, events): # Base64 Envelope XSS Detection class Test_Lightfuzz_envelope_base64(Test_Lightfuzz_xss): def request_handler(self, request): - qs = str(request.query_string.decode()) - print("****") - print(qs) - parameter_block = """