diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index d5a9552e20..e8429869fc 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -1,20 +1,24 @@ +from collections import defaultdict import io import re import uuid import json import base64 +import asyncio import logging import tarfile +import binascii import datetime import ipaddress import traceback +import xml.etree.ElementTree as ET -from copy import copy +from copy import copy, deepcopy from pathlib import Path from typing import Optional from contextlib import suppress from radixtarget import RadixTarget -from urllib.parse import urljoin, parse_qs +from urllib.parse import urljoin, parse_qs, unquote, quote from pydantic import BaseModel, field_validator @@ -618,6 +622,19 @@ def get_parents(self, omit=False, include_self=False): e = parent return parents + def clone(self): + + # Create a shallow copy of the event first + cloned_event = copy(self) + + # Handle attributes that need deep copying manually + setattr(cloned_event, "envelopes", deepcopy(self.envelopes)) + + # Re-assign a new UUID + cloned_event.uuid = uuid.uuid4() + + return cloned_event + def _host(self): return "" @@ -797,7 +814,13 @@ def json(self, mode="json", siem_friendly=False): j["discovery_path"] = self.discovery_path j["parent_chain"] = self.parent_chain + # parameter envelopes + parameter_envelopes = getattr(self, "envelopes", None) + if parameter_envelopes is not None: + j["envelopes"] = parameter_envelopes.to_dict() + # normalize non-primitive python objects + for k, v in list(j.items()): if k == "data": continue @@ -1281,12 +1304,384 @@ class URL_HINT(URL_UNVERIFIED): class WEB_PARAMETER(DictHostEvent): + class ParameterEnvelopes: + + @staticmethod + def preprocess_base64(base64_str): + return base64.b64decode(base64_str).decode() + + @staticmethod + def postprocess_base64(string): + return base64.b64encode(string.encode()).decode() + + @staticmethod + def preprocess_hex(hex_str): + return bytes.fromhex(hex_str).decode() + + @staticmethod + def postprocess_hex(string): + return string.encode().hex() + + @staticmethod + def preprocess_urlencoded(url_encoded_str): + return unquote(url_encoded_str) + + @staticmethod + def postprocess_urlencoded(string): + return quote(string) + + @staticmethod + def is_ascii_printable(s): + return all(32 <= ord(char) < 127 for char in s) + + # Converts XML ElementTree to a JSON-like dictionary + def xml_to_dict(self, elem): + """ + Convert XML ElementTree to a dictionary recursively. + """ + d = {elem.tag: {} if elem.attrib else None} + children = list(elem) + if children: + dd = defaultdict(list) + for dc in map(self.xml_to_dict, children): + for k, v in dc.items(): + dd[k].append(v) + d = {elem.tag: {k: v[0] if len(v) == 1 else v for k, v in dd.items()}} + if elem.attrib: + d[elem.tag].update(("@" + k, v) for k, v in elem.attrib.items()) + if elem.text: + text = elem.text.strip() + if children or elem.attrib: + if text: + d[elem.tag]["#text"] = text + else: + d[elem.tag] = text + return d + + def dict_to_xml(self, d): + """ + Converts a dictionary to an XML string without adding an extra root node. + Assumes the dictionary was originally an XML structure. + """ + if not isinstance(d, dict) or len(d) != 1: + raise ValueError("Expected a dictionary with a single root element.") + + # Get the root element directly from the dict keys + root_tag = list(d.keys())[0] + root_element = ET.Element(root_tag) + + # Recursive function to handle nested dicts + def _build_tree(element, subdict): + for key, value in subdict.items(): + if isinstance(value, dict): + # Nested element + sub_element = ET.SubElement(element, key) + _build_tree(sub_element, value) + else: + # Leaf element + sub_element = ET.SubElement(element, key) + sub_element.text = str(value) + + # Start building the tree + _build_tree(root_element, d[root_tag]) + + return ET.tostring(root_element, encoding="utf-8").decode("utf-8") + + preprocess_map = { + "base64": preprocess_base64, + "hex": preprocess_hex, + "url-encoded": preprocess_urlencoded, + } + postprocess_map = { + "base64": postprocess_base64, + "hex": postprocess_hex, + "url-encoded": postprocess_urlencoded, + } + + # Format-specific functions for isolating and updating parameters + format_isolate_map = { + "json": lambda self: self.isolate_parameter(), + "xml": lambda self: self.isolate_parameter(), + } + format_update_map = { + "json": lambda self, value: self.update_json_parameter(value), + "xml": lambda self, value: self.update_xml_parameter(value), # Placeholder + } + + def initialize_value(self, value=None): + self.envelopes, end_format_dict = self.recurse_envelopes(value) + if self.envelopes: + log.debug(f"Discovered the following envelopes: [{','.join(self.envelopes)}]") + + if end_format_dict is not None: + self.end_format_type = list(end_format_dict.keys())[0] + log.debug(f"Identified the following end format: [{self.end_format_type}]") + self.end_format_data = list(end_format_dict.values())[0] + else: + self.end_format_type = None + self.end_format_data = None + self.end_format_subparameter = None + + def remove_envelopes(self, value): + """ + Remove envelopes from the value, processing each envelope in the order it was applied. + If the final format is present, trigger the appropriate handler (e.g., for JSON). + """ + # Apply the preprocess functions in the order the envelopes were applied + for env in self.envelopes: + func = self.preprocess_map.get(env) + if func: + # python3.9 compatibility hack + if isinstance(func, staticmethod): + func = func.__get__(None, self.__class__) # Unwrap staticmethod + value = func(value) + + # Dynamically select the appropriate isolate function based on the final format + isolate_func = self.format_isolate_map.get(self.end_format_type) + if isolate_func: + return isolate_func(self) + return value + + def add_envelopes(self, value): + """ + Add envelopes back to the value, processing in reverse order. + If the final format is present, trigger the appropriate handler (e.g., for JSON). + """ + # Dynamically select the appropriate update function based on the final format + update_func = self.format_update_map.get(self.end_format_type) + if update_func: + # python3.9 compatibility hack + if isinstance(update_func, staticmethod): + update_func = update_func.__get__(None, self.__class__) + value = update_func(self, value) + + # Apply the envelopes in reverse order + for env in self.envelopes[::-1]: + func = self.postprocess_map.get(env) + if func: + # python3.9 compatibility hack + if isinstance(func, staticmethod): + func = func.__get__(None, self.__class__) + value = func(value) + return value + + def recurse_envelopes(self, value, envelopes=None, end_format=None): + if envelopes is None: + envelopes = [] + log.debug( + f"Starting envelope recurse with value: [{value}], current envelopes: [{', '.join(envelopes)}], current end format: {end_format}" + ) + + if value is None or value == "" or isinstance(value, int): + return envelopes, end_format + + # Try URL decoding + try: + decoded_url = unquote(value) + if decoded_url != value and self.is_ascii_printable(decoded_url): + envelopes.append("url-encoded") + envelopes, end_format_dict = self.recurse_envelopes(decoded_url, envelopes) + return envelopes, end_format_dict + except Exception: + pass # Not valid URL encoding + + # Try base64 decoding + try: + decoded_base64 = base64.b64decode(value).decode() + if self.is_ascii_printable(decoded_base64): + envelopes.append("base64") + envelopes, end_format_dict = self.recurse_envelopes(decoded_base64, envelopes) + return envelopes, end_format_dict + except (binascii.Error, UnicodeDecodeError): + pass # Not valid base64 + + # Try hex decoding + try: + decoded_hex = bytes.fromhex(value).decode("utf-8") + if self.is_ascii_printable(decoded_hex): + envelopes.append("hex") + envelopes, end_format_dict = self.recurse_envelopes(decoded_hex, envelopes) + return envelopes, end_format_dict + except (ValueError, UnicodeDecodeError): + pass # Not valid hex + + # Try JSON parsing + try: + decoded_json = json.loads(value) + if isinstance(decoded_json, (str, dict, list)): + return envelopes, {"json": decoded_json} + except json.JSONDecodeError: + pass # Not valid JSON + + # Try XML parsing + try: + decoded_xml = ET.fromstring(value) + # Pass 'decoded_xml' to 'xml_to_dict' + xml_dict = self.xml_to_dict(decoded_xml) # Pass decoded XML as the 'elem' argument + return envelopes, {"xml": xml_dict} # Store as JSON-like dict, not XML + except ET.ParseError: + pass # Not valid XML + + return envelopes, end_format + + def isolate_parameter(self): + """ + Isolate the specified subparameter from the data structure (JSON/XML). + The subparameter is accessed using dot notation for nested keys. + """ + if self.end_format_data and self.end_format_subparameter: + # Split the dot notation string into keys + keys = self.end_format_subparameter.split(".") + + # Traverse the nested structure using the keys + subparameter_value = self.end_format_data + for key in keys: + if isinstance(subparameter_value, dict): + subparameter_value = subparameter_value.get(key) + else: + # If the structure is broken (not a dict), return None + return None + + return subparameter_value + + return None + + def update_json_parameter(self, new_value): + """ + Update the specified subparameter in the JSON structure and rebuild it. + """ + # Work with a copy to avoid modifying the original `end_format_data` + end_format_data_copy = deepcopy(self.end_format_data) + + if end_format_data_copy: + end_format_data_copy[self.end_format_subparameter] = new_value + return json.dumps(end_format_data_copy) + return new_value + + def update_xml_parameter(self, new_value): + """ + Convert the JSON-like structure back into an XML string after updating the specific parameter. + """ + if self.end_format_data and self.end_format_subparameter: + # Split the dot notation into keys + keys = self.end_format_subparameter.split(".") + + # Traverse the nested dictionary using the keys to find the target subparameter + current_data = self.end_format_data + for key in keys[:-1]: # Traverse up to the second-to-last key + current_data = current_data.get(key, {}) + + # Update the target subparameter with the new value + if isinstance(current_data, dict): + current_data[keys[-1]] = new_value + + # Convert the JSON-like dict back to an XML string + return self.dict_to_xml(self.end_format_data) + + return new_value + + def to_dict(self): + return { + "envelopes": self.envelopes, + "end_format_type": self.end_format_type, + "end_format_data": self.end_format_data, + "end_format_subparameter": self.end_format_subparameter, + } + + def __getstate__(self): + return self.to_dict() + + def __str__(self): + return f"ParameterEnvelopes(envelopes={self.envelopes}, end_format_type={self.end_format_type}, end_format_data={self.end_format_data}, end_format_subparameter={self.end_format_subparameter})" + + __repr__ = __str__ + + @classmethod + def from_dict(cls, data): + instance = cls() + instance.envelopes = data.get("envelopes", []) + instance.end_format_type = data.get("end_format_type") + instance.end_format_data = data.get("end_format_data") + instance.end_format_subparameter = data.get("end_format_subparameter") + return instance + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if "original_value" in self.data.keys(): + parameterEnvelope_instance = self.ParameterEnvelopes() + parameterEnvelope_instance.initialize_value(self.data["original_value"]) + setattr(self, "envelopes", parameterEnvelope_instance) + + envelopes = getattr(self, "envelopes", None) + if ( + envelopes is not None + and getattr(envelopes, "end_format_type", None) is not None + and getattr(envelopes, "end_format_data", None) + ): + end_format_data = envelopes.end_format_data + + def extract_keys_with_values(data, parent_key=""): + """ + Recursively extract all keys from nested dictionaries that have values (non-empty). + Construct a path-like structure with dot notation (e.g., 'find.search'). + """ + keys = [] + if isinstance(data, dict): + for key, value in data.items(): + # Construct the full key path using dot notation + full_key = f"{parent_key}.{key}" if parent_key else key + + # Only add keys that have non-empty values + if value: + if isinstance(value, dict): + # Recursively check nested dictionaries + keys.extend(extract_keys_with_values(value, full_key)) + else: + # Add the key if it has a non-empty value + keys.append(full_key) + return keys + + # Extract all keys that have non-empty values + end_format_data_keys = extract_keys_with_values(end_format_data) + # If there are keys, assign the first key to end_format_subparameter + if end_format_data_keys: + + # Assign the first key to end_format_subparameter + setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) + setattr(envelopes, "end_format_subparameter", end_format_data_keys[0]) + + # Iterate through the remaining keys, starting from the second one + for p in end_format_data_keys[1:]: + log.debug(f"generating copy of event for subparameter {p} of type {envelopes.end_format_type}") + + # Make a copy of the current event data + cloned_event = self.clone() + cloned_envelopes = getattr(cloned_event, "envelopes") + cloned_envelopes.end_format_subparameter = p + asyncio.run_coroutine_threadsafe( + self.module.emit_event(cloned_event), asyncio.get_event_loop() + ) + def _data_id(self): # dedupe by url:name:param_type url = self.data.get("url", "") name = self.data.get("name", "") param_type = self.data.get("type", "") - return f"{url}:{name}:{param_type}" + envelopes = getattr(self, "envelopes", None) + subparameter = getattr(envelopes, "end_format_subparameter", "") if envelopes else "" + + return f"{url}:{name}:{param_type}:{subparameter}" + + def _outgoing_dedup_hash(self, event): + return hash( + ( + str(event.host), + event.data["url"], + event.data.get("name", ""), + event.data.get("type", ""), + event.data.get("envelopes", ""), + ) + ) def _url(self): return self.data["url"] @@ -1651,7 +2046,6 @@ def make_event( tags.add("affiliate") event_class = globals().get(event_type, DefaultEvent) - return event_class( data, event_type=event_type, @@ -1711,7 +2105,6 @@ def event_from_json(j, siem_friendly=False): resolved_hosts = j.get("resolved_hosts", []) event._resolved_hosts = set(resolved_hosts) - event.timestamp = datetime.datetime.fromisoformat(j["timestamp"]) event.scope_distance = j["scope_distance"] parent_id = j.get("parent", None) diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index 4e45ce21e6..6c8733eed6 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -11,6 +11,7 @@ import ahocorasick import regex as re import subprocess as sp + from pathlib import Path from contextlib import suppress from unidecode import unidecode # noqa F401 diff --git a/bbot/core/helpers/regexes.py b/bbot/core/helpers/regexes.py index 24d53286f5..1bbb2926a8 100644 --- a/bbot/core/helpers/regexes.py +++ b/bbot/core/helpers/regexes.py @@ -116,8 +116,12 @@ # For use with excavate paramaters extractor input_tag_regex = re.compile( - r"]+?name=[\"\']?([\.$\w]+)[\"\']?(?:[^>]*?value=[\"\']([=+\/\w]*)[\"\'])?[^>]*>" + r"]*?name=[\"\']?([\._=+\/\w]+)[\"\']?[^>]*?value=[\"\']?([\._=+\/\w]*)[\"\']?[^>]*?>" ) +input_tag_regex2 = re.compile( + r"]*?value=[\"\']?([\._=+\/\w]*)[\"\']?[^>]*?name=[\"\']?([\._=+\/\w]+)[\"\']?[^>]*?>" +) +input_tag_novalue_regex = re.compile(r"]*\bvalue=)[^>]*?name=[\"\']?([\._=+\/\w]*)[\"\']?[^>]*?>") # jquery_get_regex = re.compile(r"url:\s?[\"\'].+?\?(\w+)=") # jquery_get_regex = re.compile(r"\$.get\([\'\"].+[\'\"].+\{(.+)\}") # jquery_post_regex = re.compile(r"\$.post\([\'\"].+[\'\"].+\{(.+)\}") diff --git a/bbot/modules/internal/excavate.py b/bbot/modules/internal/excavate.py index 481b63f589..ded56bd770 100644 --- a/bbot/modules/internal/excavate.py +++ b/bbot/modules/internal/excavate.py @@ -329,6 +329,8 @@ class excavateTestRule(ExcavateRule): "ASP.NET_SessionId", "JSESSIONID", "PHPSESSID", + "AWSALB", + "AWSALBCORS", ] ) @@ -432,11 +434,13 @@ def extract(self): class GetForm(ParameterExtractorRule): name = "GET Form" discovery_regex = r'/