Skip to content

Commit

Permalink
better host inheritance
Browse files Browse the repository at this point in the history
  • Loading branch information
github-actions committed Aug 16, 2024
1 parent 3172528 commit e735daa
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 45 deletions.
2 changes: 1 addition & 1 deletion bbot/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async def _infinite_retry(self, callback, *args, **kwargs):
try:
return await asyncio.wait_for(callback(*args, **kwargs), timeout=interval)
except (TimeoutError, asyncio.exceptions.TimeoutError):
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds{context}, retrying...")
self.log.debug(f"{self.name}: Timeout after {interval:,} seconds {context}, retrying...")
retries += 1
if max_retries is not None and retries > max_retries:
raise TimeoutError(f"Timed out after {max_retries*interval:,} seconds {context}")
Expand Down
56 changes: 28 additions & 28 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,15 +335,6 @@ def host_original(self):
return self.host
return self._host_original

@property
def closest_host(self):
"""
Walk up the chain of parents events until we hit the first one with a host
"""
if self.host is not None or self.parent is None or self.parent is self:
return self.host
return self.parent.closest_host

@property
def port(self):
self.host
Expand Down Expand Up @@ -602,7 +593,7 @@ def get_parents(self, omit=False, include_self=False):
return parents

def _host(self):
return None
return ""

def _sanitize_data(self, data):
"""
Expand Down Expand Up @@ -954,30 +945,39 @@ def _host(self):


class ClosestHostEvent(DictHostEvent):
# if a host isn't specified, this event type uses the host from the closest parent
# if a host/path/url isn't specified, this event type grabs it from the closest parent
# inherited by FINDING and VULNERABILITY
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if "host" not in self.data:
closest_host = self.closest_host
if closest_host is None:
raise ValueError("No host was found in event parents. Host must be specified!")
self.data["host"] = str(closest_host)
if not self.host:
for parent in self.get_parents(include_self=True):
# inherit closest URL
if not "url" in self.data:
parent_url = getattr(parent, "parsed_url", None)
if parent_url is not None:
self.data["url"] = parent_url.geturl()
# inherit closest path
if not "path" in self.data and isinstance(parent.data, dict):
parent_path = parent.data.get("path", None)
if parent_path is not None:
self.data["path"] = parent_path
# inherit closest host
if parent.host:
self.data["host"] = str(parent.host)
break
# die if we still haven't found a host
if not self.host:
raise ValueError("No host was found in event parents. Host must be specified!")


class DictPathEvent(DictEvent):
_path_keywords = ["path", "filename"]

def sanitize_data(self, data):
new_data = dict(data)
file_blobs = getattr(self.scan, "_file_blobs", False)
folder_blobs = getattr(self.scan, "_folder_blobs", False)
for path_keyword in self._path_keywords:
blob = None
try:
data_path = Path(data[path_keyword])
except KeyError:
continue
blob = None
try:
data_path = Path(data["path"])
if data_path.is_file():
self.add_tag("file")
if file_blobs:
Expand All @@ -987,10 +987,10 @@ def sanitize_data(self, data):
self.add_tag("folder")
if folder_blobs:
blob = self._tar_directory(data_path)
else:
continue
if blob:
new_data["blob"] = base64.b64encode(blob).decode("utf-8")
except KeyError:
pass
if blob:
new_data["blob"] = base64.b64encode(blob).decode("utf-8")

return new_data

Expand Down
2 changes: 1 addition & 1 deletion bbot/modules/gowitness.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ async def handle_batch(self, *events):
url = screenshot["url"]
final_url = screenshot["final_url"]
filename = self.screenshot_path / screenshot["filename"]
webscreenshot_data = {"filename": str(filename), "url": final_url}
webscreenshot_data = {"path": str(filename), "url": final_url}
parent_event = event_dict[url]
await self.emit_event(
webscreenshot_data,
Expand Down
37 changes: 22 additions & 15 deletions bbot/test/test_step_1/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,25 +813,32 @@ def test_event_closest_host():
# first event has a host
event1 = scan.make_event("evilcorp.com", "DNS_NAME", parent=scan.root_event)
assert event1.host == "evilcorp.com"
assert event1.closest_host == "evilcorp.com"
# second event has no host
event2 = scan.make_event("wat", "ASDF", parent=event1)
assert event2.host == None
assert event2.closest_host == "evilcorp.com"
# finding automatically uses the host from the first event
finding = scan.make_event({"path": "/tmp/asdf.txt", "description": "test"}, "FINDING", parent=event2)
assert finding.data["host"] == "evilcorp.com"
assert finding.host == "evilcorp.com"
# same with vuln
vuln = scan.make_event(
{"path": "/tmp/asdf.txt", "description": "test", "severity": "HIGH"}, "VULNERABILITY", parent=event2
# second event has a host + url
event2 = scan.make_event(
{"method": "GET", "url": "http://www.evilcorp.com/asdf", "hash": {"header_mmh3": "1", "body_mmh3": "2"}},
"HTTP_RESPONSE",
parent=event1,
)
assert vuln.data["host"] == "evilcorp.com"
assert vuln.host == "evilcorp.com"
assert event2.host == "www.evilcorp.com"
# third event has a path
event3 = scan.make_event({"path": "/tmp/asdf.txt"}, "FILESYSTEM", parent=event2)
assert not event3.host
# finding automatically uses the host from the second event
finding = scan.make_event({"description": "test"}, "FINDING", parent=event3)
assert finding.data["host"] == "www.evilcorp.com"
assert finding.data["url"] == "http://www.evilcorp.com/asdf"
assert finding.data["path"] == "/tmp/asdf.txt"
assert finding.host == "www.evilcorp.com"
# same with vuln
vuln = scan.make_event({"description": "test", "severity": "HIGH"}, "VULNERABILITY", parent=event3)
assert vuln.data["host"] == "www.evilcorp.com"
assert vuln.data["url"] == "http://www.evilcorp.com/asdf"
assert vuln.data["path"] == "/tmp/asdf.txt"
assert vuln.host == "www.evilcorp.com"

# no host == not allowed
event3 = scan.make_event("wat", "ASDF", parent=scan.root_event)
assert event3.host == None
assert not event3.host
with pytest.raises(ValueError):
finding = scan.make_event({"path": "/tmp/asdf.txt", "description": "test"}, "FINDING", parent=event3)
with pytest.raises(ValueError):
Expand Down

0 comments on commit e735daa

Please sign in to comment.