diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py
index a4da3bae5a..3f9dd73694 100644
--- a/bbot/core/event/base.py
+++ b/bbot/core/event/base.py
@@ -687,7 +687,7 @@ def __hash__(self):
def __str__(self):
max_event_len = 80
d = str(self.data)
- return f'{self.type}("{d[:max_event_len]}{("..." if len(d) > max_event_len else "")}", module={self.module}, tags={self.tags} graph_important={self._graph_important})'
+ return f'{self.type}("{d[:max_event_len]}{("..." if len(d) > max_event_len else "")}", module={self.module}, tags={self.tags})'
def __repr__(self):
return str(self)
diff --git a/bbot/modules/base.py b/bbot/modules/base.py
index 00e5b96ca6..2754322163 100644
--- a/bbot/modules/base.py
+++ b/bbot/modules/base.py
@@ -679,21 +679,26 @@ async def _event_postcheck(self, event):
A simple wrapper for dup tracking and preserving event chains for graph modules
"""
acceptable, reason = await self.__event_postcheck(event)
+ is_graph_important = self._is_graph_important(event, remove=True)
if acceptable:
+ # check duplicates
is_incoming_duplicate = self.is_incoming_duplicate(event, add=True)
if is_incoming_duplicate and not self.accept_dupes:
- if not self._graph_important(event):
+ if not is_graph_important:
return False, f"module has already seen {event}"
+ # queue parents if required by graph
if self._preserve_graph:
s = event
while 1:
- s = s.source
- if s is None or s == self.scan.root_event or s == event:
+ s = s.get_source()
+ if s is None:
+ break
+ if s == self.scan.root_event or s == event:
break
if not self.is_incoming_duplicate(s, add=True):
- self._graph_important_tracker.add(hash(event))
- self.critical(f"queueing {event}")
+ self._graph_important_tracker.add(hash(s))
+ self.debug(f"Queueing {s} as graph-important event")
await self.queue_event(s, precheck=False)
return acceptable, reason
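Review bot: The parent walk in the `while 1:` loop stops only on `None`, the scan's root event, or a return to `event` itself, so a source chain that cycles among ancestors without passing through `event` would spin here indefinitely; nothing in this hunk shows that such chains cannot occur. A visited set bounds the walk cheaply: initialise `seen = {hash(event)}` before the loop, then add `if hash(s) in seen: break` and `seen.add(hash(s))` after the `None` check. Separately, `self._graph_important_tracker.add(hash(s))` runs before `queue_event(s, precheck=False)`, and `queue_event` returns early when `incoming_event_queue is False` (visible in a later hunk). That leaves a stale tracker entry, which would exempt a later event with the same hash from the duplicate check here and from the filter result in `__event_postcheck`. The function's signature and the start of its docstring lie above this hunk.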
@@ -731,6 +736,8 @@ async def __event_postcheck(self, event):
if not filter_result:
if self._is_graph_important(event):
return True, f"{reason}, but exception was made because it is graph important"
+ else:
+ self.debug(f"{event} is not graph-important")
return filter_result, reason
# custom filtering
@@ -810,7 +817,7 @@ async def queue_event(self, event, precheck=True):
if self.incoming_event_queue is False:
self.debug(f"Not in an acceptable state to queue incoming event")
return
- acceptable, reason = True, "no precheck was performed"
+ acceptable, reason = True, "precheck was skipped"
if precheck:
acceptable, reason = self._event_precheck(event)
if not acceptable:
@@ -904,8 +911,12 @@ def is_incoming_duplicate(self, event, add=False):
return False
return is_dup
- def _is_graph_important(self, event):
- return self._preserve_graph and hash(event) in self._graph_important_tracker
+ def _is_graph_important(self, event, remove=False):
+ ret = self._preserve_graph and hash(event) in self._graph_important_tracker
+ if remove:
+ with suppress(KeyError):
+ self._graph_important_tracker.remove(hash(event))
+ return ret
def _incoming_dedup_hash(self, event):
"""
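Review bot: `_is_graph_important` now mutates the tracker when `remove=True`, but its name and missing docstring still read as a pure predicate, and `suppress` is not imported in any hunk shown for this file. Confirm that `from contextlib import suppress` already exists at the top of `bbot/modules/base.py`. If `_graph_important_tracker` is a set (its `add`/`remove`/`KeyError` usage suggests so), `self._graph_important_tracker.discard(hash(event))` does the same thing without the context manager. A one-line docstring such as `"""Return True if this module queued the event to preserve the graph; with remove=True the marker is consumed."""` would make the one-shot semantics explicit for callers such as the output-module precheck.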
diff --git a/bbot/modules/httpx.py b/bbot/modules/httpx.py
index b1aab8ee0f..ef77668db0 100644
--- a/bbot/modules/httpx.py
+++ b/bbot/modules/httpx.py
@@ -110,7 +110,6 @@ async def handle_batch(self, *events):
if proxy:
command += ["-http-proxy", proxy]
async for line in self.helpers.run_live(command, input=list(stdin), stderr=subprocess.DEVNULL):
- self.critical(line)
try:
j = json.loads(line)
except json.decoder.JSONDecodeError:
diff --git a/bbot/modules/output/base.py b/bbot/modules/output/base.py
index 8db21645b0..e845624c75 100644
--- a/bbot/modules/output/base.py
+++ b/bbot/modules/output/base.py
@@ -31,7 +31,7 @@ def _event_precheck(self, event):
if event._omit:
return False, "_omit is True"
# force-output certain events to the graph
- if self._preserve_graph and event._graph_important:
+ if self._is_graph_important(event):
return True, "event is critical to the graph"
# internal events like those from speculate, ipneighbor
# or events that are over our report distance
diff --git a/bbot/modules/output/neo4j.py b/bbot/modules/output/neo4j.py
index 477bdd373b..b8194aa100 100644
--- a/bbot/modules/output/neo4j.py
+++ b/bbot/modules/output/neo4j.py
@@ -17,6 +17,7 @@ class neo4j(BaseOutputModule):
}
deps_pip = ["py2neo~=2021.2.3"]
batch_size = 50
+ _preserve_graph = True
async def setup(self):
try:
diff --git a/bbot/scanner/manager.py b/bbot/scanner/manager.py
index 2d2bac754f..21b589c285 100644
--- a/bbot/scanner/manager.py
+++ b/bbot/scanner/manager.py
@@ -236,10 +236,10 @@ async def _emit_event(self, event, **kwargs):
# Scope shepherding
# here is where we make sure in-scope events are set to their proper scope distance
if event.host and event_whitelisted:
- log.critical(f"Making {event} in-scope")
+ log.debug(f"Making {event} in-scope")
event.scope_distance = 0
elif (not event.always_emit) and event.scope_distance > self.scan.scope_report_distance:
- log.critical(
+ log.debug(
f"Making {event} internal because its scope_distance ({event.scope_distance}) > scope_report_distance ({self.scan.scope_report_distance})"
)
event.make_internal()
diff --git a/bbot/test/conftest.py b/bbot/test/conftest.py
index a4c9827057..67e7515ff7 100644
--- a/bbot/test/conftest.py
+++ b/bbot/test/conftest.py
@@ -1,5 +1,5 @@
import ssl
-import shutil # noqa
+import shutil
import pytest
import logging
from pathlib import Path
@@ -18,7 +18,7 @@ def pytest_sessionfinish(session, exitstatus):
logger.removeHandler(handler)
# Wipe out BBOT home dir
- # shutil.rmtree("/tmp/.bbot_test", ignore_errors=True)
+ shutil.rmtree("/tmp/.bbot_test", ignore_errors=True)
yield
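Review bot: Re-enabling `shutil.rmtree("/tmp/.bbot_test", ignore_errors=True)` makes the test session recursively delete a fixed, predictable path in a world-writable directory, and `ignore_errors=True` hides a failed cleanup. On a shared host or CI runner another local user can pre-create `/tmp/.bbot_test`, so the tests may read state they did not write and the wipe may silently fail on files the test user does not own. If the home directory is set in the fixtures (not visible in this hunk), prefer creating it once per run with `tempfile.mkdtemp(prefix="bbot_test_")` and removing that path here. `pytest_sessionfinish` starts above the hunk, so how the hook is wrapped around the `yield` could not be checked.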
diff --git a/bbot/test/test_step_1/test_manager_scope_accuracy.py b/bbot/test/test_step_1/test_manager_scope_accuracy.py
new file mode 100644
index 0000000000..6d55da9f46
--- /dev/null
+++ b/bbot/test/test_step_1/test_manager_scope_accuracy.py
@@ -0,0 +1,761 @@
+from ..bbot_fixtures import * # noqa: F401
+
+from pytest_httpserver import HTTPServer
+
+
+@pytest.fixture
+def bbot_other_httpservers():
+
+ server_hosts = [
+ ("127.0.0.77", 8888),
+ ("127.0.0.88", 8888),
+ ("127.0.0.99", 8888),
+ ("127.0.0.111", 8888),
+ ("127.0.0.222", 8889),
+ ("127.0.0.33", 8889),
+ ]
+
+ servers = [HTTPServer(host=host, port=port) for host, port in server_hosts]
+ for server in servers:
+ server.start()
+
+ yield servers
+
+ for server in servers:
+ server.clear()
+ if server.is_running():
+ server.stop()
+ server.check_assertions()
+ server.clear()
+
+
+
+@pytest.mark.asyncio
+async def test_manager_scope_accuracy(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpservers):
+ """
+ This test ensures that BBOT correctly handles different scope distance settings.
+ It performs these tests for normal modules, output modules, and their graph variants,
+ ensuring that when an internal event leads to an interesting discovery, the entire event chain is preserved.
+ This is important for preventing orphans in the graph.
+ """
+
+ from bbot.modules.base import BaseModule
+ from bbot.modules.output.base import BaseOutputModule
+
+ server_77, server_88, server_99, server_111, server_222, server_33 = bbot_other_httpservers
+
+ bbot_httpserver.expect_request(uri="/").respond_with_data(response_data="")
+ server_77.expect_request(uri="/").respond_with_data(response_data="")
+ server_88.expect_request(uri="/").respond_with_data(response_data="")
+ server_99.expect_request(uri="/").respond_with_data(response_data="")
+ server_111.expect_request(uri="/").respond_with_data(response_data="")
+ server_222.expect_request(uri="/").respond_with_data(response_data="")
+ server_33.expect_request(uri="/").respond_with_data(response_data="")
+
+ class DummyModule(BaseModule):
+ _name = "dummy_module"
+ watched_events = ["*"]
+ scope_distance_modifier = 10
+ accept_dupes = True
+
+ async def setup(self):
+ self.events = []
+ return True
+
+ async def handle_event(self, event):
+ self.events.append(event)
+
+ class DummyModuleNoDupes(DummyModule):
+ accept_dupes = False
+
+ class DummyGraphModule(DummyModule):
+ _name = "dummy_graph_module"
+ watched_events = ["*"]
+ scope_distance_modifier = 0
+ accept_dupes = True
+ _preserve_graph = True
+
+ class DummyGraphOutputModule(BaseOutputModule):
+ _name = "dummy_graph_output_module"
+ watched_events = ["*"]
+ _preserve_graph = True
+
+ async def setup(self):
+ self.events = []
+ return True
+
+ async def handle_event(self, event):
+ self.events.append(event)
+
+ class DummyGraphBatchOutputModule(DummyGraphOutputModule):
+ _name = "dummy_graph_batch_output_module"
+ watched_events = ["*"]
+ _preserve_graph = True
+ batch_size = 5
+
+ async def handle_batch(self, *events):
+ for event in events:
+ self.events.append(event)
+
+ async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs):
+ merged_config = OmegaConf.merge(bbot_config, OmegaConf.create(_config))
+ scan = bbot_scanner(*args, config=merged_config, **kwargs)
+ dummy_module = DummyModule(scan)
+ dummy_module_nodupes = DummyModuleNoDupes(scan)
+ dummy_graph_module = DummyGraphModule(scan)
+ dummy_graph_output_module = DummyGraphOutputModule(scan)
+ dummy_graph_batch_output_module = DummyGraphBatchOutputModule(scan)
+ scan.modules["dummy_module"] = dummy_module
+ scan.modules["dummy_module_nodupes"] = dummy_module_nodupes
+ scan.modules["dummy_graph_module"] = dummy_graph_module
+ scan.modules["dummy_graph_output_module"] = dummy_graph_output_module
+ scan.modules["dummy_graph_batch_output_module"] = dummy_graph_batch_output_module
+ if _dns_mock:
+ scan.helpers.dns.mock_dns(_dns_mock)
+ if scan_callback is not None:
+ scan_callback(scan)
+ return (
+ [e async for e in scan.async_start()],
+ dummy_module.events,
+ dummy_module_nodupes.events,
+ dummy_graph_module.events,
+ dummy_graph_output_module.events,
+ dummy_graph_batch_output_module.events,
+ )
+
+ dns_mock_chain = {
+ ("test.notreal", "A"): "127.0.0.66",
+ ("127.0.0.66", "PTR"): "test.notrealzies",
+ ("test.notrealzies", "CNAME"): "www.test.notreal",
+ ("www.test.notreal", "A"): "127.0.0.77",
+ ("127.0.0.77", "PTR"): "test2.notrealzies",
+ ("test2.notrealzies", "A"): "127.0.0.88",
+ }
+
+ # dns search distance = 1, report distance = 0
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "test.notreal",
+ _config={"dns_resolution": True, "scope_dns_search_distance": 1, "scope_report_distance": 0},
+ _dns_mock=dns_mock_chain,
+ )
+
+ assert len(events) == 2
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+
+ assert len(all_events) == 3
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True])
+ assert 0 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
+ assert 0 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+
+ assert len(all_events_nodups) == 3
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True])
+ assert 0 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
+ assert 0 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+
+ assert len(graph_events) == 2
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
+ assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+
+ assert len(graph_output_events) == 2
+ assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
+ assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
+ assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+
+ # dns search distance = 2, report distance = 0
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "test.notreal",
+ _config={"dns_resolution": True, "scope_dns_search_distance": 2, "scope_report_distance": 0},
+ _dns_mock=dns_mock_chain,
+ )
+
+ assert len(events) == 3
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ for _all_events in (all_events, all_events_nodups):
+ assert len(_all_events) == 7
+ assert 1 == len([e for e in _all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True])
+ assert 1 == len([e for e in _all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in _all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in _all_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True])
+ assert 0 == len([e for e in _all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ assert len(graph_events) == 5
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 5
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ # dns search distance = 2, report distance = 1
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "test.notreal",
+ _config={"dns_resolution": True, "scope_dns_search_distance": 2, "scope_report_distance": 1},
+ _dns_mock=dns_mock_chain,
+ )
+
+ assert len(events) == 5
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ assert len(all_events) == 7
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True])
+ assert 0 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ assert len(all_events_nodups) == 7
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True])
+ assert 0 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ assert len(graph_events) == 5
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 6
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+
+ dns_mock_chain = {
+ ("test.notreal", "A"): "127.0.0.66",
+ ("127.0.0.66", "PTR"): "test.notrealzies",
+ ("test.notrealzies", "A"): "127.0.0.77",
+ }
+
+ class DummyVulnModule(BaseModule):
+ _name = "dummyvulnmodule"
+ watched_events = ["IP_ADDRESS"]
+ scope_distance_modifier = 3
+ accept_dupes = True
+
+ async def filter_event(self, event):
+ if event.data == "127.0.0.77":
+ return True
+ return False, "bleh"
+
+ async def handle_event(self, event):
+ self.emit_event(
+ {"host": str(event.host), "description": "yep", "severity": "CRITICAL"}, "VULNERABILITY", source=event
+ )
+
+ def custom_setup(scan):
+ dummyvulnmodule = DummyVulnModule(scan)
+ scan.modules["dummyvulnmodule"] = dummyvulnmodule
+
+ # dns search distance = 3, report distance = 1
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "test.notreal",
+ scan_callback=custom_setup,
+ _config={"dns_resolution": True, "scope_dns_search_distance": 3, "scope_report_distance": 1},
+ _dns_mock=dns_mock_chain,
+ )
+
+ assert len(events) == 4
+ assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 1 == len([e for e in events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False])
+
+ assert len(all_events) == 6
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False])
+
+ assert len(all_events_nodups) == 6
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False])
+
+ assert len(graph_events) == 2
+ assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77"])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 6
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False])
+
+ # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 0
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "127.0.0.1/31",
+ modules=["httpx", "excavate"],
+ _config={
+ "scope_search_distance": 0,
+ "scope_dns_search_distance": 2,
+ "scope_report_distance": 1,
+ "speculate": True,
+ "internal_modules": {"speculate": {"ports": "8888"}},
+ "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
+ },
+ )
+
+ assert len(events) == 3
+ assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
+
+ assert len(all_events) == 11
+ assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+
+ assert len(all_events_nodups) == 11
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+
+ assert len(graph_events) == 8
+ assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 5
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+
+ # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 0, in_scope_only = False
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "127.0.0.1/31",
+ modules=["httpx", "excavate"],
+ _config={
+ "scope_search_distance": 0,
+ "scope_dns_search_distance": 2,
+ "scope_report_distance": 1,
+ "speculate": True,
+ "modules": {"httpx": {"in_scope_only": False}},
+ "internal_modules": {"speculate": {"ports": "8888"}},
+ "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
+ },
+ )
+
+ assert len(events) == 4
+ assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
+
+ assert len(all_events) == 15
+ assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ assert len(all_events_nodups) == 15
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ assert len(graph_events) == 8
+ assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 8
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 1
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "127.0.0.1/31",
+ modules=["httpx", "excavate"],
+ _config={
+ "scope_search_distance": 1,
+ "scope_dns_search_distance": 2,
+ "scope_report_distance": 1,
+ "speculate": True,
+ "modules": {"httpx": {"in_scope_only": False}},
+ "internal_modules": {"speculate": {"ports": "8888"}},
+ "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
+ },
+ )
+
+ assert len(events) == 4
+ assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
+
+ assert len(all_events) == 20
+ assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.88:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.88:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.99:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.99" and e._internal == True])
+
+ assert len(all_events_nodups) == 20
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.88:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.88:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.99:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.99" and e._internal == True])
+
+ assert len(graph_events) == 13
+ assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 8
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True])
+
+ # 2 events from a single HTTP_RESPONSE
+ events, all_events, all_events_nodups, graph_events, graph_output_events, graph_output_batch_events = await do_scan(
+ "127.0.0.111/31",
+ whitelist=["127.0.0.111/31", "127.0.0.222", "127.0.0.33"],
+ modules=["httpx", "excavate"],
+ output_modules=["python", "neo4j"],
+ _config={
+ "scope_search_distance": 0,
+ "scope_dns_search_distance": 2,
+ "scope_report_distance": 0,
+ "speculate": True,
+ "output_modules": {"neo4j": {"uri": "bolt://localhost:7687"}},
+ "internal_modules": {"speculate": {"ports": "8888"}},
+ "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
+ },
+ )
+
+ assert len(events) == 5
+ assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.110/31" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.110"])
+ assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.111"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.110:8888"])
+ assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.111:8888"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.111:8888/" and e._internal == False])
+ assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.111:8888"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.44:8888/"])
+ assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.55:8888/"])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+
+ assert len(all_events) == 26
+ assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.110/31" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.110" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.111" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.110:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.111:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.222" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.33" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.44:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.44" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.55:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.55" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.44:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.55:8888" and e._internal == True])
+
+ assert len(all_events_nodups) == 26
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.110/31" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.110" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.111" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.110:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.111:8888/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.222" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.33" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.44:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.44" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.55:8888/" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.55" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.44:8888" and e._internal == True])
+ assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.55:8888" and e._internal == True])
+
+ assert len(graph_events) == 20
+ assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.110/31" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.110" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.111" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.110:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.111:8888/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.222" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.33" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8888" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.33:8889" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.44:8888/" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.44" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.55:8888/" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.55" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.44:8888" and e._internal == True])
+ assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.55:8888" and e._internal == True])
+
+ for _graph_output_events in (graph_output_events, graph_output_batch_events):
+ assert len(_graph_output_events) == 9
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.110/31" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.110" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.111" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.110:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.111:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.111:8888/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.111:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.222" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.33" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.222:8889" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8888" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.33:8889" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.222:8889/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.222:8889" and e._internal == True])
+ assert 1 == len([e for e in _graph_output_events if e.type == "URL" and e.data == "http://127.0.0.33:8889/" and e._internal == False])
+ assert 0 == len([e for e in _graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.33:8889" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.44:8888/" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.44" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.55:8888/" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.55" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.44:8888" and e._internal == True])
+ assert 0 == len([e for e in _graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.55:8888" and e._internal == True])
diff --git a/bbot/test/test_step_1/test_manager_scope_shepherding.py b/bbot/test/test_step_1/test_manager_scope_shepherding.py
deleted file mode 100644
index a3fb7df414..0000000000
--- a/bbot/test/test_step_1/test_manager_scope_shepherding.py
+++ /dev/null
@@ -1,687 +0,0 @@
-from ..bbot_fixtures import * # noqa: F401
-
-from pytest_httpserver import HTTPServer
-
-
-@pytest.fixture
-def bbot_other_httpserver():
- server = HTTPServer(host="127.0.0.77", port=8888)
- server.start()
-
- yield server
-
- server.clear()
- if server.is_running():
- server.stop()
-
- server.check_assertions()
- server.clear()
-
-
-@pytest.fixture
-def bbot_other_httpserver2():
- server = HTTPServer(host="127.0.0.88", port=8888)
- server.start()
-
- yield server
-
- server.clear()
- if server.is_running():
- server.stop()
-
- server.check_assertions()
- server.clear()
-
-
-@pytest.fixture
-def bbot_other_httpserver3():
- server = HTTPServer(host="127.0.0.111", port=8888)
- server.start()
-
- yield server
-
- server.clear()
- if server.is_running():
- server.stop()
-
- server.check_assertions()
- server.clear()
-
-
-@pytest.fixture
-def bbot_other_httpserver4():
- server = HTTPServer(host="127.0.0.222", port=8888)
- server.start()
-
- yield server
-
- server.clear()
- if server.is_running():
- server.stop()
-
- server.check_assertions()
- server.clear()
-
-
-
-
-@pytest.mark.asyncio
-async def test_manager_scope_shepherding(bbot_config, bbot_scanner, bbot_httpserver, bbot_other_httpserver, bbot_other_httpserver2, bbot_other_httpserver3, bbot_other_httpserver4):
- from bbot.modules.base import BaseModule
- from bbot.modules.output.base import BaseOutputModule
-
- class DummyModule(BaseModule):
- _name = "dummymodule"
- watched_events = ["*"]
- scope_distance_modifier = 10
- accept_dupes = True
-
- async def setup(self):
- self.events = []
- return True
-
- async def handle_event(self, event):
- self.events.append(event)
-
- class DummyModuleNoDupes(DummyModule):
- accept_dupes = False
-
- class DummyGraphModule(DummyModule):
- _name = "dummygraphmodule"
- watched_events = ["*"]
- scope_distance_modifier = 0
- accept_dupes = True
- _preserve_graph = True
-
- class DummyGraphOutputModule(BaseOutputModule):
- _name = "dummygraphoutputmodule"
- watched_events = ["*"]
- _preserve_graph = True
-
- async def setup(self):
- self.events = []
- return True
-
- async def handle_event(self, event):
- self.events.append(event)
-
- async def do_scan(*args, _config={}, _dns_mock={}, scan_callback=None, **kwargs):
- merged_config = OmegaConf.merge(bbot_config, OmegaConf.create(_config))
- scan = bbot_scanner(*args, config=merged_config, **kwargs)
- dummymodule = DummyModule(scan)
- dummymodulenodupes = DummyModuleNoDupes(scan)
- dummygraphmodule = DummyGraphModule(scan)
- dummygraphoutputmodule = DummyGraphOutputModule(scan)
- scan.modules["dummymodule"] = dummymodule
- scan.modules["dummymodulenodupes"] = dummymodulenodupes
- scan.modules["dummygraphmodule"] = dummygraphmodule
- scan.modules["dummygraphoutputmodule"] = dummygraphoutputmodule
- if _dns_mock:
- scan.helpers.dns.mock_dns(_dns_mock)
- if scan_callback is not None:
- scan_callback(scan)
- return (
- [e async for e in scan.async_start()],
- dummymodule.events,
- dummymodulenodupes.events,
- dummygraphmodule.events,
- dummygraphoutputmodule.events,
- )
-
- dns_mock_chain = {
- ("test.notreal", "A"): "127.0.0.66",
- ("127.0.0.66", "PTR"): "test.notrealzies",
- ("test.notrealzies", "CNAME"): "www.test.notreal",
- ("www.test.notreal", "A"): "127.0.0.77",
- ("127.0.0.77", "PTR"): "test2.notrealzies",
- ("test2.notrealzies", "A"): "127.0.0.88",
- }
-
- """
-
- # dns search distance = 1, report distance = 0
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "test.notreal",
- _config={"dns_resolution": True, "scope_dns_search_distance": 1, "scope_report_distance": 0},
- _dns_mock=dns_mock_chain,
- )
-
- assert len(events) == 2
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
-
- assert len(all_events) == 3
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
- assert 0 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
-
- assert len(all_events_nodups) == 3
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
- assert 0 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
-
- assert len(graph_events) == 2
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
- assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
-
- assert len(graph_output_events) == 2
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
- assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal"])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
-
- """
-
- # dns search distance = 2, report distance = 0
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "test.notreal",
- _config={"dns_resolution": True, "scope_dns_search_distance": 2, "scope_report_distance": 0},
- _dns_mock=dns_mock_chain,
- )
-
- for e in events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events_nodups:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_output_events:
- log.critical(e)
-
- assert len(events) == 3
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66"])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(all_events) == 7
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(all_events_nodups) == 7
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(graph_events) == 5
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(graph_output_events) == 5
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- return
-
- """
-
- # dns search distance = 2, report distance = 1
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "test.notreal",
- _config={"dns_resolution": True, "scope_dns_search_distance": 2, "scope_report_distance": 1},
- _dns_mock=dns_mock_chain,
- )
-
- assert len(events) == 5
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(all_events) == 7
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(all_events_nodups) == 7
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test2.notrealzies" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(graph_events) == 5
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- assert len(graph_output_events) == 7
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 2 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "www.test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test2.notrealzies"])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
-
- dns_mock_chain = {
- ("test.notreal", "A"): "127.0.0.66",
- ("127.0.0.66", "PTR"): "test.notrealzies",
- ("test.notrealzies", "A"): "127.0.0.77",
- }
-
- class DummyVulnModule(BaseModule):
- _name = "dummyvulnmodule"
- watched_events = ["IP_ADDRESS"]
- scope_distance_modifier = 3
- accept_dupes = True
-
- async def filter_event(self, event):
- if event.data == "127.0.0.77":
- return True
- return False, "bleh"
-
- async def handle_event(self, event):
- self.emit_event(
- {"host": str(event.host), "description": "yep", "severity": "CRITICAL"}, "VULNERABILITY", source=event
- )
-
- def custom_setup(scan):
- dummyvulnmodule = DummyVulnModule(scan)
- scan.modules["dummyvulnmodule"] = dummyvulnmodule
-
- # dns search distance = 3, report distance = 1
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "test.notreal",
- scan_callback=custom_setup,
- _config={"dns_resolution": True, "scope_dns_search_distance": 3, "scope_report_distance": 1},
- _dns_mock=dns_mock_chain,
- )
-
- assert len(events) == 4
- assert 1 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 0 == len([e for e in events if e.type == "DNS_NAME" and e.data == "test.notrealzies"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 1 == len([e for e in events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False and e._graph_important == False])
-
- assert len(all_events) == 6
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False and e._graph_important == False])
-
- assert len(all_events_nodups) == 6
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False and e._graph_important == False])
-
- assert len(graph_events) == 5
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == True])
- assert 0 == len([e for e in graph_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77"])
-
- assert len(graph_output_events) == 7
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notreal" and e._internal == False and e._graph_important == False])
- assert 2 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.66" and e._internal == False and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "DNS_NAME" and e.data == "test.notrealzies" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "VULNERABILITY" and e.data["host"] == "127.0.0.77" and e._internal == False and e._graph_important == False])
- """
-
- bbot_httpserver.expect_request(uri="/").respond_with_data(response_data="")
- bbot_other_httpserver.expect_request(uri="/").respond_with_data(response_data="")
- bbot_other_httpserver2.expect_request(uri="/").respond_with_data(response_data="")
- bbot_other_httpserver3.expect_request(uri="/").respond_with_data(response_data="")
- bbot_other_httpserver4.expect_request(uri="/").respond_with_data(response_data="")
-
- """
-
- # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 0
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "127.0.0.1/31",
- modules=["httpx", "excavate"],
- _config={
- "scope_search_distance": 0,
- "scope_dns_search_distance": 2,
- "scope_report_distance": 1,
- "speculate": True,
- "internal_modules": {"speculate": {"ports": "8888"}},
- "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
- },
- )
-
- assert len(events) == 3
- assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
- assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
- assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
-
- assert len(all_events) == 11
- assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
-
- assert len(all_events_nodups) == 11
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
-
- assert len(graph_events) == 10
- assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
-
- assert len(graph_output_events) == 5
- assert 1 == len([e for e in graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
-
- """
-
- # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 0
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "127.0.0.1/31",
- modules=["httpx", "excavate"],
- output_modules=["neo4j"],
- _config={
- "scope_search_distance": 0,
- "scope_dns_search_distance": 2,
- "scope_report_distance": 1,
- "speculate": True,
- "modules": {"httpx": {"in_scope_only": False}},
- "internal_modules": {"speculate": {"ports": "8888"}},
- "output_modules": {"neo4j": {"uri": "bolt://localhost:7687"}},
- "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
- },
- )
-
- for e in events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events_nodups:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_output_events:
- log.critical(e)
-
- assert len(events) == 4
- assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
- assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
- assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
- assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
- assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
-
- assert len(all_events) == 15
- assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(all_events_nodups) == 15
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(graph_events) == 10
- assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(graph_output_events) == 6
- assert 1 == len([e for e in graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- return
-
- # httpx/speculate IP_RANGE --> IP_ADDRESS --> OPEN_TCP_PORT --> URL, search distance = 1
- events, all_events, all_events_nodups, graph_events, graph_output_events = await do_scan(
- "127.0.0.1/31",
- modules=["httpx", "excavate"],
- _config={
- "scope_search_distance": 1,
- "scope_dns_search_distance": 2,
- "scope_report_distance": 1,
- "speculate": True,
- "modules": {"httpx": {"in_scope_only": False}},
- "internal_modules": {"speculate": {"ports": "8888"}},
- "omit_event_types": ["HTTP_RESPONSE", "URL_UNVERIFIED"],
- },
- )
-
- for e in events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in all_events_nodups:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_events:
- log.critical(e)
- log.critical("=" * 20)
- for e in graph_output_events:
- log.critical(e)
-
- assert len(events) == 4
- assert 1 == len([e for e in events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888"])
- assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888"])
- assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77"])
- assert 0 == len([e for e in events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888"])
- assert 1 == len([e for e in events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888"])
- assert 0 == len([e for e in events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88"])
- assert 0 == len([e for e in events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/"])
-
- return
-
- assert len(all_events) == 15
- assert 1 == len([e for e in all_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(all_events_nodups) == 15
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in all_events_nodups if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(graph_events) == 10
- assert 1 == len([e for e in graph_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 2 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 1 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
-
- assert len(graph_output_events) == 5
- assert 1 == len([e for e in graph_output_events if e.type == "IP_RANGE" and e.data == "127.0.0.0/31" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.0" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.1" and e._internal == True and e._graph_important == True])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.0:8888" and e._internal == True and e._graph_important == False])
- assert 1 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.1:8888" and e._internal == True and e._graph_important == True])
- assert 1 == len([e for e in graph_output_events if e.type == "URL" and e.data == "http://127.0.0.1:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.1:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.77" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "OPEN_TCP_PORT" and e.data == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL" and e.data == "http://127.0.0.77:8888/" and e._internal == False and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "HTTP_RESPONSE" and e.data["input"] == "127.0.0.77:8888" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "IP_ADDRESS" and e.data == "127.0.0.88" and e._internal == True and e._graph_important == False])
- assert 0 == len([e for e in graph_output_events if e.type == "URL_UNVERIFIED" and e.data == "http://127.0.0.88:8888/" and e._internal == True and e._graph_important == False])
diff --git a/pyproject.toml b/pyproject.toml
index b73db180ad..4d48ca3336 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -84,7 +84,7 @@ build-backend = "poetry_dynamic_versioning.backend"
[tool.black]
line-length = 119
-exclude = "bbot/test/test_step_1/test_manager_scope_shepherding.py"
+extend-exclude = "test_manager_scope_accuracy.py"
[tool.poetry-dynamic-versioning]
enable = true
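Review bot: The new `extend-exclude` value on the added line is read by black as a regular expression searched against each path, so the unescaped `.` matches any character and the pattern is not anchored to the end of the file name. The old value named the full path under `bbot/test/test_step_1/`, while the new one would skip any file whose path contains this name, wherever it sits in the tree. If only the one test file is meant, I would write it as `extend-exclude = 'test_manager_scope_accuracy\.py$'` (a TOML literal string, so the backslash needs no doubling) and add a comment such as `# hand-formatted one-line asserts; keep black from rewrapping them` so the reason for the exclusion is recorded.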