diff --git a/bbot/core/event/base.py b/bbot/core/event/base.py index 22326fb49..4e4ae54a2 100644 --- a/bbot/core/event/base.py +++ b/bbot/core/event/base.py @@ -24,7 +24,6 @@ smart_decode, get_file_extension, validators, - smart_decode_punycode, tagify, ) @@ -997,9 +996,7 @@ def make_event( return data else: if event_type is None: - if isinstance(data, str): - data = smart_decode_punycode(data) - event_type = get_event_type(data) + event_type, data = get_event_type(data) if not dummy: log.debug(f'Autodetected event type "{event_type}" based on data: "{data}"') diff --git a/bbot/core/event/helpers.py b/bbot/core/event/helpers.py index 6df0fe2ee..228be7c33 100644 --- a/bbot/core/event/helpers.py +++ b/bbot/core/event/helpers.py @@ -3,7 +3,7 @@ from contextlib import suppress from bbot.core.errors import ValidationError -from bbot.core.helpers import sha1, smart_decode, smart_decode_punycode +from bbot.core.helpers import sha1, smart_decode, smart_encode_punycode from bbot.core.helpers.regexes import event_type_regexes, event_id_regex @@ -14,25 +14,26 @@ def get_event_type(data): """ Attempt to divine event type from data """ - data = smart_decode_punycode(smart_decode(data).strip()) # IP address with suppress(Exception): ipaddress.ip_address(data) - return "IP_ADDRESS" + return "IP_ADDRESS", data # IP network with suppress(Exception): ipaddress.ip_network(data, strict=False) - return "IP_RANGE" + return "IP_RANGE", data + + data = smart_encode_punycode(smart_decode(data).strip()) # Strict regexes for t, regexes in event_type_regexes.items(): for r in regexes: if r.match(data): if t == "URL": - return "URL_UNVERIFIED" - return t + return "URL_UNVERIFIED", data + return t, data raise ValidationError(f'Unable to autodetect event type from "{data}"') diff --git a/bbot/core/helpers/misc.py b/bbot/core/helpers/misc.py index 3f9e86425..14b58b73a 100644 --- a/bbot/core/helpers/misc.py +++ b/bbot/core/helpers/misc.py @@ -2,6 +2,7 @@ import re import sys import copy +import idna import json import atexit import codecs @@ -34,7 +35,6 @@ from .url import * # noqa F401 from .. import errors -from .punycode import * # noqa F401 from .logger import log_to_stderr from . import regexes as bbot_regexes from .names_generator import random_name, names, adjectives # noqa F401 @@ -106,21 +106,41 @@ def split_host_port(d): "192.168.1.1:443" --> (IPv4Address('192.168.1.1'), 443) "[dead::beef]:443" --> (IPv6Address('dead::beef'), 443) """ - if not "://" in d: - d = f"d://{d}" - parsed = urlparse(d) - port = None + d = str(d) host = None - with suppress(ValueError): - if parsed.port is None: - if parsed.scheme in ("https", "wss"): - port = 443 - elif parsed.scheme in ("http", "ws"): - port = 80 - else: - port = int(parsed.port) - with suppress(ValueError): - host = parsed.hostname + port = None + scheme = None + if is_ip(d): + return make_ip_type(d), port + + match = bbot_regexes.split_host_port_regex.match(d) + if match is None: + raise ValueError(f'split_port() failed to parse "{d}"') + scheme = match.group("scheme") + netloc = match.group("netloc") + if netloc is None: + raise ValueError(f'split_port() failed to parse "{d}"') + + match = bbot_regexes.extract_open_port_regex.match(netloc) + if match is None: + raise ValueError(f'split_port() failed to parse netloc "{netloc}"') + + host = match.group(2) + if host is None: + host = match.group(1) + if host is None: + raise ValueError(f'split_port() failed to locate host in netloc "{netloc}"') + + port = match.group(3) + if port is None and scheme is not None: + if scheme in ("https", "wss"): + port = 443 + elif scheme in ("http", "ws"): + port = 80 + elif port is not None: + with suppress(ValueError): + port = int(port) + return make_ip_type(host), port @@ -632,12 +652,13 @@ def make_netloc(host, port): ("192.168.1.1", None) --> "192.168.1.1" ("192.168.1.1", 443) --> "192.168.1.1:443" ("evilcorp.com", 80) --> "evilcorp.com:80" + ("dead::beef", None) --> "[dead::beef]" ("dead::beef", 443) --> "[dead::beef]:443" """ - if port is None: - return host if is_ip(host, version=6): host = f"[{host}]" + if port is None: + return host return f"{host}:{port}" @@ -898,10 +919,100 @@ def clean_old(d, keep=10, filter=lambda x: True, key=latest_mtime, reverse=True, def extract_emails(s): + """ + Extract email addresses from a body of text + """ for email in bbot_regexes.email_regex.findall(smart_decode(s)): yield email.lower() +def extract_host(s): + """ + Attempts to find and extract the host portion of a string. + + Args: + s (str): The string from which to extract the host. + + Returns: + tuple: A tuple containing three strings: + (hostname (None if not found), string_before_hostname, string_after_hostname). + + Examples: + >>> extract_host("evilcorp.com:80") + ("evilcorp.com", "", ":80") + + >>> extract_host("http://evilcorp.com:80/asdf.php?a=b") + ("evilcorp.com", "http://", ":80/asdf.php?a=b") + + >>> extract_host("bob@evilcorp.com") + ("evilcorp.com", "bob@", "") + + >>> extract_host("[dead::beef]:22") + ("dead::beef", "[", "]:22") + + >>> extract_host("ftp://username:password@my-ftp.com/my-file.csv") + ( + "my-ftp.com", + "ftp://username:password@", + "/my-file.csv", + ) + """ + s = smart_decode(s) + match = bbot_regexes.extract_host_regex.search(s) + + if match: + hostname = match.group(1) + before = s[: match.start(1)] + after = s[match.end(1) :] + host, port = split_host_port(hostname) + netloc = make_netloc(host, port) + if netloc != hostname: + # invalid host / port + return (None, s, "") + if host is not None: + if port is not None: + after = f":{port}{after}" + if is_ip(host, version=6) and hostname.startswith("["): + before = f"{before}[" + after = f"]{after}" + hostname = str(host) + return (hostname, before, after) + + return (None, s, "") + + +def smart_encode_punycode(text: str) -> str: + """ + ドメイン.テスト --> xn--eckwd4c7c.xn--zckzah + """ + host, before, after = extract_host(text) + if host is None: + return text + + try: + host = idna.encode(host).decode(errors="ignore") + except UnicodeError: + pass # If encoding fails, leave the host as it is + + return f"{before}{host}{after}" + + +def smart_decode_punycode(text: str) -> str: + """ + xn--eckwd4c7c.xn--zckzah --> ドメイン.テスト + """ + host, before, after = extract_host(text) + if host is None: + return text + + try: + host = idna.decode(host) + except UnicodeError: + pass # If decoding fails, leave the host as it is + + return f"{before}{host}{after}" + + def can_sudo_without_password(): """ Return True if the current user can sudo without a password diff --git a/bbot/core/helpers/punycode.py b/bbot/core/helpers/punycode.py deleted file mode 100644 index d7055f6db..000000000 --- a/bbot/core/helpers/punycode.py +++ /dev/null @@ -1,53 +0,0 @@ -import re -import idna - - -alphanum_regex = re.compile(r"([\w-]+)") -alphanum_anchored = re.compile(r"^[\w-]+$") - - -def split_text(text): - # Split text into segments by special characters - # We assume that only alphanumeric segments should be encoded - if not isinstance(text, str): - raise ValueError(f"data must be a string, not {type(text)}") - segments = alphanum_regex.split(text) - return segments - - -def smart_encode_punycode(text: str) -> str: - """ - ドメイン.テスト --> xn--eckwd4c7c.xn--zckzah - """ - segments = split_text(text) - result_segments = [] - - for segment in segments: - try: - if alphanum_anchored.match(segment): # Only encode alphanumeric segments - segment = idna.encode(segment).decode(errors="ignore") - except UnicodeError: - pass # If encoding fails, leave the segment as it is - - result_segments.append(segment) - - return "".join(result_segments) - - -def smart_decode_punycode(text: str) -> str: - """ - xn--eckwd4c7c.xn--zckzah --> ドメイン.テスト - """ - segments = split_text(text) - result_segments = [] - - for segment in segments: - try: - if alphanum_anchored.match(segment): # Only decode alphanumeric segments - segment = idna.decode(segment) - except UnicodeError: - pass # If decoding fails, leave the segment as it is - - result_segments.append(segment) - - return "".join(result_segments) diff --git a/bbot/core/helpers/regexes.py b/bbot/core/helpers/regexes.py index 5ed169345..3761b09e7 100644 --- a/bbot/core/helpers/regexes.py +++ b/bbot/core/helpers/regexes.py @@ -22,7 +22,7 @@ _ipv6_regex = r"[A-F0-9:]*:[A-F0-9:]*:[A-F0-9:]*" ipv6_regex = re.compile(_ipv6_regex, re.I) # dns names with periods -_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+[^\W_]{1,63}\.?" +_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+(?:[xX][nN]--)?[^\W_]{1,63}\.?" # dns names without periods _hostname_regex = r"(?!\w*\.\w+)\w(?:[\w-]{0,100}\w)?" _email_regex = r"(?:[^\W_][\w\-\.\+]{,100})@" + _dns_name_regex @@ -87,3 +87,14 @@ jquery_get_regex = re.compile(r"url:\s?[\"\'].+?\?(\w+)=") jquery_post_regex = re.compile(r"\$.post\([\'\"].+[\'\"].+\{(.+)\}") a_tag_regex = re.compile(r"]*href=[\"\'][^\"\'?>]*\?([^&\"\'=]+)") + +valid_netloc = r"[^\s!@#$%^&()=/?\\'\";~`<>]+" + +_split_host_port_regex = r"(?:(?P[a-z0-9]{1,20})://)?(?:[^?]*@)?(?P" + valid_netloc + ")" +split_host_port_regex = re.compile(_split_host_port_regex, re.I) + +_extract_open_port_regex = r"(?:(?:\[([0-9a-f:]+)\])|([^\s:]+))(?::(\d{1,5}))?" +extract_open_port_regex = re.compile(_extract_open_port_regex) + +_extract_host_regex = r"(?:[a-z0-9]{1,20}://)?(?:[^?]*@)?(" + valid_netloc + ")" +extract_host_regex = re.compile(_extract_host_regex, re.I) diff --git a/bbot/core/helpers/validators.py b/bbot/core/helpers/validators.py index 3fa759b95..82d7a38d4 100644 --- a/bbot/core/helpers/validators.py +++ b/bbot/core/helpers/validators.py @@ -4,8 +4,7 @@ from bbot.core.helpers import regexes from bbot.core.helpers.url import parse_url, hash_url -from bbot.core.helpers.punycode import smart_decode_punycode -from bbot.core.helpers.misc import split_host_port, make_netloc, is_ip +from bbot.core.helpers.misc import smart_encode_punycode, split_host_port, make_netloc, is_ip log = logging.getLogger("bbot.core.helpers.validators") @@ -57,7 +56,7 @@ def validate_host(host): return str(ip) except Exception: # finally, try DNS_NAME - host = smart_decode_punycode(host) + host = smart_encode_punycode(host) # clean asterisks and clinging dashes host = host.strip("*.-").replace("*", "") for r in regexes.event_type_regexes["DNS_NAME"]: @@ -89,7 +88,7 @@ def validate_severity(severity): @validator def validate_email(email): - email = smart_decode_punycode(str(email).strip().lower()) + email = smart_encode_punycode(str(email).strip().lower()) if any(r.match(email) for r in regexes.event_type_regexes["EMAIL_ADDRESS"]): return email assert False, f'Invalid email: "{email}"' diff --git a/bbot/test/test_step_1/test_events.py b/bbot/test/test_step_1/test_events.py index 7c7563c1a..842b91f9c 100644 --- a/bbot/test/test_step_1/test_events.py +++ b/bbot/test/test_step_1/test_events.py @@ -245,21 +245,94 @@ async def test_events(events, scan, helpers, bbot_config): {"host": "evilcorp.com", "severity": "WACK", "description": "asdf"}, "VULNERABILITY", dummy=True ) - # punycode + # punycode - event type detection + + # japanese assert scan.make_event("ドメイン.テスト", dummy=True).type == "DNS_NAME" assert scan.make_event("bob@ドメイン.テスト", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("テスト@ドメイン.テスト", dummy=True).type == "EMAIL_ADDRESS" assert scan.make_event("ドメイン.テスト:80", dummy=True).type == "OPEN_TCP_PORT" assert scan.make_event("http://ドメイン.テスト:80", dummy=True).type == "URL_UNVERIFIED" - - assert scan.make_event("xn--eckwd4c7c.xn--zckzah", dummy=True).data == "ドメイン.テスト" - assert scan.make_event("bob@xn--eckwd4c7c.xn--zckzah", dummy=True).data == "bob@ドメイン.テスト" - assert scan.make_event("xn--eckwd4c7c.xn--zckzah:80", dummy=True).data == "ドメイン.テスト:80" - assert scan.make_event("http://xn--eckwd4c7c.xn--zckzah:80", dummy=True).data == "http://ドメイン.テスト/" + assert scan.make_event("http://ドメイン.テスト:80/テスト", dummy=True).type == "URL_UNVERIFIED" assert scan.make_event("xn--eckwd4c7c.xn--zckzah", dummy=True).type == "DNS_NAME" assert scan.make_event("bob@xn--eckwd4c7c.xn--zckzah", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("テスト@xn--eckwd4c7c.xn--zckzah", dummy=True).type == "EMAIL_ADDRESS" assert scan.make_event("xn--eckwd4c7c.xn--zckzah:80", dummy=True).type == "OPEN_TCP_PORT" assert scan.make_event("http://xn--eckwd4c7c.xn--zckzah:80", dummy=True).type == "URL_UNVERIFIED" + assert scan.make_event("http://xn--eckwd4c7c.xn--zckzah:80/テスト", dummy=True).type == "URL_UNVERIFIED" + + # thai + assert scan.make_event("เราเที่ยวด้วยกัน.com", dummy=True).type == "DNS_NAME" + assert scan.make_event("bob@เราเที่ยวด้วยกัน.com", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("ทดสอบ@เราเที่ยวด้วยกัน.com", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("เราเที่ยวด้วยกัน.com:80", dummy=True).type == "OPEN_TCP_PORT" + assert scan.make_event("http://เราเที่ยวด้วยกัน.com:80", dummy=True).type == "URL_UNVERIFIED" + assert scan.make_event("http://เราเที่ยวด้วยกัน.com:80/ทดสอบ", dummy=True).type == "URL_UNVERIFIED" + + assert scan.make_event("xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).type == "DNS_NAME" + assert scan.make_event("bob@xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("ทดสอบ@xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).type == "EMAIL_ADDRESS" + assert scan.make_event("xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80", dummy=True).type == "OPEN_TCP_PORT" + assert scan.make_event("http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80", dummy=True).type == "URL_UNVERIFIED" + assert scan.make_event("http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80/ทดสอบ", dummy=True).type == "URL_UNVERIFIED" + + # punycode - encoding / decoding tests + + # japanese + assert scan.make_event("xn--eckwd4c7c.xn--zckzah", dummy=True).data == "xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("bob@xn--eckwd4c7c.xn--zckzah", dummy=True).data == "bob@xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("テスト@xn--eckwd4c7c.xn--zckzah", dummy=True).data == "テスト@xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("xn--eckwd4c7c.xn--zckzah:80", dummy=True).data == "xn--eckwd4c7c.xn--zckzah:80" + assert scan.make_event("http://xn--eckwd4c7c.xn--zckzah:80", dummy=True).data == "http://xn--eckwd4c7c.xn--zckzah/" + assert ( + scan.make_event("http://xn--eckwd4c7c.xn--zckzah:80/テスト", dummy=True).data + == "http://xn--eckwd4c7c.xn--zckzah/テスト" + ) + + assert scan.make_event("ドメイン.テスト", dummy=True).data == "xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("bob@ドメイン.テスト", dummy=True).data == "bob@xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("テスト@ドメイン.テスト", dummy=True).data == "テスト@xn--eckwd4c7c.xn--zckzah" + assert scan.make_event("ドメイン.テスト:80", dummy=True).data == "xn--eckwd4c7c.xn--zckzah:80" + assert scan.make_event("http://ドメイン.テスト:80", dummy=True).data == "http://xn--eckwd4c7c.xn--zckzah/" + assert scan.make_event("http://ドメイン.テスト:80/テスト", dummy=True).data == "http://xn--eckwd4c7c.xn--zckzah/テスト" + # thai + assert ( + scan.make_event("xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).data == "xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + ) + assert ( + scan.make_event("bob@xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).data + == "bob@xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + ) + assert ( + scan.make_event("ทดสอบ@xn--12c1bik6bbd8ab6hd1b5jc6jta.com", dummy=True).data + == "ทดสอบ@xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + ) + assert ( + scan.make_event("xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80", dummy=True).data + == "xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80" + ) + assert ( + scan.make_event("http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80", dummy=True).data + == "http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com/" + ) + assert ( + scan.make_event("http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80/ทดสอบ", dummy=True).data + == "http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com/ทดสอบ" + ) + + assert scan.make_event("เราเที่ยวด้วยกัน.com", dummy=True).data == "xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + assert scan.make_event("bob@เราเที่ยวด้วยกัน.com", dummy=True).data == "bob@xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + assert scan.make_event("ทดสอบ@เราเที่ยวด้วยกัน.com", dummy=True).data == "ทดสอบ@xn--12c1bik6bbd8ab6hd1b5jc6jta.com" + assert scan.make_event("เราเที่ยวด้วยกัน.com:80", dummy=True).data == "xn--12c1bik6bbd8ab6hd1b5jc6jta.com:80" + assert ( + scan.make_event("http://เราเที่ยวด้วยกัน.com:80", dummy=True).data + == "http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com/" + ) + assert ( + scan.make_event("http://เราเที่ยวด้วยกัน.com:80/ทดสอบ", dummy=True).data + == "http://xn--12c1bik6bbd8ab6hd1b5jc6jta.com/ทดสอบ" + ) # test event serialization from bbot.core.event import event_from_json diff --git a/bbot/test/test_step_1/test_helpers.py b/bbot/test/test_step_1/test_helpers.py index 016e6d79f..abf09cadc 100644 --- a/bbot/test/test_step_1/test_helpers.py +++ b/bbot/test/test_step_1/test_helpers.py @@ -108,6 +108,54 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_config, bbot_https "b@b.com", ) + assert helpers.extract_host("evilcorp.com:80") == ("evilcorp.com", "", ":80") + assert helpers.extract_host("http://evilcorp.com:80/asdf.php?a=b") == ( + "evilcorp.com", + "http://", + ":80/asdf.php?a=b", + ) + assert helpers.extract_host("http://evilcorp.com:80/asdf.php?a=b@a.com") == ( + "evilcorp.com", + "http://", + ":80/asdf.php?a=b@a.com", + ) + assert helpers.extract_host("bob@evilcorp.com") == ("evilcorp.com", "bob@", "") + assert helpers.extract_host("[dead::beef]:22") == ("dead::beef", "[", "]:22") + assert helpers.extract_host("scp://[dead::beef]:22") == ("dead::beef", "scp://[", "]:22") + assert helpers.extract_host("https://[dead::beef]:22?a=b") == ("dead::beef", "https://[", "]:22?a=b") + assert helpers.extract_host("https://[dead::beef]/?a=b") == ("dead::beef", "https://[", "]/?a=b") + assert helpers.extract_host("https://[dead::beef]?a=b") == ("dead::beef", "https://[", "]?a=b") + assert helpers.extract_host("ftp://username:password@my-ftp.com/my-file.csv") == ( + "my-ftp.com", + "ftp://username:password@", + "/my-file.csv", + ) + assert helpers.extract_host("ftp://username:p@ssword@my-ftp.com/my-file.csv") == ( + "my-ftp.com", + "ftp://username:p@ssword@", + "/my-file.csv", + ) + assert helpers.extract_host("ftp://username:password:/@my-ftp.com/my-file.csv") == ( + "my-ftp.com", + "ftp://username:password:/@", + "/my-file.csv", + ) + assert helpers.extract_host("ftp://username:password:/@dead::beef/my-file.csv") == ( + None, + "ftp://username:password:/@dead::beef/my-file.csv", + "", + ) + assert helpers.extract_host("ftp://username:password:/@[dead::beef]/my-file.csv") == ( + "dead::beef", + "ftp://username:password:/@[", + "]/my-file.csv", + ) + assert helpers.extract_host("ftp://username:password:/@[dead::beef]:22/my-file.csv") == ( + "dead::beef", + "ftp://username:password:/@[", + "]:22/my-file.csv", + ) + assert helpers.split_domain("www.evilcorp.co.uk") == ("www", "evilcorp.co.uk") assert helpers.split_domain("asdf.www.test.notreal") == ("asdf.www", "test.notreal") assert helpers.split_domain("www.test.notreal") == ("www", "test.notreal") @@ -118,8 +166,13 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_config, bbot_https assert helpers.split_host_port("http://evilcorp.co.uk:666") == ("evilcorp.co.uk", 666) assert helpers.split_host_port("evilcorp.co.uk:666") == ("evilcorp.co.uk", 666) assert helpers.split_host_port("evilcorp.co.uk") == ("evilcorp.co.uk", None) + assert helpers.split_host_port("192.168.0.1") == (ipaddress.ip_address("192.168.0.1"), None) + assert helpers.split_host_port("192.168.0.1:80") == (ipaddress.ip_address("192.168.0.1"), 80) + assert helpers.split_host_port("[e]:80") == ("e", 80) assert helpers.split_host_port("d://wat:wat") == ("wat", None) assert helpers.split_host_port("https://[dead::beef]:8338") == (ipaddress.ip_address("dead::beef"), 8338) + assert helpers.split_host_port("[dead::beef]") == (ipaddress.ip_address("dead::beef"), None) + assert helpers.split_host_port("dead::beef") == (ipaddress.ip_address("dead::beef"), None) extracted_words = helpers.extract_words("blacklanternsecurity") assert "black" in extracted_words # assert "blacklantern" in extracted_words @@ -346,10 +399,6 @@ async def test_helpers_misc(helpers, scan, bbot_scanner, bbot_config, bbot_https assert helpers.smart_decode_punycode("bob_smith@xn--eckwd4c7c.xn--zckzah") == "bob_smith@ドメイン.テスト" assert helpers.smart_encode_punycode("ドメイン.テスト:80") == "xn--eckwd4c7c.xn--zckzah:80" assert helpers.smart_decode_punycode("xn--eckwd4c7c.xn--zckzah:80") == "ドメイン.テスト:80" - with pytest.raises(ValueError): - helpers.smart_decode_punycode(b"asdf") - with pytest.raises(ValueError): - helpers.smart_encode_punycode(b"asdf") assert helpers.recursive_decode("Hello%20world%21") == "Hello world!" assert helpers.recursive_decode("Hello%20%5Cu041f%5Cu0440%5Cu0438%5Cu0432%5Cu0435%5Cu0442") == "Hello Привет" diff --git a/bbot/test/test_step_1/test_regexes.py b/bbot/test/test_step_1/test_regexes.py index db889ec9c..7807e6c79 100644 --- a/bbot/test/test_step_1/test_regexes.py +++ b/bbot/test/test_step_1/test_regexes.py @@ -1,4 +1,5 @@ import pytest +import traceback from bbot.core.event.helpers import get_event_type from bbot.core.helpers import regexes @@ -40,7 +41,7 @@ def test_dns_name_regexes(): assert not r.match(dns), f"BAD DNS NAME: {dns} matched regex: {r}" try: - event_type = get_event_type(dns) + event_type, _ = get_event_type(dns) if event_type == "OPEN_TCP_PORT": assert dns == "evilcorp.com:80" continue @@ -56,7 +57,7 @@ def test_dns_name_regexes(): for dns in good_dns: matches = list(r.match(dns) for r in dns_name_regexes) assert any(matches), f"Good DNS_NAME {dns} did not match regexes" - event_type = get_event_type(dns) + event_type, _ = get_event_type(dns) if not event_type == "DNS_NAME": assert ( dns == "1.2.3.4" and event_type == "IP_ADDRESS" @@ -102,7 +103,7 @@ def test_open_port_regexes(): assert not r.match(open_port), f"BAD OPEN_TCP_PORT: {open_port} matched regex: {r}" try: - event_type = get_event_type(open_port) + event_type, _ = get_event_type(open_port) if event_type == "IP_ADDRESS": assert open_port in ("1.2.3.4", "[dead::beef]") continue @@ -118,7 +119,7 @@ def test_open_port_regexes(): for open_port in good_ports: matches = list(r.match(open_port) for r in open_port_regexes) assert any(matches), f"Good OPEN_TCP_PORT {open_port} did not match regexes" - event_type = get_event_type(open_port) + event_type, _ = get_event_type(open_port) assert event_type == "OPEN_TCP_PORT" @@ -170,7 +171,7 @@ def test_url_regexes(): event_type = "" try: - event_type = get_event_type(bad_url) + event_type, _ = get_event_type(bad_url) if event_type == "DNS_NAME": assert bad_url == "evilcorp.com" continue @@ -178,9 +179,11 @@ def test_url_regexes(): except ValidationError: continue except Exception as e: - pytest.fail(f"BAD URL: {bad_url} raised unknown error: {e}") + pytest.fail(f"BAD URL: {bad_url} raised unknown error: {e}: {traceback.format_exc()}") for good_url in good_urls: matches = list(r.match(good_url) for r in url_regexes) assert any(matches), f"Good URL {good_url} did not match regexes" - assert get_event_type(good_url) == "URL_UNVERIFIED", f"Event type for URL {good_url} was not properly detected" + assert ( + get_event_type(good_url)[0] == "URL_UNVERIFIED" + ), f"Event type for URL {good_url} was not properly detected"