Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Better host inheritance #1666

Merged
merged 1 commit into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bbot/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def _infinite_retry(self, callback, *args, **kwargs):
try:
return await asyncio.wait_for(callback(*args, **kwargs), timeout=interval)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds{context}, retrying...")
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds {context}, retrying...")
retries += 1
if max_retries is not None and retries > max_retries:
raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}")
Expand Down
56 changes: 28 additions & 28 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,6 @@ def host_original(self):
return self.host
return self._host_original

@property
def closest_host(self):
"""
Walk up the chain of parents events until we hit the first one with a host
"""
if self.host is not None or self.parent is None or self.parent is self:
return self.host
return self.parent.closest_host

@property
def port(self):
self.host
Expand Down Expand Up @@ -602,7 +593,7 @@ def get_parents(self, omit=False, include_self=False):
return parents

def _host(self):
return None
return ""

def _sanitize_data(self, data):
"""
Expand Down Expand Up @@ -954,30 +945,39 @@ def _host(self):


class ClosestHostEvent(DictHostEvent):
# if a host isn't specified, this event type uses the host from the closest parent
# if a host/path/url isn't specified, this event type grabs it from the closest parent
# inherited by FINDING and VULNERABILITY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if "host" not in self.data:
closest_host = self.closest_host
if closest_host is None:
raise ValueError("No host was found in event parents. Host must be specified!")
self.data["host"] = str(closest_host)
if not self.host:
for parent in self.get_parents(include_self=True):
# inherit closest URL
if not "url" in self.data:
parent_url = getattr(parent, "parsed_url", None)
if parent_url is not None:
self.data["url"] = parent_url.geturl()
# inherit closest path
if not "path" in self.data and isinstance(parent.data, dict):
parent_path = parent.data.get("path", None)
if parent_path is not None:
self.data["path"] = parent_path
# inherit closest host
if parent.host:
self.data["host"] = str(parent.host)
break
# die if we still haven't found a host
if not self.host:
raise ValueError("No host was found in event parents. Host must be specified!")


class DictPathEvent(DictEvent):
_path_keywords = ["path", "filename"]

def sanitize_data(self, data):
new_data = dict(data)
file_blobs = getattr(self.scan, "_file_blobs", False)
folder_blobs = getattr(self.scan, "_folder_blobs", False)
for path_keyword in self._path_keywords:
blob = None
try:
data_path = Path(data[path_keyword])
except KeyError:
continue
blob = None
try:
data_path = Path(data["path"])
if data_path.is_file():
self.add_tag("file")
if file_blobs:
Expand All @@ -987,10 +987,10 @@ def sanitize_data(self, data):
self.add_tag("folder")
if folder_blobs:
blob = self._tar_directory(data_path)
else:
continue
if blob:
new_data["blob"] = base64.b64encode(blob).decode("utf-8")
except KeyError:
pass
if blob:
new_data["blob"] = base64.b64encode(blob).decode("utf-8")

return new_data

Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/gowitness.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def handle_batch(self, *events):
url = screenshot["url"]
final_url = screenshot["final_url"]
filename = self.screenshot_path / screenshot["filename"]
webscreenshot_data = {"filename": str(filename), "url": final_url}
webscreenshot_data = {"path": str(filename), "url": final_url}
parent_event = event_dict[url]
await self.emit_event(
webscreenshot_data,
Expand Down
37 changes: 22 additions & 15 deletions bbot/test/test_step_1/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,25 +813,32 @@ def test_event_closest_host():
# first event has a host
event1 = scan.make_event("evilcorp.com", "DNS_NAME", parent=scan.root_event)
assert event1.host == "evilcorp.com"
assert event1.closest_host == "evilcorp.com"
# second event has no host
event2 = scan.make_event("wat", "ASDF", parent=event1)
assert event2.host == None
assert event2.closest_host == "evilcorp.com"
# finding automatically uses the host from the first event
finding = scan.make_event({"path": "/tmp/asdf.txt", "description": "test"}, "FINDING", parent=event2)
assert finding.data["host"] == "evilcorp.com"
assert finding.host == "evilcorp.com"
# same with vuln
vuln = scan.make_event(
{"path": "/tmp/asdf.txt", "description": "test", "severity": "HIGH"}, "VULNERABILITY", parent=event2
# second event has a host + url
event2 = scan.make_event(
{"method": "GET", "url": "http://www.evilcorp.com/asdf", "hash": {"header_mmh3": "1", "body_mmh3": "2"}},
"HTTP_RESPONSE",
parent=event1,
)
assert vuln.data["host"] == "evilcorp.com"
assert vuln.host == "evilcorp.com"
assert event2.host == "www.evilcorp.com"
# third event has a path
event3 = scan.make_event({"path": "/tmp/asdf.txt"}, "FILESYSTEM", parent=event2)
assert not event3.host
# finding automatically uses the host from the second event
finding = scan.make_event({"description": "test"}, "FINDING", parent=event3)
assert finding.data["host"] == "www.evilcorp.com"
assert finding.data["url"] == "http://www.evilcorp.com/asdf"
assert finding.data["path"] == "/tmp/asdf.txt"
assert finding.host == "www.evilcorp.com"
# same with vuln
vuln = scan.make_event({"description": "test", "severity": "HIGH"}, "VULNERABILITY", parent=event3)
assert vuln.data["host"] == "www.evilcorp.com"
assert vuln.data["url"] == "http://www.evilcorp.com/asdf"
assert vuln.data["path"] == "/tmp/asdf.txt"
assert vuln.host == "www.evilcorp.com"

# no host == not allowed
event3 = scan.make_event("wat", "ASDF", parent=scan.root_event)
assert event3.host == None
assert not event3.host
with pytest.raises(ValueError):
finding = scan.make_event({"path": "/tmp/asdf.txt", "description": "test"}, "FINDING", parent=event3)
with pytest.raises(ValueError):
Expand Down
Loading