diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index ebf9ef779..2787845ca 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -2483,7 +2483,7 @@ def parse_port_string(port_string): >>> parse_port_string("invalid") ValueError: Invalid port or port range: invalid """ - elements = port_string.split(",") + elements = str(port_string).split(",") ports = [] for element in elements: diff --git a/bbot/modules/base.py b/bbot/modules/base.py index 85ac9e58c..660111d3f 100644 --- a/bbot/modules/base.py +++ b/bbot/modules/base.py @@ -663,19 +663,6 @@ def _event_precheck(self, event): # exclude certain URLs (e.g. javascript): if event.type.startswith("URL") and self.name != "httpx" and "httpx-only" in event.tags: return False, "its extension was listed in url_extension_httpx_only" - # if event is an IP address that was speculated from a CIDR - source_is_range = getattr(event.source, "type", "") == "IP_RANGE" - if ( - source_is_range - and event.type == "IP_ADDRESS" - and str(event.module) == "speculate" - and self.name != "speculate" - ): - # and the current module listens for both ranges and CIDRs - if all([x in self.watched_events for x in ("IP_RANGE", "IP_ADDRESS")]): - # then skip the event. - # this helps avoid double-portscanning both an individual IP and its parent CIDR. - return False, "module consumes IP ranges directly" return True, "precheck succeeded" @@ -718,11 +705,12 @@ async def __event_postcheck(self, event): if self._is_graph_important(event): return True, "event is critical to the graph" - # don't send out-of-scope targets to active modules + # don't send out-of-scope targets to active modules (excluding portscanners, because they can handle it) # this only takes effect if your target and whitelist are different # TODO: the logic here seems incomplete, it could probably use some work. - if "active" in self.flags and "target" in event.tags and event not in self.scan.whitelist: - return False, "it is not in whitelist and module has active flag" + if "active" in self.flags and "portscan" not in self.flags: + if "target" in event.tags and event not in self.scan.whitelist: + return False, "it is not in whitelist and module has active flag" # check scope distance filter_result, reason = self._scope_distance_check(event) @@ -811,10 +799,10 @@ async def queue_event(self, event, precheck=True): acceptable, reason = self._event_precheck(event) if not acceptable: if reason and reason != "its type is not in watched_events": - self.debug(f"Not accepting {event} because {reason}") + self.debug(f"Not queueing {event} because {reason}") return else: - self.debug(f"Accepting {event} because {reason}") + self.debug(f"Queueing {event} because {reason}") try: self.incoming_event_queue.put_nowait(event) async with self._event_received: @@ -914,7 +902,7 @@ def _outgoing_dedup_hash(self, event): """ Determines the criteria for what is considered to be a duplicate event if `suppress_dupes` is True. """ - return hash(event) + return hash((event, self.name)) def get_per_host_hash(self, event): """ diff --git a/bbot/modules/internal/speculate.py b/bbot/modules/internal/speculate.py index e37ba9e08..73c2d7438 100644 --- a/bbot/modules/internal/speculate.py +++ b/bbot/modules/internal/speculate.py @@ -35,19 +35,19 @@ class speculate(BaseInternalModule): _priority = 4 async def setup(self): - self.open_port_consumers = any(["OPEN_TCP_PORT" in m.watched_events for m in self.scan.modules.values()]) + scan_modules = [m for m in self.scan.modules.values() if m._type == "scan"] + self.open_port_consumers = any(["OPEN_TCP_PORT" in m.watched_events for m in scan_modules]) self.portscanner_enabled = any(["portscan" in m.flags for m in self.scan.modules.values()]) + self.emit_open_ports = self.open_port_consumers and not self.portscanner_enabled self.range_to_ip = True self.dns_resolution = self.scan.config.get("dns_resolution", True) self.org_stubs_seen = set() port_string = self.config.get("ports", "80,443") - try: self.ports = self.helpers.parse_port_string(str(port_string)) except ValueError as e: - self.warning(f"Error parsing ports: {e}") - return False + return False, f"Error parsing ports: {e}" if not self.portscanner_enabled: self.info(f"No portscanner enabled. Assuming open ports: {', '.join(str(x) for x in self.ports)}") @@ -79,10 +79,15 @@ async def handle_event(self, event): self.emit_event(parent, "DNS_NAME", source=event, internal=True) # generate open ports - emit_open_ports = self.open_port_consumers and not self.portscanner_enabled + + # we speculate on distance-1 stuff too, because distance-1 open ports are needed by certain modules like sslcert + event_in_scope_distance = event.scope_distance <= (self.scan.scope_search_distance + 1) + speculate_open_ports = self.emit_open_ports and event_in_scope_distance + # from URLs - if event.type == "URL" or (event.type == "URL_UNVERIFIED" and emit_open_ports): - if event.host and event.port not in self.ports: + if event.type == "URL" or (event.type == "URL_UNVERIFIED" and self.open_port_consumers): + # only speculate port from a URL if it wouldn't be speculated naturally from the host + if event.host and (event.port not in self.ports or not speculate_open_ports): self.emit_event( self.helpers.make_netloc(event.host, event.port), "OPEN_TCP_PORT", @@ -103,7 +108,7 @@ async def handle_event(self, event): self.emit_event(url_event) # from hosts - if emit_open_ports: + if speculate_open_ports: # don't act on unresolved DNS_NAMEs usable_dns = False if event.type == "DNS_NAME": @@ -149,11 +154,7 @@ async def handle_event(self, event): self.emit_event(stub_event) async def filter_event(self, event): - # don't accept IP_RANGE --> IP_ADDRESS events from self - if str(event.module) == "speculate": - if not (event.type == "IP_ADDRESS" and str(getattr(event.source, "type")) == "IP_RANGE"): - return False # don't accept errored DNS_NAMEs if any(t in event.tags for t in ("unresolved", "a-error", "aaaa-error")): - return False + return False, "there were errors resolving this hostname" return True diff --git a/bbot/modules/masscan.py b/bbot/modules/masscan.py index eb455d8fc..d83747649 100644 --- a/bbot/modules/masscan.py +++ b/bbot/modules/masscan.py @@ -1,27 +1,26 @@ import json -import subprocess from contextlib import suppress -from bbot.modules.base import BaseModule +from bbot.modules.templates.portscanner import portscanner -class masscan(BaseModule): +class masscan(portscanner): flags = ["active", "portscan", "aggressive"] - watched_events = ["SCAN"] + watched_events = ["IP_ADDRESS", "IP_RANGE"] produced_events = ["OPEN_TCP_PORT"] - meta = {"description": "Port scan IP subnets with masscan"} - # 600 packets/s ~= entire private IP space in 8 hours + meta = {"description": "Port scan with masscan. By default, scans top 100 ports."} options = { "top_ports": 100, "ports": "", + # ping scan at 600 packets/s ~= entire private IP space in 8 hours "rate": 600, - "wait": 10, + "wait": 5, "ping_first": False, "ping_only": False, "use_cache": False, } options_desc = { - "top_ports": "Top ports to scan (default 100)", + "top_ports": "Top ports to scan (default 100) (to override, specify 'ports')", "ports": "Ports to scan", "rate": "Rate in packets per second", "wait": "Seconds to wait for replies after scan is complete", @@ -58,30 +57,30 @@ class masscan(BaseModule): "copy": {"src": "#{BBOT_TEMP}/masscan/bin/masscan", "dest": "#{BBOT_TOOLS}/", "mode": "u+x,g+x,o+x"}, }, ] - _qsize = 100 + batch_size = 1000000 async def setup(self): self.top_ports = self.config.get("top_ports", 100) - self.ports = self.config.get("ports", "80,443") self.rate = self.config.get("rate", 600) self.wait = self.config.get("wait", 10) self.ping_first = self.config.get("ping_first", False) self.ping_only = self.config.get("ping_only", False) - self.alive_hosts = dict() - # make a quick dry run to validate ports etc. - self._target_findkey = "9.8.7.6" - if not self.helpers.in_tests: + self.use_cache = self.config.get("use_cache", False) + self.ports = self.config.get("ports", "") + if self.ports: try: - dry_run_command = self._build_masscan_command(self._target_findkey, dry_run=True) - dry_run_result = await self.helpers.run(dry_run_command) - self.masscan_config = dry_run_result.stdout - self.masscan_config = "\n".join(l for l in self.masscan_config.splitlines() if "nocapture" not in l) - except subprocess.CalledProcessError as e: - self.warning(f"Error in masscan: {e.stderr}") - return False + self.helpers.parse_port_string(self.ports) + except ValueError as e: + return False, f"Error parsing ports: {e}" + self.alive_hosts = dict() + + _, invalid_targets = self._build_targets(self.scan.target) + if invalid_targets > 0: + self.warning( + f"Masscan can only accept IP addresses or IP ranges as target ({invalid_targets:,} targets were hostnames)" + ) self.run_time = self.helpers.make_date() - self.use_cache = self.config.get("use_cache", False) self.ping_cache = self.scan.home / f"masscan_ping.txt" self.syn_cache = self.scan.home / f"masscan_syn.txt" if self.use_cache: @@ -101,23 +100,14 @@ async def setup(self): self.helpers.depsinstaller.ensure_root(message="Masscan requires root privileges") self.ping_cache_fd = None self.syn_cache_fd = None - return True - async def handle_event(self, event): + return await super().setup() + + async def handle_batch(self, *events): if self.use_cache: self.emit_from_cache() else: - exclude, invalid_exclude = self._build_targets(self.scan.blacklist) - targets, invalid_targets = self._build_targets(self.scan.whitelist) - if invalid_exclude > 0: - self.warning( - f"Masscan can only accept IP addresses or IP ranges for blacklist ({invalid_exclude:,} blacklisted were hostnames)" - ) - if invalid_targets > 0: - self.warning( - f"Masscan can only accept IP addresses or IP ranges as target ({invalid_targets:,} targets were hostnames)" - ) - + targets = [str(e.data) for e in events] if not targets: self.warning("No targets specified") return @@ -126,7 +116,7 @@ async def handle_event(self, event): if self.ping_first or self.ping_only: self.verbose("Starting masscan (ping scan)") - await self.masscan(targets, result_callback=self.append_alive_host, exclude=exclude, ping=True) + await self.masscan(targets, result_callback=self.append_alive_host, ping=True) targets = ",".join(str(h) for h in self.alive_hosts) if not targets: self.warning("No hosts responded to pings") @@ -135,48 +125,46 @@ async def handle_event(self, event): # TCP SYN scan if not self.ping_only: self.verbose("Starting masscan (TCP SYN scan)") - await self.masscan(targets, result_callback=self.emit_open_tcp_port, exclude=exclude) + await self.masscan(targets, result_callback=self.emit_open_tcp_port) else: self.verbose("Only ping sweep was requested, skipping TCP SYN scan") # save memory self.alive_hosts.clear() - async def masscan(self, targets, result_callback, exclude=None, ping=False): - # config file - masscan_config = self.masscan_config.replace(self._target_findkey, targets) - self.debug("Masscan config:") - for line in masscan_config.splitlines(): - self.debug(line) - config_file = self.helpers.tempfile(masscan_config) - # output file - # process_output = functools.partial(self.process_output, result_callback=result_callback) - # json_output_file = self.helpers.tempfile_tail(process_output) - # command - command = self._build_masscan_command(config=config_file, exclude=exclude, ping=ping) - # execute + async def masscan(self, targets, result_callback, ping=False): + target_file = self.helpers.tempfile(targets, pipe=False) + command = self._build_masscan_command(target_file, ping=ping) stats_file = self.helpers.tempfile_tail(callback=self.verbose) try: with open(stats_file, "w") as stats_fh: async for line in self.helpers.run_live(command, sudo=True, stderr=stats_fh): self.process_output(line, result_callback=result_callback) finally: - stats_file.unlink() + for file in (stats_file, target_file): + file.unlink() - def _build_masscan_command(self, targets=None, config=None, exclude=None, dry_run=False, ping=False): - command = ("masscan", "--rate", self.rate, "--wait", self.wait, "--open-only", "-oJ", "-") - if targets is not None: - command += (targets,) - if config is not None: - command += ("-c", config) + def _build_masscan_command(self, target_file=None, dry_run=False, ping=False): + command = ( + "masscan", + "--excludefile", + str(self.exclude_file), + "--rate", + self.rate, + "--wait", + self.wait, + "--open-only", + "-oJ", + "-", + ) + if target_file is not None: + command += ("-iL", str(target_file)) if ping: command += ("--ping",) - elif not dry_run: + else: if self.ports: command += ("-p", self.ports) else: command += ("--top-ports", str(self.top_ports)) - if exclude is not None: - command += ("--exclude", exclude) if dry_run: command += ("--echo",) return command @@ -258,23 +246,14 @@ def get_source_event(self, host): source_event = self.scan.root_event return source_event - def _build_targets(self, target): - invalid_targets = 0 - targets = [] - for t in target: - t = self.helpers.make_ip_type(t.data) - if isinstance(t, str): - invalid_targets += 1 - else: - targets.append(t) - return ",".join(str(t) for t in targets), invalid_targets - async def cleanup(self): if self.ping_first: with suppress(Exception): self.ping_cache_fd.close() with suppress(Exception): self.syn_cache_fd.close() + with suppress(Exception): + self.exclude_file.unlink() def _write_ping_result(self, host): if self.ping_cache_fd is None: diff --git a/bbot/modules/nmap.py b/bbot/modules/nmap.py index 5e485e5ac..0cc614b43 100644 --- a/bbot/modules/nmap.py +++ b/bbot/modules/nmap.py @@ -1,21 +1,21 @@ from lxml import etree -from bbot.modules.base import BaseModule +from bbot.modules.templates.portscanner import portscanner -class nmap(BaseModule): - watched_events = ["IP_ADDRESS", "DNS_NAME"] +class nmap(portscanner): + watched_events = ["IP_ADDRESS", "DNS_NAME", "IP_RANGE"] produced_events = ["OPEN_TCP_PORT"] flags = ["active", "portscan", "aggressive", "web-thorough"] - meta = {"description": "Execute port scans with nmap"} + meta = {"description": "Port scan with nmap. By default, scans top 100 ports."} options = { - "ports": "", "top_ports": 100, + "ports": "", "timing": "T4", "skip_host_discovery": True, } options_desc = { - "ports": "ports to scan", - "top_ports": "top ports to scan", + "top_ports": "Top ports to scan (default 100) (to override, specify 'ports')", + "ports": "Ports to scan", "timing": "-T<0-5>: Set timing template (higher is faster)", "skip_host_discovery": "skip host discovery (-Pn)", } @@ -32,7 +32,7 @@ async def setup(self): self.timing = self.config.get("timing", "T4") self.top_ports = self.config.get("top_ports", 100) self.skip_host_discovery = self.config.get("skip_host_discovery", True) - return True + return await super().setup() async def handle_batch(self, *events): target = self.helpers.make_target(*events) @@ -65,6 +65,8 @@ def construct_command(self, targets): temp_filename = self.helpers.temp_filename(extension="xml") command = [ "nmap", + "--excludefile", + str(self.exclude_file), "-n", "--resolve-all", f"-{self.timing}", diff --git a/bbot/modules/output/asset_inventory.py b/bbot/modules/output/asset_inventory.py index 56e94aaa2..db9fcd946 100644 --- a/bbot/modules/output/asset_inventory.py +++ b/bbot/modules/output/asset_inventory.py @@ -36,9 +36,6 @@ class asset_inventory(CSV): async def setup(self): self.assets = {} - self.open_port_producers = "httpx" in self.scan.modules or any( - ["portscan" in m.flags for m in self.scan.modules.values()] - ) self.use_previous = self.config.get("use_previous", False) self.summary_netmask = self.config.get("summary_netmask", 16) self.emitted_contents = False @@ -136,6 +133,8 @@ async def finish(self): with open(self.output_file, newline="") as f: c = csv.DictReader(f) for row in c: + # yield to event loop to make sure we don't hold up the scan + await self.helpers.sleep(0) host = row.get("Host", "").strip() ips = row.get("IP(s)", "") if not host or not ips: diff --git a/bbot/modules/output/base.py b/bbot/modules/output/base.py index 99a03b0cf..9d5f2c3ee 100644 --- a/bbot/modules/output/base.py +++ b/bbot/modules/output/base.py @@ -39,19 +39,6 @@ def _event_precheck(self, event): if event._internal: return False, "_internal is True" - # if event is an IP address that was speculated from a CIDR - source_is_range = getattr(event.source, "type", "") == "IP_RANGE" - if ( - source_is_range - and event.type == "IP_ADDRESS" - and str(event.module) == "speculate" - and self.name != "speculate" - ): - # and the current module listens for both ranges and CIDRs - if all([x in self.watched_events for x in ("IP_RANGE", "IP_ADDRESS")]): - # then skip the event. - # this helps avoid double-portscanning both an individual IP and its parent CIDR. - return False, "module consumes IP ranges directly" return True, "precheck succeeded" def is_incoming_duplicate(self, event, add=False): diff --git a/bbot/modules/templates/portscanner.py b/bbot/modules/templates/portscanner.py new file mode 100644 index 000000000..ef27c36a9 --- /dev/null +++ b/bbot/modules/templates/portscanner.py @@ -0,0 +1,53 @@ +from bbot.modules.base import BaseModule + + +class portscanner(BaseModule): + """ + A portscanner containing useful methods for nmap, masscan, etc. + """ + + async def setup(self): + self.ip_ranges = [e.host for e in self.scan.target.events if e.type == "IP_RANGE"] + exclude, invalid_exclude = self._build_targets(self.scan.blacklist) + if not exclude: + exclude = ["255.255.255.255/32"] + self.exclude_file = self.helpers.tempfile(exclude, pipe=False) + if invalid_exclude > 0: + self.warning( + f"Port scanner can only accept IP addresses or IP ranges as blacklist ({invalid_exclude:,} blacklisted were hostnames)" + ) + return True + + async def filter_event(self, event): + """ + The purpose of this filter_event is to decide whether we should accept individual IP_ADDRESS + events that reside inside our target subnets (IP_RANGE), if any. + + This prevents scanning the same IP twice. + """ + # if we are emitting hosts from a previous asset_inventory, this is a special case + # in this case we want to accept the individual IPs even if they overlap with our target ranges + asset_inventory_module = self.scan.modules.get("asset_inventory", None) + asset_inventory_config = getattr(asset_inventory_module, "config", {}) + asset_inventory_use_previous = asset_inventory_config.get("use_previous", False) + if event.type == "IP_ADDRESS" and not asset_inventory_use_previous: + for net in self.helpers.ip_network_parents(event.data, include_self=True): + if net in self.ip_ranges: + return False, f"skipping {event.host} because it is already included in {net}" + elif event.type == "IP_RANGE" and asset_inventory_use_previous: + return False, f"skipping IP_RANGE {event.host} because asset_inventory.use_previous=True" + return True + + def _build_targets(self, target, delimiter=","): + invalid_targets = 0 + targets = [] + for t in target: + t = self.helpers.make_ip_type(t.data) + if isinstance(t, str): + invalid_targets += 1 + else: + if self.helpers.is_ip(t): + targets.append(f"{t}/32") + else: + targets.append(str(t)) + return targets, invalid_targets diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index 59cbbf87b..0c50f65eb 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -679,6 +679,7 @@ async def do_stuff(r): def test_portparse_singleports(helpers): assert helpers.parse_port_string("80,443,22") == [80, 443, 22] + assert helpers.parse_port_string(80) == [80] def test_portparse_range_valid(helpers): diff --git a/bbot/test/test_step_1/test_modules_basic.py b/bbot/test/test_step_1/test_modules_basic.py index 79c27c5c9..5a50d4260 100644 --- a/bbot/test/test_step_1/test_modules_basic.py +++ b/bbot/test/test_step_1/test_modules_basic.py @@ -50,15 +50,6 @@ async def test_modules_basic(scan, helpers, events, bbot_config, bbot_scanner, h localhost2.add_tag("target") assert base_module._event_precheck(localhost2)[0] == True base_module.target_only = False - # special case for IPs and ranges - base_module.watched_events = ["IP_ADDRESS", "IP_RANGE"] - ip_range = scan.make_event("127.0.0.0/24", dummy=True) - localhost4 = scan.make_event("127.0.0.1", source=ip_range) - localhost4.scope_distance = 0 - localhost4.module = "plumbus" - assert base_module._event_precheck(localhost4)[0] == True - localhost4.module = "speculate" - assert base_module._event_precheck(localhost4)[0] == False # in scope only base_module.in_scope_only = True @@ -71,7 +62,6 @@ async def test_modules_basic(scan, helpers, events, bbot_config, bbot_scanner, h assert reason == "it did not meet in_scope_only filter criteria" base_module.in_scope_only = False base_module.scope_distance_modifier = 0 - localhost4 = scan.make_event("127.0.0.1", source=events.subdomain) valid, reason = await base_module._event_postcheck(events.localhost) assert valid diff --git a/bbot/test/test_step_2/module_tests/test_module_masscan.py b/bbot/test/test_step_2/module_tests/test_module_masscan.py index 3bb24341b..c489d8518 100644 --- a/bbot/test/test_step_2/module_tests/test_module_masscan.py +++ b/bbot/test/test_step_2/module_tests/test_module_masscan.py @@ -5,14 +5,6 @@ class TestMasscan(ModuleTestBase): targets = ["8.8.8.8/32"] scan_name = "test_masscan" config_overrides = {"modules": {"masscan": {"ports": "443", "wait": 1}}} - masscan_config = """seed = 17230484647655100360 -rate = 600 -shard = 1/1 - - -# TARGET SELECTION (IP, PORTS, EXCLUDES) -ports = -range = 9.8.7.6""" masscan_output = """[ { "ip": "8.8.8.8", "timestamp": "1680197558", "ports": [ {"port": 443, "proto": "tcp", "status": "open", "reason": "syn-ack", "ttl": 54} ] } @@ -30,7 +22,6 @@ async def run_masscan(command, *args, **kwargs): async for l in module_test.scan.helpers.run_live(command, *args, **kwargs): yield l - module_test.scan.modules["masscan"].masscan_config = self.masscan_config module_test.monkeypatch.setattr(module_test.scan.helpers, "run_live", run_masscan) def check(self, module_test, events): diff --git a/bbot/test/test_step_2/module_tests/test_module_nmap.py b/bbot/test/test_step_2/module_tests/test_module_nmap.py index 58bb801d1..092f84a47 100644 --- a/bbot/test/test_step_2/module_tests/test_module_nmap.py +++ b/bbot/test/test_step_2/module_tests/test_module_nmap.py @@ -2,9 +2,74 @@ class TestNmap(ModuleTestBase): - targets = ["127.0.0.1"] + targets = ["127.0.0.1/31"] config_overrides = {"modules": {"nmap": {"ports": "8888,8889"}}} + async def setup_after_prep(self, module_test): + # make sure our IP_RANGE / IP_ADDRESS filtering is working right + # IPs within the target IP range should be rejected + ip_event_1 = module_test.scan.make_event("127.0.0.0", source=module_test.scan.root_event) + ip_event_1.scope_distance = 0 + ip_event_1_result = await module_test.module._event_postcheck(ip_event_1) + assert ip_event_1_result[0] == False + assert ( + "it did not meet custom filter criteria: skipping 127.0.0.0 because it is already included in 127.0.0.0/31" + in ip_event_1_result[1] + ) + # but ones outside should be accepted + ip_event_2 = module_test.scan.make_event("127.0.0.3", source=module_test.scan.root_event) + ip_event_2.scope_distance = 0 + assert (await module_test.module._event_postcheck(ip_event_2))[0] == True + + def check(self, module_test, events): + assert 1 == len([e for e in events if e.data == "127.0.0.1:8888"]) + assert not any(e.data == "127.0.0.1:8889" for e in events) + + +class TestNmapAssetInventory(ModuleTestBase): + targets = ["127.0.0.1/31"] + config_overrides = { + "modules": {"nmap": {"ports": "8888,8889"}}, + "output_modules": {"asset_inventory": {"use_previous": True}}, + } + modules_overrides = ["nmap", "asset_inventory"] + module_name = "nmap" + scan_name = "nmap_test_asset_inventory" + + async def setup_after_prep(self, module_test): + from bbot.scanner import Scanner + + first_scan_config = module_test.scan.config.copy() + first_scan_config["output_modules"]["asset_inventory"]["use_previous"] = False + first_scan = Scanner("127.0.0.1", name=self.scan_name, modules=["asset_inventory"], config=first_scan_config) + await first_scan.async_start_without_generator() + + asset_inventory_output_file = first_scan.home / "asset-inventory.csv" + assert "127.0.0.1," in open(asset_inventory_output_file).read() + # make sure our IP_RANGE / IP_ADDRESS filtering is working right + # IPs within the target IP range should not be rejected because asset_inventory.use_previous=true + ip_event_1 = module_test.scan.make_event("127.0.0.0", source=module_test.scan.root_event) + ip_event_1.scope_distance = 0 + assert (await module_test.module._event_postcheck(ip_event_1))[0] == True + # but ones outside should be accepted + ip_event_2 = module_test.scan.make_event("127.0.0.3", source=module_test.scan.root_event) + ip_event_2.scope_distance = 0 + assert (await module_test.module._event_postcheck(ip_event_2))[0] == True + + ip_range_event = module_test.scan.make_event("127.0.0.1/31", source=module_test.scan.root_event) + ip_range_event.scope_distance = 0 + ip_range_filter_result = await module_test.module._event_postcheck(ip_range_event) + assert ip_range_filter_result[0] == False + assert f"skipping IP_RANGE 127.0.0.0/31 because asset_inventory.use_previous=True" in ip_range_filter_result[1] + def check(self, module_test, events): - assert any(e.data == "127.0.0.1:8888" for e in events) + assert 1 == len( + [ + e + for e in events + if e.data == "127.0.0.1:8888" + and e.source.data == "127.0.0.1" + and str(e.source.module) == "asset_inventory" + ] + ) assert not any(e.data == "127.0.0.1:8889" for e in events)