Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Internationalization Support (Punycode) #741

Merged
merged 8 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
smart_decode,
get_file_extension,
validators,
smart_decode_punycode,
tagify,
)

Expand Down Expand Up @@ -982,9 +981,7 @@ def make_event(
return data
else:
if event_type is None:
if isinstance(data, str):
data = smart_decode_punycode(data)
event_type = get_event_type(data)
event_type, data = get_event_type(data)
if not dummy:
log.debug(f'Autodetected event type "{event_type}" based on data: "{data}"')

Expand Down
13 changes: 7 additions & 6 deletions bbot/core/event/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from contextlib import suppress

from bbot.core.errors import ValidationError
from bbot.core.helpers import sha1, smart_decode, smart_decode_punycode
from bbot.core.helpers import sha1, smart_decode, smart_encode_punycode
from bbot.core.helpers.regexes import event_type_regexes, event_id_regex


Expand All @@ -14,25 +14,26 @@ def get_event_type(data):
"""
Attempt to divine event type from data
"""
data = smart_decode_punycode(smart_decode(data).strip())

# IP address
with suppress(Exception):
ipaddress.ip_address(data)
return "IP_ADDRESS"
return "IP_ADDRESS", data

# IP network
with suppress(Exception):
ipaddress.ip_network(data, strict=False)
return "IP_RANGE"
return "IP_RANGE", data

data = smart_encode_punycode(smart_decode(data).strip())

# Strict regexes
for t, regexes in event_type_regexes.items():
for r in regexes:
if r.match(data):
if t == "URL":
return "URL_UNVERIFIED"
return t
return "URL_UNVERIFIED", data
return t, data

raise ValidationError(f'Unable to autodetect event type from "{data}"')

Expand Down
145 changes: 128 additions & 17 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import re
import sys
import copy
import idna
import json
import atexit
import codecs
Expand Down Expand Up @@ -34,7 +35,6 @@

from .url import * # noqa F401
from .. import errors
from .punycode import * # noqa F401
from .logger import log_to_stderr
from . import regexes as bbot_regexes
from .names_generator import random_name, names, adjectives # noqa F401
Expand Down Expand Up @@ -106,21 +106,41 @@ def split_host_port(d):
"192.168.1.1:443" --> (IPv4Address('192.168.1.1'), 443)
"[dead::beef]:443" --> (IPv6Address('dead::beef'), 443)
"""
if not "://" in d:
d = f"d://{d}"
parsed = urlparse(d)
port = None
d = str(d)
host = None
with suppress(ValueError):
if parsed.port is None:
if parsed.scheme in ("https", "wss"):
port = 443
elif parsed.scheme in ("http", "ws"):
port = 80
else:
port = int(parsed.port)
with suppress(ValueError):
host = parsed.hostname
port = None
scheme = None
if is_ip(d):
return make_ip_type(d), port

match = bbot_regexes.split_host_port_regex.match(d)
if match is None:
raise ValueError(f'split_port() failed to parse "{d}"')
scheme = match.group("scheme")
netloc = match.group("netloc")
if netloc is None:
raise ValueError(f'split_port() failed to parse "{d}"')

match = bbot_regexes.extract_open_port_regex.match(netloc)
if match is None:
raise ValueError(f'split_port() failed to parse netloc "{netloc}"')

host = match.group(2)
if host is None:
host = match.group(1)
if host is None:
raise ValueError(f'split_port() failed to locate host in netloc "{netloc}"')

port = match.group(3)
if port is None and scheme is not None:
if scheme in ("https", "wss"):
port = 443
elif scheme in ("http", "ws"):
port = 80
elif port is not None:
with suppress(ValueError):
port = int(port)

return make_ip_type(host), port


Expand Down Expand Up @@ -632,12 +652,13 @@ def make_netloc(host, port):
("192.168.1.1", None) --> "192.168.1.1"
("192.168.1.1", 443) --> "192.168.1.1:443"
("evilcorp.com", 80) --> "evilcorp.com:80"
("dead::beef", None) --> "[dead::beef]"
("dead::beef", 443) --> "[dead::beef]:443"
"""
if port is None:
return host
if is_ip(host, version=6):
host = f"[{host}]"
if port is None:
return host
return f"{host}:{port}"


Expand Down Expand Up @@ -898,10 +919,100 @@ def clean_old(d, keep=10, filter=lambda x: True, key=latest_mtime, reverse=True,


def extract_emails(s):
    """
    Yield every email address found in a body of text, lowercased
    """
    decoded = smart_decode(s)
    found = bbot_regexes.email_regex.findall(decoded)
    yield from (address.lower() for address in found)


def extract_host(s):
    """
    Attempts to find and extract the host portion of a string.

    Args:
        s (str): The string from which to extract the host.

    Returns:
        tuple: A tuple containing three values:
            (hostname (None if not found), string_before_hostname, string_after_hostname).
            When no valid host can be extracted, the result is (None, s, "").

    Examples:
        >>> extract_host("evilcorp.com:80")
        ("evilcorp.com", "", ":80")

        >>> extract_host("http://evilcorp.com:80/asdf.php?a=b")
        ("evilcorp.com", "http://", ":80/asdf.php?a=b")

        >>> extract_host("bob@evilcorp.com")
        ("evilcorp.com", "bob@", "")

        >>> extract_host("[dead::beef]:22")
        ("dead::beef", "[", "]:22")

        >>> extract_host("ftp://username:password@my-ftp.com/my-file.csv")
        (
            "my-ftp.com",
            "ftp://username:password@",
            "/my-file.csv",
        )
    """
    s = smart_decode(s)
    match = bbot_regexes.extract_host_regex.search(s)
    if not match:
        return (None, s, "")

    # group 1 is the raw netloc (host plus optional port / IPv6 brackets)
    hostname = match.group(1)
    before = s[: match.start(1)]
    after = s[match.end(1) :]

    try:
        host, port = split_host_port(hostname)
    except ValueError:
        # split_host_port() raises ValueError when it cannot parse the netloc;
        # report that as "no host found" instead of letting it escape
        return (None, s, "")

    # rebuilding the netloc must round-trip exactly, otherwise the
    # host / port combination is considered invalid
    netloc = make_netloc(host, port)
    if netloc != hostname:
        return (None, s, "")

    if host is not None:
        # move the port (and IPv6 brackets) out of the hostname and into the
        # surrounding text so that before + hostname + after == s
        if port is not None:
            after = f":{port}{after}"
        if is_ip(host, version=6) and hostname.startswith("["):
            before = f"{before}["
            after = f"]{after}"
        hostname = str(host)
    return (hostname, before, after)


def smart_encode_punycode(text: str) -> str:
    """
    Encode the host portion of a string as punycode, leaving the rest untouched

    ドメイン.テスト --> xn--eckwd4c7c.xn--zckzah
    """
    host, prefix, suffix = extract_host(text)
    if host is not None:
        try:
            host = idna.encode(host).decode(errors="ignore")
        except UnicodeError:
            # encoding failed; keep the host exactly as it was extracted
            pass
        return f"{prefix}{host}{suffix}"

    # no host could be located, so there is nothing to encode
    return text


def smart_decode_punycode(text: str) -> str:
    """
    Decode a punycode host inside a string back to unicode, leaving the rest untouched

    xn--eckwd4c7c.xn--zckzah --> ドメイン.テスト
    """
    host, prefix, suffix = extract_host(text)
    if host is not None:
        try:
            host = idna.decode(host)
        except UnicodeError:
            # decoding failed; keep the host exactly as it was extracted
            pass
        return f"{prefix}{host}{suffix}"

    # no host could be located, so there is nothing to decode
    return text


def can_sudo_without_password():
"""
Return True if the current user can sudo without a password
Expand Down
53 changes: 0 additions & 53 deletions bbot/core/helpers/punycode.py

This file was deleted.

13 changes: 12 additions & 1 deletion bbot/core/helpers/regexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
_ipv6_regex = r"[A-F0-9:]*:[A-F0-9:]*:[A-F0-9:]*"
ipv6_regex = re.compile(_ipv6_regex, re.I)
# dns names with periods
_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+[^\W_]{1,63}\.?"
_dns_name_regex = r"(?:\w(?:[\w-]{0,100}\w)?\.)+(?:[xX][nN]--)?[^\W_]{1,63}\.?"
# dns names without periods
_hostname_regex = r"(?!\w*\.\w+)\w(?:[\w-]{0,100}\w)?"
_email_regex = r"(?:[^\W_][\w\-\.\+]{,100})@" + _dns_name_regex
Expand Down Expand Up @@ -87,3 +87,14 @@
jquery_get_regex = re.compile(r"url:\s?[\"\'].+?\?(\w+)=")
jquery_post_regex = re.compile(r"\$.post\([\'\"].+[\'\"].+\{(.+)\}")
a_tag_regex = re.compile(r"<a[^>]*href=[\"\'][^\"\'?>]*\?([^&\"\'=]+)")

# characters permitted in a netloc (host, optional port, optional IPv6 brackets)
valid_netloc = r"[^\s!@#$%^&()=/?\\'\";~`<>]+"

# optional scheme, optional userinfo ("user:pass@"), then the netloc
_split_host_port_regex = r"(?:(?P<scheme>[a-z0-9]{1,20})://)?(?:[^?]*@)?(?P<netloc>" + valid_netloc + ")"
split_host_port_regex = re.compile(_split_host_port_regex, re.I)

# group 1: bracketed IPv6 host, group 2: any other host, group 3: optional port
_extract_open_port_regex = r"(?:(?:\[([0-9a-f:]+)\])|([^\s:]+))(?::(\d{1,5}))?"
# case-insensitive so that uppercase IPv6 hex digits (e.g. "[DEAD::BEEF]:443")
# are recognized by the bracketed branch, like the other regexes here
extract_open_port_regex = re.compile(_extract_open_port_regex, re.I)

# same shape as the split regex, but only the netloc is captured (group 1)
_extract_host_regex = r"(?:[a-z0-9]{1,20}://)?(?:[^?]*@)?(" + valid_netloc + ")"
extract_host_regex = re.compile(_extract_host_regex, re.I)
7 changes: 3 additions & 4 deletions bbot/core/helpers/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

from bbot.core.helpers import regexes
from bbot.core.helpers.url import parse_url, hash_url
from bbot.core.helpers.punycode import smart_decode_punycode
from bbot.core.helpers.misc import split_host_port, make_netloc, is_ip
from bbot.core.helpers.misc import smart_encode_punycode, split_host_port, make_netloc, is_ip

log = logging.getLogger("bbot.core.helpers.validators")

Expand Down Expand Up @@ -57,7 +56,7 @@ def validate_host(host):
return str(ip)
except Exception:
# finally, try DNS_NAME
host = smart_decode_punycode(host)
host = smart_encode_punycode(host)
# clean asterisks and clinging dashes
host = host.strip("*.-").replace("*", "")
for r in regexes.event_type_regexes["DNS_NAME"]:
Expand Down Expand Up @@ -89,7 +88,7 @@ def validate_severity(severity):

@validator
def validate_email(email):
email = smart_decode_punycode(str(email).strip().lower())
email = smart_encode_punycode(str(email).strip().lower())
if any(r.match(email) for r in regexes.event_type_regexes["EMAIL_ADDRESS"]):
return email
assert False, f'Invalid email: "{email}"'
Expand Down
Loading