diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index f53e1be..a3601f1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -32,7 +32,6 @@ jobs: pip install -r requirements_build.txt >/dev/null pip install -r requirements.txt >/dev/null - # NOTE: timeout for running the app includes db migrations - name: Testing to build with PIP run: | cd /tmp diff --git a/.gitignore b/.gitignore index 0a44966..a263c02 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ __pychache__ -venv/ \ No newline at end of file +venv/ +.idea/ +src/oxl_utils.egg-info \ No newline at end of file diff --git a/README.md b/README.md index 20cc800..41e372c 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,39 @@ # Utils Collection +## Data States + +```python3 +from oxl_utils.state import is_set +from oxl_utils.state import is_null +``` + +## Validators + +```python3 +# validate email format +from oxl_utils.valid.email import valid_email + +# also check for valid MX record of e-mail domain +from oxl_utils.valid.email import valid_email_dns + +# ips and networks +from oxl_utils.valid.net import valid_ip +from oxl_utils.valid.net import valid_ip4 +from oxl_utils.valid.net import valid_ip6 +from oxl_utils.valid.net import valid_net4 +from oxl_utils.valid.net import valid_net6 +from oxl_utils.valid.net import valid_public_ip +from oxl_utils.valid.net import valid_asn + +from oxl_utils.valid.dns import valid_domain +from oxl_utils.valid.email import valid_email +from oxl_utils.valid.email import has_mailserver + +``` + +## Django + +```python3 +# fix datetime timezone +from oxl_utils.dj.dt import datetime_from_db +``` diff --git a/requirements.txt b/requirements.txt index 9850437..37466bd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -pytz \ No newline at end of file +pytz +dnspython \ No newline at end of file diff --git a/src/oxl_utils/debug.py b/src/oxl_utils/debug.py new file mode 100644 index 0000000..aa90ed0 --- /dev/null +++ b/src/oxl_utils/debug.py @@ -0,0 +1,5 @@ +from os import environ + + +def mode_debug() -> bool: + return 'DEBUG' in environ and environ['DEBUG'] == '1' diff --git a/src/oxl_utils/django/__init__.py b/src/oxl_utils/dj/__init__.py similarity index 100% rename from src/oxl_utils/django/__init__.py rename to src/oxl_utils/dj/__init__.py diff --git a/src/oxl_utils/dj/dt.py b/src/oxl_utils/dj/dt.py new file mode 100644 index 0000000..99ec9e5 --- /dev/null +++ b/src/oxl_utils/dj/dt.py @@ -0,0 +1,23 @@ +from os import environ +from datetime import datetime + +from pytz import utc, timezone + + +def datetime_from_db(dt: (datetime, None), tz: (timezone, str) = None) -> (datetime, None): + if not isinstance(dt, datetime): + return None + + if tz in [None, '']: + try: + tz = environ['TIMEZONE'] + + except KeyError: + raise EnvironmentError('TIMEZONE not provided') + + if isinstance(tz, str): + tz = timezone(tz) + + # datetime form db will always be UTC; convert it + local_dt = dt.replace(tzinfo=utc).astimezone(tz) + return tz.normalize(local_dt) diff --git a/src/oxl_utils/dj/model.py b/src/oxl_utils/dj/model.py new file mode 100644 index 0000000..d30b625 --- /dev/null +++ b/src/oxl_utils/dj/model.py @@ -0,0 +1,5 @@ +CHOICES_BOOL = ( + (True, 'Yes'), + (False, 'No') +) +DEFAULT_NONE = {'null': True, 'default': None, 'blank': True} diff --git a/src/oxl_utils/django/dt.py b/src/oxl_utils/django/dt.py deleted file mode 100644 index 3a8b075..0000000 --- a/src/oxl_utils/django/dt.py +++ /dev/null @@ -1,13 +0,0 @@ -from datetime import datetime - -from pytz import utc -from - - -def datetime_from_db(dt: (datetime, None), timezone: str) -> (datetime, None): - # datetime form db will always be UTC; convert it - if not isinstance(dt, datetime): - return None - - local_dt = dt.replace(tzinfo=utc).astimezone(TIMEZONE) - return TIMEZONE.normalize(local_dt) diff --git a/src/oxl_utils/dt.py b/src/oxl_utils/dt.py new file mode 100644 index 0000000..3087a8c --- /dev/null +++ b/src/oxl_utils/dt.py @@ -0,0 +1,18 @@ +from os import environ +from datetime import datetime + +from pytz import timezone + + +def datetime_w_tz(tz: (timezone, str) = None) -> datetime: + if tz in [None, '']: + try: + tz = environ['TIMEZONE'] + + except KeyError: + raise EnvironmentError('TIMEZONE not provided') + + if isinstance(tz, str): + tz = timezone(tz) + + return datetime.now(tz) diff --git a/src/oxl_utils/log.py b/src/oxl_utils/log.py new file mode 100644 index 0000000..b46a7e1 --- /dev/null +++ b/src/oxl_utils/log.py @@ -0,0 +1,56 @@ +from os import getpid, environ +from sys import stderr, stdout +from inspect import stack as inspect_stack +from inspect import getfile as inspect_getfile + +from .dt import datetime_w_tz +from .debug import mode_debug + +LOG_FORMAT = 'default' if 'LOG_FORMAT' not in environ else environ['LOG_FORMAT'] +LOG_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z' +PID = getpid() + +LEVEL_NAME_MAPPING = { + 1: 'FATAL', + 2: 'ERROR', + 3: 'WARN', + 4: 'INFO', + 5: 'INFO', + 6: 'DEBUG', + 7: 'DEBUG', +} + + +def _log_formatted(level: int, msg: str, mid: str = '') -> str: + if LOG_FORMAT == 'gunicorn': + return f"[{datetime_w_tz().strftime(LOG_TIME_FORMAT)}] [{PID}] [{LEVEL_NAME_MAPPING[level]}] {mid}{msg}" + + return f"{datetime_w_tz().strftime(LOG_TIME_FORMAT)} {LEVEL_NAME_MAPPING[level]} {mid}{msg}" + + +def log(msg: str, level: int = 3): + debug = mode_debug() + prefix_caller = '' + + if level > 5 and not debug: + return + + if debug: + caller = inspect_getfile(inspect_stack()[1][0]).rsplit('/', 1)[1].rsplit('.', 1)[0] + prefix_caller = f'[{caller}] ' + + stdout.write(_log_formatted(msg=msg, level=level, mid=prefix_caller)) + + +def log_warn(msg: str, _stderr: bool = False): + msg = f'\x1b[1;33m{_log_formatted(msg=msg, level=3)}\x1b[0m\n' + + if _stderr: + stderr.write(msg) + + else: + stdout.write(msg) + + +def log_error(msg: str): + stderr.write(f'\033[01;{_log_formatted(msg=msg, level=2)}\x1b[0m\n') diff --git a/src/oxl_utils/net.py b/src/oxl_utils/net.py new file mode 100644 index 0000000..d329638 --- /dev/null +++ b/src/oxl_utils/net.py @@ -0,0 +1,57 @@ +from socket import socket, AF_INET, AF_INET6, SOCK_STREAM +from os import environ + +from dns.resolver import Resolver, NoAnswer, NXDOMAIN, LifetimeTimeout, NoNameservers +from dns.exception import SyntaxError as DNSSyntaxError + +from .valid.net import valid_ip + +DEFAULT_NAMESERVERS = ['1.1.1.1', '8.8.8.8'] +NS_ENV_KEY = 'NAMESERVERS' + +dns_resolver = Resolver(configure=False) +dns_resolver.lifetime = 0.1 +dns_resolver.timeout = 0.1 + +dns_resolver.nameservers = environ[NS_ENV_KEY].split(',') if NS_ENV_KEY in environ else DEFAULT_NAMESERVERS + + +def resolve_dns(v: str, t: str = 'A', timeout: float = dns_resolver.timeout) -> list[str]: + try: + if t != 'PTR': + r = [r.to_text() for r in dns_resolver.resolve(v, t, lifetime=timeout)] + + else: + r = [r.to_text() for r in dns_resolver.resolve_address(v, lifetime=timeout)] + + r.sort() + return r + + except (IndexError, NoAnswer, NXDOMAIN, DNSSyntaxError, NoNameservers, LifetimeTimeout): + return [] + + +def resolve_first_ip(v: str) -> (str, None): + r = resolve_dns(v, t='A') + if len(r) > 0: + return r[0] + + r = resolve_dns(v, t='AAAA') + if len(r) > 0: + return r[0] + + return None + + +def is_port_open(target: str, port: (str, int), timeout: int = 1) -> bool: + ip = target + if not valid_ip(target): + ip = resolve_first_ip(target) + if ip is None: + return False + + ip_proto = AF_INET if ip.find(':') == -1 else AF_INET6 + + with socket(ip_proto, SOCK_STREAM) as s: + s.settimeout(timeout) + return s.connect_ex((ip, int(port))) == 0 diff --git a/src/oxl_utils/net_test.py b/src/oxl_utils/net_test.py new file mode 100644 index 0000000..f426466 --- /dev/null +++ b/src/oxl_utils/net_test.py @@ -0,0 +1,64 @@ +import pytest + + +@pytest.mark.parametrize('v, s', [ + ( + {'v': 'oxl.at', 't': 'A'}, + {'exist': True, 'len': 1}, + ), + ( + {'v': 'oxl.at', 't': 'AAAA'}, + {'exist': True, 'len': 1}, + ), + ( + {'v': 'xyz.oxl.at', 't': 'MX'}, + {'exist': False, 'len': 0}, + ), + ( + {'v': 'xyz.oxl.at', 't': 'MX'}, + {'exist': False, 'len': 0}, + ), + ( + {'v': '1.1.1.1', 't': 'PTR'}, + {'exist': True, 'len': 1, 'v': ['one.one.one.one.']}, + ), + ( + {'v': 'one.one.one.one', 't': 'A'}, + {'exist': True, 'len': 2, 'v': ['1.0.0.1', '1.1.1.1']}, + ), +]) +def test_dns(v: dict, s: dict): + from .net import resolve_dns + r = resolve_dns(**v, timeout=2.0) + assert (len(r) > 0) == s['exist'] + assert len(r) == s['len'] + if 'v' in s: + assert r == s['v'] + + +@pytest.mark.parametrize('v, s', [ + ('one.one.one.one', '1.0.0.1'), + ('test.abc.sldjflsdkl.sdlfj', None), +]) +def test_dns_first_ip(v: str, s: str): + from .net import resolve_first_ip + assert resolve_first_ip(v) == s + + +@pytest.mark.parametrize('v, s', [ + ( + {'target': '1.1.1.1', 'port': 53}, + True, + ), + ( + {'target': '1.1.1.1', 'port': 54}, + False, + ), + ( + {'target': 'one.one.one.one', 'port': 53}, + True, + ), +]) +def test_port(v: dict, s: bool): + from .net import is_port_open + assert is_port_open(**v) == s diff --git a/src/oxl_utils/state.py b/src/oxl_utils/state.py new file mode 100644 index 0000000..29dfa9d --- /dev/null +++ b/src/oxl_utils/state.py @@ -0,0 +1,9 @@ +def is_null(data) -> bool: + if data is None: + return True + + return str(data).strip() == '' + + +def is_set(data: str) -> bool: + return not is_null(data) diff --git a/src/oxl_utils/state_test.py b/src/oxl_utils/state_test.py new file mode 100644 index 0000000..ed979c6 --- /dev/null +++ b/src/oxl_utils/state_test.py @@ -0,0 +1,31 @@ +import pytest + + +@pytest.mark.parametrize('v, r', [ + ('', True), + (' ', True), + (' ', True), + (None, True), + ('test', False), + (1, False), + (True, False), + ('None', False), +]) +def test_null(v: any, r: bool): + from .state import is_null + assert is_null(v) == r + + +@pytest.mark.parametrize('v, r', [ + ('', False), + (' ', False), + (' ', False), + (None, False), + ('test', True), + (1, True), + (True, True), + ('None', True), +]) +def test_set(v: any, r: bool): + from .state import is_set + assert is_set(v) == r diff --git a/src/oxl_utils/subps.py b/src/oxl_utils/subps.py new file mode 100644 index 0000000..58184c6 --- /dev/null +++ b/src/oxl_utils/subps.py @@ -0,0 +1,85 @@ +import subprocess +from pathlib import Path +from os import environ, getcwd +from functools import cache + +from .log import log + +# pylint: disable=R0914 +def process( + cmd: (str, list), timeout_sec: int = None, shell: bool = False, timeout_shell: bool = True, + cwd: Path = None, env: dict = None, env_inherit: bool = False, env_remove: list = None, + empty_none: bool = True, +) -> dict: + if cwd is None: + cwd = getcwd() + + cmd_str = cmd + if isinstance(cmd, list): + cmd_str = ' '.join(cmd) + + if shell: + cmd = cmd_str + if timeout_shell and timeout_sec is not None: + cmd = f'timeout {timeout_sec} {cmd}' + + elif not isinstance(cmd, list): + cmd = cmd.split(' ') + + log(msg=f"Executing command: '{cmd_str}'", level=6) + + if env is None: + env = {} + + if env_inherit: + if env_remove is None: + env_remove = [] + + # merge provided env with current env + env = {**environ.copy(), **env} + for k in env_remove: + if k in env: + env.pop(k) + + try: + with subprocess.Popen( + cmd, + shell=shell, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + cwd=cwd, + env=env, + ) as p: + b_stdout, b_stderr = p.communicate(timeout=timeout_sec) + stdout, stderr, rc = b_stdout.decode('utf-8').strip(), b_stderr.decode('utf-8').strip(), p.returncode + + except (subprocess.TimeoutExpired, subprocess.SubprocessError, subprocess.CalledProcessError, + OSError, IOError) as error: + stdout, stderr, rc = '', str(error), 1 + + + if empty_none: + if stdout.strip() == '': + stdout = None + + if stderr.strip() == '': + stderr = None + + return { + 'stdout': stdout, + 'stderr': stderr, + 'rc': rc, + } + + +@cache +def process_with_cache( + cmd: (str, list), timeout_sec: int = None, shell: bool = False, timeout_shell: bool = True, + cwd: Path = None, env: dict = None, env_inherit: bool = False, env_remove: list = None, + empty_none: bool = True, +) -> dict: + # read-only commands which results can be cached + return process( + cmd=cmd, timeout_sec=timeout_sec, shell=shell, timeout_shell=timeout_shell, cwd=cwd, + env=env, env_inherit=env_inherit, env_remove=env_remove, empty_none=empty_none, + ) diff --git a/src/oxl_utils/subps_test.py b/src/oxl_utils/subps_test.py new file mode 100644 index 0000000..20585ff --- /dev/null +++ b/src/oxl_utils/subps_test.py @@ -0,0 +1,55 @@ +import pytest + + +@pytest.mark.parametrize('kwargs, s', [ + ( + {'cmd': 'echo abc'}, + {'rc': 0, 'stdout': 'abc', 'stderr': None}, + ), + ( + {'cmd': ['echo', 'abc']}, + {'rc': 0, 'stdout': 'abc', 'stderr': None}, + ), + ( + {'cmd': ['echo', 'abc'], 'shell': True}, + {'rc': 0, 'stdout': 'abc', 'stderr': None}, + ), + ( + {'cmd': 'echo abc', 'shell': True}, + {'rc': 0, 'stdout': 'abc', 'stderr': None}, + ), + ( + {'cmd': 'sleep 1'}, + {'rc': 0, 'stdout': None, 'stderr': None}, + ), + ( + {'cmd': 'sleep 1', 'timeout_sec': 0}, + {'rc': 1, 'stdout': None, 'stderr': 'timed out'}, + ), + ( + {'cmd': 'sleep 1', 'timeout_sec': 0, 'shell': True}, + {'rc': 1, 'stdout': None, 'stderr': 'timed out'}, + ), + ( + {'cmd': 'sleep 1', 'timeout_sec': 0, 'shell': True, 'timeout_shell': False}, + {'rc': 1, 'stdout': None, 'stderr': 'timed out'}, + ), + ( + {'cmd': 'mkdir /tmp/abc/def/ghi'}, + {'rc': 1, 'stdout': None, 'stderr': 'No such file or directory'}, + ), + ( + {'cmd': 'echo', 'empty_none': False}, + {'rc': 0, 'stdout': '', 'stderr': ''}, + ), +]) +def test_subps(kwargs: dict, s: dict): + from .subps import process + r = process(**kwargs) + assert r['rc'] == s['rc'] + assert r['stdout'] == s['stdout'] + assert r['stderr'] == s['stderr'] or ( + isinstance(r['stderr'], str) and + isinstance(s['stderr'], str) and + r['stderr'].find(s['stderr']) != -1 + ) diff --git a/src/oxl_utils/valid/__init__.py b/src/oxl_utils/valid/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/oxl_utils/valid/base.py b/src/oxl_utils/valid/base.py new file mode 100644 index 0000000..1a2403d --- /dev/null +++ b/src/oxl_utils/valid/base.py @@ -0,0 +1,5 @@ +def _reg_match(reg, v: (str, None)) -> bool: + if v is None: + v = '' + + return reg.match(v) is not None diff --git a/src/oxl_utils/valid/dns.py b/src/oxl_utils/valid/dns.py new file mode 100644 index 0000000..6c8824b --- /dev/null +++ b/src/oxl_utils/valid/dns.py @@ -0,0 +1,15 @@ +import re as regex + +from .base import _reg_match + +MATCH_DOMAIN = regex.compile( + r'^(([a-zA-Z]{1})|([a-zA-Z]{1}[a-zA-Z]{1})|' + r'([a-zA-Z]{1}[0-9]{1})|([0-9]{1}[a-zA-Z]{1})|' + r'([-_.a-zA-Z0-9]{0,61}[a-zA-Z0-9]))\.' + r'([a-zA-Z]{2,13}|[a-zA-Z0-9-]{2,30}.[a-zA-Z]{2,3})$' +) + + +def valid_domain(value: str) -> bool: + # see: https://validators.readthedocs.io/en/latest/_modules/validators/domain.html#domain + return _reg_match(reg=MATCH_DOMAIN, v=value) diff --git a/src/oxl_utils/valid/dns_test.py b/src/oxl_utils/valid/dns_test.py new file mode 100644 index 0000000..a9136bd --- /dev/null +++ b/src/oxl_utils/valid/dns_test.py @@ -0,0 +1,16 @@ +import pytest + + +@pytest.mark.parametrize('v, s', [ + ('oxl.at', True), + ('abc.oxl.at', True), + ('!abc.oxl.at', False), + ('abc. bool: - if not email or '@' not in email: +EMAIL_REGEX_USER = regex.compile(r"^[a-zA-Z0-9_+~\-\.]*$") + + +def has_mailserver(email: str, dns: bool = True) -> bool: + if not isinstance(email, str): return False - user_part, domain_part = email.rsplit('@', 1) + if not dns: + return True + + domain = email.split('@', 1)[1] + return len(resolve_dns(domain, t='MX')) > 0 - if not EMAIL_REGEX_USER.match(user_part): + +def valid_email(email: str, dns: bool = False) -> bool: + if not email or not isinstance(email, str) or '@' not in email: return False - if not EMAIL_REGEX_DOMAIN.match(domain_part): - # Try for possible IDN domain-part - try: - domain_part = domain_part.encode('idna').decode('ascii') - return EMAIL_REGEX_DOMAIN.match(domain_part) + user_part, domain_part = email.rsplit('@', 1) - except UnicodeError: - return False + if not _reg_match(reg=EMAIL_REGEX_USER, v=user_part): + return False + + if not valid_domain(domain_part): + return False - return True + return has_mailserver(email=email, dns=dns) diff --git a/src/oxl_utils/valid/email_test.py b/src/oxl_utils/valid/email_test.py new file mode 100644 index 0000000..e6097b2 --- /dev/null +++ b/src/oxl_utils/valid/email_test.py @@ -0,0 +1,25 @@ +import pytest + + +@pytest.mark.parametrize('v, s', [ + ('test@oxl.at', True), + ('test@abc.oxl.at', False), +]) +def test_email_dns(v: str, s: bool): + from .email import has_mailserver + assert has_mailserver(v) == s + + +@pytest.mark.parametrize('v, s', [ + ({'email': 'test@oxl.at', 'dns': False}, True), + ({'email': 'tes t@oxl.at', 'dns': False}, False), + ({'email': 'test@abc.oxl.at', 'dns': False}, True), + ({'email': 'test@abc.oxl.at', 'dns': True}, False), + ({'email': 'test+abc@oxl.at', 'dns': False}, True), + ({'email': 'test_abc@oxl.at', 'dns': False}, True), + ({'email': 'test!@oxl.at', 'dns': False}, False), + ({'email': 'test<@oxl.at', 'dns': False}, False), +]) +def test_email(v: dict, s: bool): + from .email import valid_email + assert valid_email(**v) == s diff --git a/src/oxl_utils/valid/net.py b/src/oxl_utils/valid/net.py new file mode 100644 index 0000000..73c8680 --- /dev/null +++ b/src/oxl_utils/valid/net.py @@ -0,0 +1,89 @@ +from ipaddress import ip_address, IPv4Address, IPv6Address, AddressValueError, IPv4Network, IPv6Network, NetmaskValueError + + +def valid_ip(ip: str) -> bool: + if not isinstance(ip, str): + return False + + try: + ip_address(ip) + return True + + except (AddressValueError, ValueError): + return False + + +def valid_ip4(ip: str) -> bool: + if not isinstance(ip, str): + return False + + try: + IPv4Address(ip) + return True + + except AddressValueError: + return False + + +def valid_ip6(ip: str) -> bool: + if not isinstance(ip, str): + return False + + try: + IPv6Address(ip) + return True + + except AddressValueError: + return False + + +def valid_net4(ip: str, strict: bool = False) -> bool: + if not isinstance(ip, str): + return False + + try: + IPv4Network(ip, strict=strict) + return True + + except (AddressValueError, NetmaskValueError): + return False + + +def valid_net6(ip: str, strict: bool = False) -> bool: + if not isinstance(ip, str): + return False + + try: + IPv6Network(ip, strict=strict) + return True + + except (AddressValueError, NetmaskValueError): + return False + + +def valid_public_ip(ip: str) -> bool: + ip = str(ip) + try: + ip = IPv4Address(ip) + return ip.is_global and \ + not ip.is_loopback and \ + not ip.is_reserved and \ + not ip.is_multicast and \ + not ip.is_link_local + + except AddressValueError: + try: + ip = IPv6Address(ip) + return ip.is_global and \ + not ip.is_loopback and \ + not ip.is_reserved and \ + not ip.is_multicast and \ + not ip.is_link_local + + except AddressValueError: + return False + + +def valid_asn(asn: str) -> bool: + asn = str(asn) + return asn.isdigit() and 0 <= int(asn) <= 4_294_967_294 diff --git a/src/oxl_utils/valid/net_test.py b/src/oxl_utils/valid/net_test.py new file mode 100644 index 0000000..178632a --- /dev/null +++ b/src/oxl_utils/valid/net_test.py @@ -0,0 +1,122 @@ +import pytest + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1', True), + ('192.168.0.1', True), + ('::1', True), + ('2adb::1', True), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_ip(v: str, s: bool): + from .net import valid_ip + assert valid_ip(v) == s + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1', True), + ('192.168.0.1', True), + ('::1', False), + ('2adb::1', False), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_ip4(v: str, s: bool): + from .net import valid_ip4 + assert valid_ip4(v) == s + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1', False), + ('192.168.0.1', False), + ('::1', True), + ('2adb::1', True), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_ip6(v: str, s: bool): + from .net import valid_ip6 + assert valid_ip6(v) == s + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1/32', True), + ('192.168.10.0/24', True), + ('1.1.1.1', True), + ('192.168.0.1', True), + ('::1', False), + ('::1/128', False), + ('2adb::1', False), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_net4(v: str, s: bool): + from .net import valid_net4 + assert valid_net4(v) == s + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1/32', False), + ('192.168.10.0/24', False), + ('1.1.1.1', False), + ('192.168.0.1', False), + ('::1', True), + ('::1/128', True), + ('2adb::1', True), + ('2adb::1/64', True), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_net6(v: str, s: bool): + from .net import valid_net6 + assert valid_net6(v) == s + + +@pytest.mark.parametrize('v, s', [ + ('1.1.1.1', True), + ('192.168.0.1', False), + ('::1', False), + ('2adb::1', True), + ('2adb:.:1', False), + ('1.1.1:1', False), + ('test', False), + ('1', False), + ('!', False), + (True, False), +]) +def test_public_ip(v: str, s: bool): + from .net import valid_public_ip + assert valid_public_ip(v) == s + + +@pytest.mark.parametrize('v, s', [ + (1337, True), + ('a', False), + (True, False), + (3829229, True), + (-1, False), +]) +def test_asn(v: str, s: bool): + from .net import valid_asn + assert valid_asn(v) == s