Skip to content

Commit

Permalink
merge dev
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Dec 20, 2024
2 parents 5c48512 + 5db65e1 commit a9d9218
Show file tree
Hide file tree
Showing 26 changed files with 446 additions and 229 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,11 @@ Targets can be any of the following:
- `IP_RANGE` (`1.2.3.0/24`)
- `OPEN_TCP_PORT` (`192.168.0.1:80`)
- `URL` (`https://www.evilcorp.com`)
- `EMAIL_ADDRESS` (`[email protected]`)
- `ORG_STUB` (`ORG:evilcorp`)
- `USER_STUB` (`USER:bobsmith`)
- `FILESYSTEM` (`FILESYSTEM:/tmp/asdf`)
- `MOBILE_APP` (`MOBILE_APP:https://play.google.com/store/apps/details?id=com.evilcorp.app`)

For more information, see [Targets](https://www.blacklanternsecurity.com/bbot/Stable/scanning/#targets-t). To learn how BBOT handles scope, see [Scope](https://www.blacklanternsecurity.com/bbot/Stable/scanning/#scope).

Expand Down
84 changes: 61 additions & 23 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from typing import Optional
from contextlib import suppress
from radixtarget import RadixTarget
from urllib.parse import urljoin, parse_qs
from pydantic import BaseModel, field_validator
from urllib.parse import urlparse, urljoin, parse_qs


from .helpers import *
Expand Down Expand Up @@ -515,22 +515,25 @@ def scope_distance(self, scope_distance):
new_scope_distance = min(self.scope_distance, scope_distance)
if self._scope_distance != new_scope_distance:
# remove old scope distance tags
for t in list(self.tags):
if t.startswith("distance-"):
self.remove_tag(t)
if self.host:
if scope_distance == 0:
self.add_tag("in-scope")
self.remove_tag("affiliate")
else:
self.remove_tag("in-scope")
self.add_tag(f"distance-{new_scope_distance}")
self._scope_distance = new_scope_distance
self.refresh_scope_tags()
# apply recursively to parent events
parent_scope_distance = getattr(self.parent, "scope_distance", None)
if parent_scope_distance is not None and self.parent is not self:
self.parent.scope_distance = new_scope_distance + 1

def refresh_scope_tags(self):
for t in list(self.tags):
if t.startswith("distance-"):
self.remove_tag(t)
if self.host:
if self.scope_distance == 0:
self.add_tag("in-scope")
self.remove_tag("affiliate")
else:
self.remove_tag("in-scope")
self.add_tag(f"distance-{self.scope_distance}")

@property
def scope_description(self):
"""
Expand Down Expand Up @@ -587,7 +590,7 @@ def parent(self, parent):
if t in ("spider-danger", "spider-max"):
self.add_tag(t)
elif not self._dummy:
log.warning(f"Tried to set invalid parent on {self}: (got: {parent})")
log.warning(f"Tried to set invalid parent on {self}: (got: {repr(parent)} ({type(parent)}))")

@property
def parent_id(self):
Expand Down Expand Up @@ -1042,6 +1045,9 @@ def sanitize_data(self, data):
blob = None
try:
self._data_path = Path(data["path"])
# prepend the scan's home dir if the path is relative
if not self._data_path.is_absolute():
self._data_path = self.scan.home / self._data_path
if self._data_path.is_file():
self.add_tag("file")
if file_blobs:
Expand Down Expand Up @@ -1352,18 +1358,22 @@ def sanitize_data(self, data):
self.parsed_url = self.validators.validate_url_parsed(url)
data["url"] = self.parsed_url.geturl()

header_dict = {}
for i in data.get("raw_header", "").splitlines():
if len(i) > 0 and ":" in i:
k, v = i.split(":", 1)
k = k.strip().lower()
v = v.lstrip()
if k in header_dict:
header_dict[k].append(v)
else:
header_dict[k] = [v]
if not "raw_header" in data:
raise ValueError("raw_header is required for HTTP_RESPONSE events")

if "header-dict" not in data:
header_dict = {}
for i in data.get("raw_header", "").splitlines():
if len(i) > 0 and ":" in i:
k, v = i.split(":", 1)
k = k.strip().lower()
v = v.lstrip()
if k in header_dict:
header_dict[k].append(v)
else:
header_dict[k] = [v]
data["header-dict"] = header_dict

data["header-dict"] = header_dict
# move URL to the front of the dictionary for visibility
data = dict(data)
new_data = {"url": data.pop("url")}
Expand All @@ -1377,6 +1387,13 @@ def _words(self):
def _pretty_string(self):
return f'{self.data["hash"]["header_mmh3"]}:{self.data["hash"]["body_mmh3"]}'

@property
def raw_response(self):
"""
Formats the status code, headers, and body into a single string formatted as an HTTP/1.1 response.
"""
return f'{self.data["raw_header"]}{self.data["body"]}'

@property
def http_status(self):
try:
Expand Down Expand Up @@ -1584,6 +1601,27 @@ class RAW_DNS_RECORD(DictHostEvent, DnsEvent):
class MOBILE_APP(DictEvent):
_always_emit = True

def _sanitize_data(self, data):
if isinstance(data, str):
data = {"url": data}
if "url" not in data:
raise ValidationError("url is required for MOBILE_APP events")
url = data["url"]
# parse URL
try:
self.parsed_url = urlparse(url)
except Exception as e:
raise ValidationError(f"Error parsing URL {url}: {e}")
if not "id" in data:
# extract "id" getparam
params = parse_qs(self.parsed_url.query)
try:
_id = params["id"][0]
except Exception:
raise ValidationError("id is required for MOBILE_APP events")
data["id"] = _id
return data

def _pretty_string(self):
return self.data["url"]

Expand Down
100 changes: 70 additions & 30 deletions bbot/core/helpers/depsinstaller/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,43 @@


class DepsInstaller:
CORE_DEPS = {
# core BBOT dependencies in the format of binary: package_name
# each one will only be installed if the binary is not found
"unzip": "unzip",
"zipinfo": "unzip",
"curl": "curl",
"git": "git",
"make": "make",
"gcc": "gcc",
"bash": "bash",
"which": "which",
"unrar": "unrar-free",
"tar": "tar",
# debian why are you like this
"7z": [
{
"name": "Install 7zip (Debian)",
"package": {"name": ["p7zip-full"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] == 'Debian'",
},
{
"name": "Install 7zip (Non-Debian)",
"package": {"name": ["p7zip"], "state": "present"},
"become": True,
"when": "ansible_facts['os_family'] != 'Debian'",
},
],
}

def __init__(self, parent_helper):
self.parent_helper = parent_helper
self.preset = self.parent_helper.preset
self.core = self.preset.core

self.os_platform = os_platform()

# respect BBOT's http timeout
self.web_config = self.parent_helper.config.get("web", {})
http_timeout = self.web_config.get("http_timeout", 30)
Expand Down Expand Up @@ -202,28 +234,32 @@ def apt_install(self, packages):
"""
Install packages with the OS's default package manager (apt, pacman, dnf, etc.)
"""
packages_str = ",".join(packages)
args, kwargs = self._make_apt_ansible_args(packages)
success, err = self.ansible_run(module="package", args=args, **kwargs)
if success:
log.info(f'Successfully installed OS packages "{",".join(sorted(packages))}"')
else:
log.warning(
f"Failed to install OS packages ({err}). Recommend installing the following packages manually:"
)
for p in packages:
log.warning(f" - {p}")
return success

def _make_apt_ansible_args(self, packages):
packages_str = ",".join(sorted(packages))
log.info(f"Installing the following OS packages: {packages_str}")
args = {"name": packages_str, "state": "present"} # , "update_cache": True, "cache_valid_time": 86400}
kwargs = {}
# don't sudo brew
if os_platform() != "darwin":
if self.os_platform != "darwin":
kwargs = {
"ansible_args": {
"ansible_become": True,
"ansible_become_method": "sudo",
}
}
success, err = self.ansible_run(module="package", args=args, **kwargs)
if success:
log.info(f'Successfully installed OS packages "{packages_str}"')
else:
log.warning(
f"Failed to install OS packages ({err}). Recommend installing the following packages manually:"
)
for p in packages:
log.warning(f" - {p}")
return success
return args, kwargs

def shell(self, module, commands):
tasks = []
Expand Down Expand Up @@ -269,7 +305,7 @@ def ansible_run(self, tasks=None, module=None, args=None, ansible_args=None):
for task in tasks:
if "package" in task:
# special case for macos
if os_platform() == "darwin":
if self.os_platform == "darwin":
# don't sudo brew
task["become"] = False
# brew doesn't support update_cache
Expand All @@ -292,8 +328,8 @@ def ansible_run(self, tasks=None, module=None, args=None, ansible_args=None):
},
module=module,
module_args=module_args,
quiet=not self.ansible_debug,
verbosity=(3 if self.ansible_debug else 0),
quiet=True,
verbosity=0,
cancel_callback=lambda: None,
)

Expand All @@ -303,7 +339,7 @@ def ansible_run(self, tasks=None, module=None, args=None, ansible_args=None):
err = ""
for e in res.events:
if self.ansible_debug and not success:
log.debug(json.dumps(e, indent=4))
log.debug(json.dumps(e, indent=2))
if e["event"] == "runner_on_failed":
err = e["event_data"]["res"]["msg"]
break
Expand Down Expand Up @@ -347,26 +383,30 @@ def ensure_root(self, message=""):

def install_core_deps(self):
to_install = set()
to_install_friendly = set()
playbook = []
self._install_sudo_askpass()
# ensure tldextract data is cached
self.parent_helper.tldextract("evilcorp.co.uk")
# command: package_name
core_deps = {
"unzip": "unzip",
"zipinfo": "unzip",
"curl": "curl",
"git": "git",
"make": "make",
"gcc": "gcc",
"bash": "bash",
"which": "which",
}
for command, package_name in core_deps.items():
for command, package_name_or_playbook in self.CORE_DEPS.items():
if not self.parent_helper.which(command):
to_install.add(package_name)
to_install_friendly.add(command)
if isinstance(package_name_or_playbook, str):
to_install.add(package_name_or_playbook)
else:
playbook.extend(package_name_or_playbook)
if to_install:
playbook.append(
{
"name": "Install Core BBOT Dependencies",
"package": {"name": list(to_install), "state": "present"},
"become": True,
}
)
if playbook:
log.info(f"Installing core BBOT dependencies: {','.join(sorted(to_install_friendly))}")
self.ensure_root()
self.apt_install(list(to_install))
self.ansible_run(tasks=playbook)

def _setup_sudo_cache(self):
if not self._sudo_cache_setup:
Expand Down
8 changes: 1 addition & 7 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,13 +559,12 @@ def is_port(p):
return p and p.isdigit() and 0 <= int(p) <= 65535


def is_dns_name(d, include_local=True):
def is_dns_name(d):
"""
Determines if the given string is a valid DNS name.
Args:
d (str): The string to be checked.
include_local (bool): Consider local hostnames to be valid (hostnames without periods)
Returns:
bool: True if the string is a valid DNS name, False otherwise.
Expand All @@ -575,17 +574,12 @@ def is_dns_name(d, include_local=True):
True
>>> is_dns_name('localhost')
True
>>> is_dns_name('localhost', include_local=False)
False
>>> is_dns_name('192.168.1.1')
False
"""
if is_ip(d):
return False
d = smart_decode(d)
if include_local:
if bbot_regexes.hostname_regex.match(d):
return True
if bbot_regexes.dns_name_validation_regex.match(d):
return True
return False
Expand Down
Loading

0 comments on commit a9d9218

Please sign in to comment.