From 0aa77ab2b8955534363db73a9bcf4ad47937ac9e Mon Sep 17 00:00:00 2001 From: kiraum Date: Thu, 19 Dec 2024 23:46:41 +0100 Subject: [PATCH] feat(irrexplorer-cli): init --- .github/dependabot.yml | 11 ++ .github/workflows/linter.yml | 30 ++++ .pre-commit-config.yaml | 52 +++++++ README.md | 29 +++- irrexplorer_cli/__init__.py | 0 irrexplorer_cli/helpers.py | 123 ++++++++++++++++ irrexplorer_cli/irrexplorer.py | 247 +++++++++++++++++++++++++++++++++ irrexplorer_cli/main.py | 78 +++++++++++ irrexplorer_cli/models.py | 76 ++++++++++ irrexplorer_cli/py.typed | 0 irrexplorer_cli/queries.py | 73 ++++++++++ pyproject.toml | 73 ++++++++++ requirements.lock | 124 +++++++++++++++++ tests/__init__.py | 0 tests/fixtures.py | 46 ++++++ tests/test_cli.py | 111 +++++++++++++++ tests/test_helpers.py | 144 +++++++++++++++++++ tests/test_irrexplorer.py | 199 ++++++++++++++++++++++++++ tests/test_main.py | 78 +++++++++++ tests/test_models.py | 50 +++++++ tests/test_queries.py | 50 +++++++ 21 files changed, 1593 insertions(+), 1 deletion(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/linter.yml create mode 100644 .pre-commit-config.yaml create mode 100644 irrexplorer_cli/__init__.py create mode 100644 irrexplorer_cli/helpers.py create mode 100644 irrexplorer_cli/irrexplorer.py create mode 100644 irrexplorer_cli/main.py create mode 100644 irrexplorer_cli/models.py create mode 100644 irrexplorer_cli/py.typed create mode 100644 irrexplorer_cli/queries.py create mode 100644 pyproject.toml create mode 100644 requirements.lock create mode 100644 tests/__init__.py create mode 100644 tests/fixtures.py create mode 100644 tests/test_cli.py create mode 100644 tests/test_helpers.py create mode 100644 tests/test_irrexplorer.py create mode 100644 tests/test_main.py create mode 100644 tests/test_models.py create mode 100644 tests/test_queries.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..9d866e3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "pip" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml new file mode 100644 index 0000000..b723e3a --- /dev/null +++ b/.github/workflows/linter.yml @@ -0,0 +1,30 @@ +name: Test (linter/formatter/coverage) + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.13"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + pip install --upgrade uv + uv pip sync --system --break-system-packages requirements.lock + + - name: Install package + run: | + pip install -e . + + - name: Run all linters and formatters + run: | + pre-commit run --all-files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..37a07e9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,52 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: 'v5.0.0' + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - id: check-toml + - id: check-added-large-files + +- repo: local + hooks: + - id: isort + name: isort + entry: isort + language: system + types: [python] + + - id: black + name: black + entry: black + language: python + types_or: [python, pyi] + + - id: ruff + name: ruff + entry: ruff check + language: python + types_or: [python, pyi] + + - id: mypy + name: mypy + entry: mypy + language: python + types_or: [python, pyi] + require_serial: true + + - id: pylint + name: pylint + entry: pylint + language: system + types: [python] + args: + - --rcfile=pyproject.toml + + - id: pytest + name: pytest + entry: pytest + language: system + types: [python] + pass_filenames: false + args: ['--cov=irrexplorer_cli', '--cov-fail-under=100', 'tests/'] diff --git a/README.md b/README.md index 14fba05..add562d 100644 --- a/README.md +++ b/README.md @@ -1 +1,28 @@ -# irrexplorer-cli \ No newline at end of file +# irrexplorer-cli + +Here's how to compile and install the package using uv: + +uv pip install --editable ".[dev]" + +Copied + +Execute + +For a production build without development dependencies: + +uv pip install . + + + + +pytest tests/ -v --cov=irrexplorer_cli + + +TODO: Update README + + +pre-commit autoupdate --repo https://github.com/pre-commit/pre-commit-hooks + + + +pytest --cov=irrexplorer_cli --cov-report=term-missing tests/ diff --git a/irrexplorer_cli/__init__.py b/irrexplorer_cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/irrexplorer_cli/helpers.py b/irrexplorer_cli/helpers.py new file mode 100644 index 0000000..4fffd4a --- /dev/null +++ b/irrexplorer_cli/helpers.py @@ -0,0 +1,123 @@ +"""Helper functions for the CLI.""" + +import ipaddress +import re +from typing import Any, Dict, List + +from irrexplorer_cli.models import PrefixInfo, PrefixResult + + +def validate_prefix_format(prefix_input: str) -> bool: + """Validate IPv4 or IPv6 prefix format.""" + try: + ipaddress.ip_network(prefix_input) + return True + except ValueError: + return False + + +def validate_asn_format(asn_input: str) -> bool: + """Validate ASN format.""" + asn_pattern = r"^(?:AS|as)?(\d+)$" + match = re.match(asn_pattern, asn_input) + if match: + asn_number = int(match.group(1)) + return 0 <= asn_number <= 4294967295 + return False + + +def format_prefix_result(result: PrefixInfo, prefix_type: str) -> str: + """Format a single prefix result for CSV output.""" + prefix_result = PrefixResult( + prefix=result.prefix, + categoryOverall=result.categoryOverall, + rir=result.rir, + rpkiRoutes=result.rpkiRoutes, + bgpOrigins=result.bgpOrigins, + irrRoutes=result.irrRoutes, + messages=result.messages, + ) + + rpki_status = "NOT_FOUND" + if prefix_result.rpkiRoutes: + rpki_status = prefix_result.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(asn) for asn in prefix_result.bgpOrigins) + + irr_routes = [] + for db, routes in prefix_result.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + + messages = "|".join(msg.text for msg in prefix_result.messages) + + return ( + f"{prefix_type},{prefix_result.prefix},{prefix_result.categoryOverall}," + f"{prefix_result.rir},{rpki_status},{bgp_origins},{irr_routes_str},{messages}" + ) + + +def format_direct_origins(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None: + """Format and print direct origin prefixes.""" + for pfx_dict in results.get("directOrigin", []): + pfx = PrefixInfo(**pfx_dict) + rpki_status = "NOT_FOUND" + if pfx.rpkiRoutes: + rpki_status = pfx.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins) + irr_routes = [] + for db, routes in pfx.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + messages = "|".join(msg.text for msg in pfx.messages) + + print( + f"\nDIRECT,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir}," + f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}", + end="", + ) + + +def format_overlapping_prefixes(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None: + """Format and print overlapping prefixes.""" + for pfx_dict in results.get("overlaps", []): + pfx = PrefixInfo(**pfx_dict) + rpki_status = "NOT_FOUND" + if pfx.rpkiRoutes: + rpki_status = pfx.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins) + irr_routes = [] + for db, routes in pfx.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + messages = "|".join(msg.text for msg in pfx.messages) + + print( + f"\nOVERLAP,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir}," + f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}", + end="", + ) + + +def format_as_sets(as_number: str, sets_data: Dict[str, Dict[str, List[str]]]) -> None: + """Format and print AS sets.""" + if sets_data and sets_data.get("setsPerIrr"): + for irr, sets in sets_data["setsPerIrr"].items(): + for as_set in sets: + print(f"\nSET,{as_number},{as_set},{irr},N/A,N/A,N/A,N/A,N/A", end="") + + +async def find_least_specific_prefix(direct_overlaps: List[PrefixInfo]) -> str | None: + """Find the least specific prefix from the overlaps.""" + least_specific = None + for info in direct_overlaps: + if "/" in info.prefix: + _, mask = info.prefix.split("/") + if least_specific is None or int(mask) < int(least_specific.split("/")[1]): + least_specific = info.prefix + return least_specific diff --git a/irrexplorer_cli/irrexplorer.py b/irrexplorer_cli/irrexplorer.py new file mode 100644 index 0000000..2cef09d --- /dev/null +++ b/irrexplorer_cli/irrexplorer.py @@ -0,0 +1,247 @@ +"""Core functionality for IRR Explorer CLI.""" + +from typing import Any, Dict, List, Optional, cast + +import backoff +import httpx +from rich.columns import Columns +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from irrexplorer_cli.helpers import find_least_specific_prefix + +from .models import PrefixInfo + + +class IrrExplorer: + """IRR Explorer API client for prefix information retrieval.""" + + def __init__(self, base_url: str = "https://irrexplorer.nlnog.net") -> None: + """Initialize IRR Explorer client with base URL.""" + self.base_url = base_url + self.client = httpx.AsyncClient() + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_prefix_info(self, prefix: str) -> List[PrefixInfo]: + """Fetch prefix information from IRR Explorer API.""" + url = f"{self.base_url}/api/prefixes/prefix/{prefix}" + response = await self.client.get(url) + response.raise_for_status() + return [PrefixInfo(**item) for item in response.json()] + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_asn_info(self, asn: str) -> Dict[str, Any]: + """Fetch prefix information for an AS number.""" + url = f"https://irrexplorer.nlnog.net/api/prefixes/asn/{asn}" + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + return cast(Dict[str, Any], response.json()) + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_asn_sets(self, asn: str) -> Dict[str, Any]: + """Fetch AS sets information for an AS number.""" + url = f"{self.base_url}/api/sets/member-of/{asn}" + response = await self.client.get(url) + response.raise_for_status() + return cast(Dict[str, Any], response.json()) + + async def close(self) -> None: + """Close the HTTP client connection.""" + await self.client.aclose() + + +class IrrDisplay: + """Display handler for IRR Explorer prefix information.""" + + def __init__(self) -> None: + """Initialize display handler with Rich console.""" + self.console = Console() + + async def create_prefix_panel(self, info: PrefixInfo) -> Panel: + """Create Rich panel with prefix information.""" + table = Table(show_header=True, header_style="bold cyan", expand=True) + table.add_column("Property") + table.add_column("Value") + + # Add basic info asynchronously + await self.add_basic_info(table, info) + await self.add_bgp_origins(table, info) + await self.add_rpki_routes(table, info) + await self.add_irr_routes(table, info) + await self.add_status(table, info) + await self.add_messages(table, info) + + return Panel( + table, + title=f"[bold]{info.prefix}[/bold]", + expand=False, + border_style=await self.get_status_style(info.categoryOverall), + ) + + async def sort_and_group_panels(self, prefix_infos: List[PrefixInfo]) -> List[Panel]: + """Sort and group prefix information panels by status category.""" + status_groups: Dict[str, List[Panel]] = {"success": [], "warning": [], "error": [], "danger": [], "info": []} + + sorted_infos = sorted(prefix_infos, key=lambda x: (-int(x.prefix.split("/")[1]), x.categoryOverall)) + + for info in sorted_infos: + panel = await self.create_prefix_panel(info) + status_groups[info.categoryOverall].append(panel) + + return [ + panel for status in ["success", "warning", "error", "danger", "info"] for panel in status_groups[status] + ] + + async def add_basic_info(self, table: Table, info: PrefixInfo) -> None: + """Add basic prefix information to the display table.""" + table.add_row("Prefix", info.prefix) + table.add_row("RIR", info.rir) + + async def add_bgp_origins(self, table: Table, info: PrefixInfo) -> None: + """Add BGP origin information to the display table.""" + bgp_origins = ", ".join(f"AS{asn}" for asn in info.bgpOrigins) if info.bgpOrigins else "None" + table.add_row("BGP Origins", bgp_origins) + + async def add_rpki_routes(self, table: Table, info: PrefixInfo) -> None: + """Add RPKI route information to the display table.""" + if info.rpkiRoutes: + rpki_rows = [ + f"AS{route.asn} (MaxLength: {route.rpkiMaxLength}, Status: {route.rpkiStatus})" + for route in info.rpkiRoutes + ] + table.add_row("RPKI Routes", "\n".join(rpki_rows)) + + async def add_irr_routes(self, table: Table, info: PrefixInfo) -> None: + """Add IRR route information to the display table.""" + if info.irrRoutes: + irr_rows = [ + f"{db}: AS{route.asn} ({route.rpkiStatus})" for db, routes in info.irrRoutes.items() for route in routes + ] + if irr_rows: + table.add_row("IRR Routes", "\n".join(irr_rows)) + + async def add_status(self, table: Table, info: PrefixInfo) -> None: + """Add status information to the display table.""" + status_style = await self.get_status_style(info.categoryOverall) + table.add_row("Status", Text(info.categoryOverall, style=status_style)) + + async def add_messages(self, table: Table, info: PrefixInfo) -> None: + """Add message information to the display table.""" + if info.messages: + messages = "\n".join(f"• {msg.text}" for msg in info.messages) + table.add_row("Messages", messages) + + async def get_status_style(self, category: str) -> str: + """Get Rich style color based on status category.""" + return {"success": "green", "warning": "yellow", "error": "red", "danger": "red", "info": "blue"}.get( + category, "white" + ) + + async def display_prefix_info(self, direct_overlaps: List[PrefixInfo]) -> None: + """Display prefix information in Rich panels.""" + await self.display_direct_overlaps(direct_overlaps) + least_specific = await find_least_specific_prefix(direct_overlaps) + if least_specific: + await self.display_all_overlaps(least_specific) + + async def display_direct_overlaps(self, direct_overlaps: List[PrefixInfo]) -> None: + """Display directly overlapping prefixes.""" + direct_panels = await self.sort_and_group_panels(direct_overlaps) + direct_columns = Columns(direct_panels, equal=True, expand=True) + self.console.print( + Panel( + direct_columns, + title=f"[bold]Directly overlapping prefixes of {direct_overlaps[0].prefix}[/bold]", + expand=False, + ) + ) + + async def display_all_overlaps(self, least_specific: str) -> None: + """Display all overlaps for least specific prefix.""" + try: + explorer = IrrExplorer() + all_overlaps = await explorer.fetch_prefix_info(least_specific) + all_panels = await self.sort_and_group_panels(all_overlaps) + all_columns = Columns(all_panels, equal=True, expand=True) + + self.console.print("\n") + self.console.print( + Panel( + all_columns, + title=f"[bold]All overlaps of least specific match {least_specific}[/bold]", + expand=False, + ) + ) + await explorer.close() + except (httpx.HTTPError, ValueError, RuntimeError) as e: + self.console.print(f"[red]Error fetching overlaps for {least_specific}: {str(e)}[/red]") + finally: + await explorer.close() + + def get_rpki_status(self, prefix: Dict[str, Any]) -> str: + """Extract RPKI status from prefix info.""" + if prefix.get("rpkiRoutes"): + return cast(str, prefix["rpkiRoutes"][0]["rpkiStatus"]) + return "UNKNOWN" + + async def display_asn_info( + self, data: Dict[str, Any], asn: str, sets_data: Optional[Dict[str, Any]] = None + ) -> None: + """Display AS query results with rich formatting.""" + await self.display_direct_origins(data, asn) + await self.display_overlaps(data, asn) + self.display_as_sets(sets_data, asn) + + async def display_direct_origins(self, data: Dict[str, Any], asn: str) -> None: + """Display direct origin prefixes.""" + if data.get("directOrigin"): + direct_infos = [PrefixInfo(**prefix) for prefix in data["directOrigin"]] + direct_panels = await self.sort_and_group_panels(direct_infos) + direct_columns = Columns(direct_panels, equal=True, expand=True) + self.console.print( + Panel( + direct_columns, + title=f"[bold]Prefixes directly originated by {asn}[/bold]", + expand=False, + ) + ) + + async def display_overlaps(self, data: Dict[str, Any], asn: str) -> None: + """Display overlapping prefixes.""" + if data.get("overlaps"): + overlap_infos = [PrefixInfo(**prefix) for prefix in data["overlaps"]] + overlap_panels = await self.sort_and_group_panels(overlap_infos) + if overlap_panels: + self.console.print("\n") + overlap_columns = Columns(overlap_panels, equal=True, expand=True) + self.console.print( + Panel( + overlap_columns, + title=f"[bold]Overlapping prefixes related to {asn}[/bold]", + expand=False, + ) + ) + self.console.print("\n") + + def display_as_sets(self, sets_data: Optional[Dict[str, Any]], asn: str) -> None: + """Display AS sets information.""" + if sets_data and sets_data.get("setsPerIrr"): + sets_panels = [] + for irr, sets in sets_data["setsPerIrr"].items(): + table = Table(show_header=False, expand=True) + for as_set in sets: + table.add_row(as_set) + panel = Panel(table, title=f"[bold]{irr}[/bold]", border_style="blue") + sets_panels.append(panel) + + sets_columns = Columns(sets_panels, equal=True, expand=True) + self.console.print( + Panel( + sets_columns, + title=f"[bold]AS Sets including {asn}[/bold]", + expand=False, + ) + ) diff --git a/irrexplorer_cli/main.py b/irrexplorer_cli/main.py new file mode 100644 index 0000000..1e571e4 --- /dev/null +++ b/irrexplorer_cli/main.py @@ -0,0 +1,78 @@ +"""Command-line interface for IRR Explorer queries.""" + +import asyncio +from importlib.metadata import version + +import typer +from click import Context +from rich.console import Console + +from irrexplorer_cli.helpers import validate_asn_format, validate_prefix_format +from irrexplorer_cli.queries import async_asn_query, async_prefix_query + +__version__ = version("irrexplorer-cli") + +CTX_OPTION = typer.Option(None, hidden=True) +app = typer.Typer( + help="CLI tool to query IRR Explorer data for prefix information", + no_args_is_help=True, + context_settings={"help_option_names": ["-h", "--help"]}, +) +console = Console() + + +def version_display(display_version: bool) -> None: + """Display version information and exit.""" + if display_version: + print(f"[bold]IRR Explorer CLI[/bold] version: {__version__}") + raise typer.Exit() + + +@app.callback() +def callback( + _: bool = typer.Option(None, "--version", "-v", callback=version_display, is_eager=True), +) -> None: + """Query IRR Explorer for prefix information.""" + + +@app.command(no_args_is_help=True) +def prefix( + prefix_query: str = typer.Argument(None, help="Prefix to query (e.g., 193.0.0.0/21)"), + output_format: str = typer.Option(None, "--format", "-f", help="Output format (json or csv)"), + ctx: Context = CTX_OPTION, +) -> None: + """Query IRR Explorer for prefix information.""" + if not prefix_query: + if ctx: + typer.echo(ctx.get_help()) + raise typer.Exit() + + if not validate_prefix_format(prefix_query): + typer.echo(f"Error: Invalid prefix format: {prefix_query}") + raise typer.Exit(1) + + asyncio.run(async_prefix_query(prefix_query, output_format)) + + +@app.command() +def asn( + asn_query: str = typer.Argument(None, help="AS number to query (e.g., AS2111, as2111, or 2111)"), + output_format: str = typer.Option(None, "--format", "-f", help="Output format (json or csv)"), + ctx: Context = CTX_OPTION, +) -> None: + """Query IRR Explorer for AS number information.""" + if not asn_query: + if ctx: + typer.echo(ctx.get_help()) + raise typer.Exit() + + if not validate_asn_format(asn_query): + typer.echo(f"Error: Invalid ASN format: {asn_query}") + raise typer.Exit(1) + + if not asn_query.upper().startswith("AS"): + asn_query = f"AS{asn_query}" + else: + asn_query = f"AS{asn_query[2:]}" + + asyncio.run(async_asn_query(asn_query, output_format)) diff --git a/irrexplorer_cli/models.py b/irrexplorer_cli/models.py new file mode 100644 index 0000000..f4b9091 --- /dev/null +++ b/irrexplorer_cli/models.py @@ -0,0 +1,76 @@ +"""Data models for IRR Explorer API responses.""" + +from typing import Dict, List, Optional + +from pydantic import BaseModel + + +class BaseRoute(BaseModel): + """Base model for route information.""" + + rpkiStatus: str + rpkiMaxLength: Optional[int] + asn: int + rpslText: str + rpslPk: str + + +class RpkiRoute(BaseRoute): + """RPKI route information model.""" + + +class IrrRoute(BaseRoute): + """IRR route information model.""" + + +class Message(BaseModel): + """Message model for API responses.""" + + text: str + category: str + + +class PrefixInfo(BaseModel): + """Prefix information model containing route and status details.""" + + prefix: str + rir: str + bgpOrigins: List[int] + rpkiRoutes: List[RpkiRoute] + irrRoutes: Dict[str, List[IrrRoute]] + categoryOverall: str + messages: List[Message] + prefixSortKey: str + goodnessOverall: int + + +class AsResponse(BaseModel): + """Response model for AS queries.""" + + directOrigin: List[PrefixInfo] + overlaps: List[PrefixInfo] + + +class PrefixResult(BaseModel): + """Prefix query result information.""" + + prefix: str + categoryOverall: str + rir: str + rpkiRoutes: List[RpkiRoute] + bgpOrigins: List[int] + irrRoutes: Dict[str, List[IrrRoute]] + messages: List[Message] + + +class AsSets(BaseModel): + """AS Sets information.""" + + setsPerIrr: Dict[str, List[str]] + + +class AsnResult(BaseModel): + """ASN query result information.""" + + directOrigin: List[PrefixResult] + overlaps: List[PrefixResult] diff --git a/irrexplorer_cli/py.typed b/irrexplorer_cli/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/irrexplorer_cli/queries.py b/irrexplorer_cli/queries.py new file mode 100644 index 0000000..f239ca3 --- /dev/null +++ b/irrexplorer_cli/queries.py @@ -0,0 +1,73 @@ +"""Query functions for the CLI tool.""" + +import json +from typing import Optional + +import httpx + +from irrexplorer_cli.helpers import ( + find_least_specific_prefix, + format_as_sets, + format_direct_origins, + format_overlapping_prefixes, + format_prefix_result, +) +from irrexplorer_cli.irrexplorer import IrrDisplay, IrrExplorer + + +async def process_overlaps(explorer: IrrExplorer, least_specific: str) -> None: + """Process and print overlapping prefixes.""" + try: + all_overlaps = await explorer.fetch_prefix_info(least_specific) + for result in all_overlaps: + print(format_prefix_result(result, "OVERLAP")) + except (httpx.HTTPError, ValueError, RuntimeError): + pass + + +async def async_prefix_query(pfx: str, output_format: Optional[str] = None) -> None: + """Execute asynchronous prefix query and display results.""" + explorer = IrrExplorer() + display = IrrDisplay() + try: + direct_overlaps = await explorer.fetch_prefix_info(pfx) + + if output_format == "json": + json_data = [result.model_dump() for result in direct_overlaps] + print(json.dumps(json_data, indent=2)) + elif output_format == "csv": + print("Type,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages") + + for result in direct_overlaps: + print(format_prefix_result(result, "DIRECT")) + + least_specific = await find_least_specific_prefix(direct_overlaps) + if least_specific: + await process_overlaps(explorer, least_specific) + else: + await display.display_prefix_info(direct_overlaps) + finally: + await explorer.close() + + +async def async_asn_query(as_number: str, output_format: Optional[str] = None) -> None: + """Execute asynchronous ASN query and display results.""" + explorer = IrrExplorer() + display = IrrDisplay() + try: + results = await explorer.fetch_asn_info(as_number) + sets_data = await explorer.fetch_asn_sets(as_number) + + if output_format == "json": + combined_data = {"asn_info": results, "as_sets": sets_data} + print(json.dumps(combined_data, indent=2), end="\n") + elif output_format == "csv": + print("Type,ASN,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages", end="") + format_direct_origins(as_number, results) + format_overlapping_prefixes(as_number, results) + format_as_sets(as_number, sets_data) + print() + else: + await display.display_asn_info(results, as_number, sets_data) + finally: + await explorer.close() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..9aaf5e5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,73 @@ +[project] +name = "irrexplorer-cli" +version = "0.0.1" +description = "CLI tool to query IRR Explorer data" +authors = [{name = "kiraum", email = "tfgoncalves@xpto.it" }] +dependencies = [ + "httpx", + "rich", + "typer", + "pydantic", + "backoff", + "asyncio" +] +requires-python = ">=3.13" + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "ruff", + "mypy", + "pytest-asyncio", + "respx", + "pytest-cov", + "isort", + "pylint", + "pre-commit" +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build] +packages = ["irrexplorer_cli"] +include = [ + "irrexplorer_cli/py.typed", +] + +[tool.isort] +profile = "black" +multi_line_output = 3 +line_length = 120 + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = ["E", "F", "I", "B"] + +[tool.mypy] +python_version = "3.13" +strict = true +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +check_untyped_defs = true + +[tool.black] +line-length = 120 + +[project.scripts] +irrexplorer = "irrexplorer_cli.main:app" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" + +[tool.pylint] +max-line-length = 120 +disable = [] +enable = "all" +good-names = ["i", "j", "k", "ex", "Run", "_", "id", "f"] diff --git a/requirements.lock b/requirements.lock new file mode 100644 index 0000000..9bcbf66 --- /dev/null +++ b/requirements.lock @@ -0,0 +1,124 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --extra dev pyproject.toml -o requirements.lock +annotated-types==0.7.0 + # via pydantic +anyio==4.7.0 + # via httpx +astroid==3.3.7 + # via pylint +asyncio==3.4.3 + # via irrexplorer-cli (pyproject.toml) +backoff==2.2.1 + # via irrexplorer-cli (pyproject.toml) +black==24.10.0 + # via irrexplorer-cli (pyproject.toml) +certifi==2024.12.14 + # via + # httpcore + # httpx +cfgv==3.4.0 + # via pre-commit +click==8.1.8 + # via + # black + # typer +coverage==7.6.9 + # via pytest-cov +dill==0.3.9 + # via pylint +distlib==0.3.9 + # via virtualenv +filelock==3.16.1 + # via virtualenv +h11==0.14.0 + # via httpcore +httpcore==1.0.7 + # via httpx +httpx==0.28.1 + # via + # irrexplorer-cli (pyproject.toml) + # respx +identify==2.6.3 + # via pre-commit +idna==3.10 + # via + # anyio + # httpx +iniconfig==2.0.0 + # via pytest +isort==5.13.2 + # via + # irrexplorer-cli (pyproject.toml) + # pylint +markdown-it-py==3.0.0 + # via rich +mccabe==0.7.0 + # via pylint +mdurl==0.1.2 + # via markdown-it-py +mypy==1.14.0 + # via irrexplorer-cli (pyproject.toml) +mypy-extensions==1.0.0 + # via + # black + # mypy +nodeenv==1.9.1 + # via pre-commit +packaging==24.2 + # via + # black + # pytest +pathspec==0.12.1 + # via black +platformdirs==4.3.6 + # via + # black + # pylint + # virtualenv +pluggy==1.5.0 + # via pytest +pre-commit==4.0.1 + # via irrexplorer-cli (pyproject.toml) +pydantic==2.10.4 + # via irrexplorer-cli (pyproject.toml) +pydantic-core==2.27.2 + # via pydantic +pygments==2.18.0 + # via rich +pylint==3.3.2 + # via irrexplorer-cli (pyproject.toml) +pytest==8.3.4 + # via + # irrexplorer-cli (pyproject.toml) + # pytest-asyncio + # pytest-cov +pytest-asyncio==0.25.0 + # via irrexplorer-cli (pyproject.toml) +pytest-cov==6.0.0 + # via irrexplorer-cli (pyproject.toml) +pyyaml==6.0.2 + # via pre-commit +respx==0.22.0 + # via irrexplorer-cli (pyproject.toml) +rich==13.9.4 + # via + # irrexplorer-cli (pyproject.toml) + # typer +ruff==0.8.4 + # via irrexplorer-cli (pyproject.toml) +shellingham==1.5.4 + # via typer +sniffio==1.3.1 + # via anyio +tomlkit==0.13.2 + # via pylint +typer==0.15.1 + # via irrexplorer-cli (pyproject.toml) +typing-extensions==4.12.2 + # via + # mypy + # pydantic + # pydantic-core + # typer +virtualenv==20.28.0 + # via pre-commit diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 0000000..1996d2c --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,46 @@ +""" Fixtures for tests """ + +from typing import Any + +from irrexplorer_cli.models import PrefixInfo + +COMMON_SETS_DATA = {"setsPerIrr": {"RIPE": ["AS-TEST"], "ARIN": ["AS-TEST2"]}} +COMMON_RPKI_ROUTE = { + "rpkiStatus": "VALID", + "rpkiMaxLength": 24, + "asn": 12345, + "rpslText": "route: 192.0.2.0/24", + "rpslPk": "192.0.2.0/24AS12345/ML24", +} + +COMMON_PREFIX_INFO = { + "prefix": "192.0.2.0/24", + "categoryOverall": "success", + "rir": "RIPE", + "rpkiRoutes": [COMMON_RPKI_ROUTE], + "bgpOrigins": [12345], + "irrRoutes": {}, + "messages": [], + "prefixSortKey": "1", + "goodnessOverall": 1, +} + +COMMON_ASN_DATA = { + "directOrigin": [COMMON_PREFIX_INFO], + "overlaps": [], +} + +COMMON_SETS_DATA = { + "setsPerIrr": { + "RIPE": ["AS-TEST1", "AS-TEST2"], + "ARIN": ["AS-TEST3"], + "AFRINIC": ["AS-TEST4", "AS-TEST5", "AS-TEST6"], + } +} + + +def create_basic_prefix_info(**kwargs: Any) -> PrefixInfo: + """Create a basic PrefixInfo object.""" + data = COMMON_PREFIX_INFO.copy() + data.update(kwargs) + return PrefixInfo.model_validate(data) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..6d59008 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,111 @@ +"""Test suite for CLI functionality.""" + +from unittest.mock import patch + +import httpx +import pytest +from typer.testing import CliRunner + +from irrexplorer_cli.irrexplorer import IrrExplorer +from irrexplorer_cli.main import app + +runner = CliRunner() + + +def test_query_command_http_error() -> None: + """Test HTTP error handling in query command.""" + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info") as mock_fetch: + mock_fetch.side_effect = httpx.HTTPError("Test error") + result = runner.invoke(app, ["prefix", "5.57.21.0/24"], catch_exceptions=True) + assert result.exit_code + + +@pytest.mark.asyncio +async def test_explorer_close_error() -> None: + """Test error handling during explorer client closure.""" + explorer = IrrExplorer() + with patch.object(explorer.client, "aclose") as mock_close: + mock_close.side_effect = RuntimeError("Close error") + try: + await explorer.close() + except RuntimeError as e: + assert str(e) == "Close error" + + +def test_callback_without_version() -> None: + """Test callback execution without version flag.""" + result = runner.invoke(app) + assert not result.exit_code + + +def test_query_empty_prefix() -> None: + """Test query command with empty prefix""" + result = runner.invoke(app, ["prefix", ""]) + assert "Usage: root prefix" in result.stdout + + +def test_asn_command() -> None: + """Test ASN query command.""" + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info") as mock_fetch: + mock_fetch.return_value = {"directOrigin": [], "overlaps": []} + result = runner.invoke(app, ["asn", "AS202196"]) + assert not result.exit_code + + +def test_json_output_format() -> None: + """Test JSON output format.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "json"]) + assert not result.exit_code + + +def test_csv_output_format() -> None: + """Test CSV output format.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "csv"]) + assert not result.exit_code + + +def test_query_command() -> None: + """Test prefix query command execution.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24"]) + assert not result.exit_code + + +def test_query_command_invalid_prefix() -> None: + """Test query command with invalid prefix.""" + result = runner.invoke(app, ["prefix", "invalid"]) + assert "Error: Invalid prefix format" in result.stdout + + +def test_version_command() -> None: + """Test version command output.""" + result = runner.invoke(app, ["--version"]) + assert not result.exit_code + assert "IRR Explorer CLI" in result.stdout + + +def test_help_command() -> None: + """Test help command display.""" + result = runner.invoke(app, ["--help"]) + assert not result.exit_code + assert "Usage" in result.stdout + assert "Options" in result.stdout + + +def test_query_without_prefix() -> None: + """Test query command without prefix argument.""" + result = runner.invoke(app, ["query"]) + assert "Usage" in result.stdout + + +def test_format_option() -> None: + """Test format option handling.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "json"]) + assert not result.exit_code + + +def test_asn_query_command() -> None: + """Test ASN query command execution.""" + with patch("irrexplorer_cli.main.async_asn_query", return_value=None) as mock_query: + result = runner.invoke(app, ["asn", "AS202196"]) + assert not result.exit_code + mock_query.assert_called_once_with("AS202196", None) diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..9886b1b --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,144 @@ +"""Test suite for helper functions.""" + +from typing import Any, Dict, List + +import pytest + +from irrexplorer_cli.helpers import ( + find_least_specific_prefix, + format_as_sets, + format_direct_origins, + format_overlapping_prefixes, + format_prefix_result, + validate_asn_format, + validate_prefix_format, +) +from tests.fixtures import ( + COMMON_ASN_DATA, + COMMON_PREFIX_INFO, + COMMON_RPKI_ROUTE, + COMMON_SETS_DATA, + create_basic_prefix_info, +) + + +def test_validate_prefix_format() -> None: + """Test prefix format validation.""" + assert validate_prefix_format("192.0.2.0/24") is True + assert validate_prefix_format("2001:db8::/32") is True + assert validate_prefix_format("invalid") is False + assert validate_prefix_format("256.256.256.256/24") is False + + +def test_validate_asn_format() -> None: + """Test ASN format validation.""" + assert validate_asn_format("AS12345") is True + assert validate_asn_format("12345") is True + assert validate_asn_format("as12345") is True + assert validate_asn_format("invalid") is False + assert validate_asn_format("AS4294967296") is False # Too large + + +def test_format_prefix_result() -> None: + """Test prefix result formatting.""" + prefix_info = create_basic_prefix_info( + messages=[{"category": "info", "text": "Test message"}], irrRoutes={"RIPE": []} + ) + + result = format_prefix_result(prefix_info, "DIRECT") + assert isinstance(result, str) + assert "192.0.2.0/24" in result + assert "VALID" in result + + +@pytest.mark.asyncio +async def test_find_least_specific_prefix() -> None: + """Test finding least specific prefix.""" + overlaps = [ + create_basic_prefix_info(prefix="192.0.2.0/24"), + create_basic_prefix_info(prefix="192.0.2.0/23"), + create_basic_prefix_info(prefix="192.0.2.0/25"), + ] + result = await find_least_specific_prefix(overlaps) + assert result == "192.0.2.0/23" + + +def test_format_as_sets() -> None: + """Test AS sets formatting.""" + sets_data = {"setsPerIrr": {"RIPE": ["AS-TEST1", "AS-TEST2"]}} + format_as_sets("AS12345", sets_data) # Test output formatting + + +def test_format_direct_origins_with_empty_results() -> None: + """Test direct origins formatting with empty results.""" + empty_results: Dict[str, List[Any]] = {"directOrigin": []} + format_direct_origins("AS12345", empty_results) + + +def test_format_overlapping_prefixes_with_complex_data() -> None: + """Test overlapping prefixes with multiple IRR routes.""" + complex_prefix_info = COMMON_PREFIX_INFO.copy() + complex_prefix_info.update( + { + "bgpOrigins": [12345, 67890], + "irrRoutes": {"RIPE": [COMMON_RPKI_ROUTE]}, + "messages": [ + {"category": "info", "text": "Test message 1"}, + {"category": "warning", "text": "Test message 2"}, + ], + } + ) + + complex_data = {"overlaps": [complex_prefix_info]} + + format_overlapping_prefixes("AS12345", complex_data) + + +@pytest.mark.asyncio +async def test_find_least_specific_prefix_empty() -> None: + """Test finding least specific prefix with empty list.""" + result = await find_least_specific_prefix([]) + assert result is None + + +def test_format_direct_origins_with_multiple_routes() -> None: + """Test direct origins formatting with multiple routes.""" + multi_route_prefix = COMMON_PREFIX_INFO.copy() + multi_route_prefix.update( + { + "rpkiRoutes": [], # Test no RPKI routes case + "bgpOrigins": [12345, 67890], + "irrRoutes": { + "RIPE": [COMMON_RPKI_ROUTE], + "ARIN": [ + { + "rpkiStatus": "INVALID", + "rpkiMaxLength": 24, + "asn": 67890, + "rpslText": "route: 192.0.2.0/24", + "rpslPk": "192.0.2.0/24AS67890/ML24", + } + ], + }, + } + ) + + test_data = {"directOrigin": [multi_route_prefix]} + + format_direct_origins("AS12345", test_data) + + +def test_format_as_sets_with_multiple_irrs() -> None: + """Test AS sets formatting with multiple IRR databases.""" + format_as_sets("AS12345", COMMON_SETS_DATA) + + +def test_format_as_sets_empty() -> None: + """Test AS sets formatting with empty data.""" + format_as_sets("AS12345", {"setsPerIrr": {}}) + format_as_sets("AS12345", {}) + + +def test_format_direct_origins_with_rpki_routes() -> None: + """Test direct origins formatting with RPKI routes.""" + format_direct_origins("AS12345", COMMON_ASN_DATA) diff --git a/tests/test_irrexplorer.py b/tests/test_irrexplorer.py new file mode 100644 index 0000000..cb5aa23 --- /dev/null +++ b/tests/test_irrexplorer.py @@ -0,0 +1,199 @@ +"""Test suite for IRR Explorer core functionality.""" + +from typing import Any +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from irrexplorer_cli.irrexplorer import IrrDisplay, IrrExplorer +from irrexplorer_cli.models import PrefixInfo +from tests.fixtures import COMMON_ASN_DATA, COMMON_PREFIX_INFO, COMMON_SETS_DATA, create_basic_prefix_info + + +@pytest.mark.asyncio +async def test_fetch_prefix_info() -> None: + """Test successful prefix information retrieval.""" + explorer = IrrExplorer() + result = await explorer.fetch_prefix_info("5.57.21.0/24") + assert isinstance(result, list) + assert all(isinstance(item, PrefixInfo) for item in result) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_prefix_info() -> None: + """Test prefix information display formatting.""" + display = IrrDisplay() + test_data = [create_basic_prefix_info(prefix="5.57.21.0/24", rpkiRoutes=[])] + await display.display_prefix_info(test_data) + + +@pytest.mark.asyncio +async def test_create_prefix_panel() -> None: + """Test panel creation with prefix information.""" + display = IrrDisplay() + test_info = create_basic_prefix_info(prefix="5.57.21.0/24", rpkiRoutes=[]) + panel = await display.create_prefix_panel(test_info) + assert panel is not None + + +@pytest.mark.asyncio +async def test_display_prefix_info_least_specific_error() -> None: + """Test error handling for least specific prefix display.""" + display = IrrDisplay() + test_data = [create_basic_prefix_info(prefix="5.57.0.0/16", rpkiRoutes=[])] + + with patch("irrexplorer_cli.irrexplorer.IrrExplorer") as mock_explorer_class: + mock_instance = mock_explorer_class.return_value + mock_instance.fetch_prefix_info = AsyncMock(side_effect=ValueError("Test error")) + mock_instance.close = AsyncMock() + + with patch.object(display.console, "print") as mock_print: + await display.display_prefix_info(test_data) + mock_print.assert_called_with("[red]Error fetching overlaps for 5.57.0.0/16: Test error[/red]") + + +@pytest.mark.asyncio +async def test_fetch_asn_sets() -> None: + """Test AS sets information retrieval.""" + explorer = IrrExplorer() + result = await explorer.fetch_asn_sets("AS202196") + assert isinstance(result, dict) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_status_categories() -> None: + """Test status category display styles.""" + display = IrrDisplay() + styles = { + "success": await display.get_status_style("success"), + "warning": await display.get_status_style("warning"), + "error": await display.get_status_style("error"), + } + assert all(isinstance(style, str) for style in styles.values()) + + +@pytest.mark.asyncio +async def test_asn_info_display() -> None: + """Test ASN info display with complete data.""" + display = IrrDisplay() + await display.display_asn_info(COMMON_ASN_DATA, "AS12345", COMMON_SETS_DATA) + + +@pytest.mark.asyncio +async def test_display_overlaps() -> None: + """Test overlaps display functionality.""" + display = IrrDisplay() + + # Create data using the fixture but override specific fields + overlap_info = COMMON_PREFIX_INFO.copy() + overlap_info.update({"categoryOverall": "warning", "rpkiRoutes": [], "irrRoutes": {"RIPE": []}}) + + data = {"overlaps": [overlap_info]} + + await display.display_overlaps(data, "AS12345") + + +@pytest.mark.asyncio +async def test_sort_and_group_panels() -> None: + """Test panel sorting and grouping.""" + display = IrrDisplay() + + prefix_infos = [ + create_basic_prefix_info(prefix="192.0.2.0/24", categoryOverall="success", rpkiRoutes=[]), + create_basic_prefix_info(prefix="192.0.2.0/25", categoryOverall="warning", rpkiRoutes=[]), + ] + + panels = await display.sort_and_group_panels(prefix_infos) + assert len(panels) > 0 + + +@pytest.mark.asyncio +async def test_fetch_asn_info_error() -> None: + """Test ASN info fetch error handling.""" + explorer = IrrExplorer() + with patch("httpx.AsyncClient.get") as mock_get: + mock_get.side_effect = httpx.RequestError("Connection error") + with pytest.raises(httpx.RequestError): + await explorer.fetch_asn_info("AS12345") + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_direct_origins_with_errors() -> None: + """Test direct origins display with error conditions.""" + display = IrrDisplay() + + error_prefix = COMMON_PREFIX_INFO.copy() + error_prefix.update( + { + "categoryOverall": "error", + "rpkiRoutes": [], + "bgpOrigins": [], + "messages": [{"category": "error", "text": "Test error"}], + "goodnessOverall": 0, + } + ) + + data = {"directOrigin": [error_prefix]} + + await display.display_direct_origins(data, "AS12345") + + +@pytest.mark.asyncio +async def test_display_all_overlaps_error() -> None: + """Test all overlaps display with error handling.""" + display = IrrDisplay() + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info") as mock_fetch: + mock_fetch.side_effect = httpx.HTTPError("Test error") + await display.display_all_overlaps("192.0.2.0/24") + + +@pytest.mark.asyncio +async def test_fetch_asn_info_with_backoff() -> None: + """Test ASN info fetch with backoff retry.""" + explorer = IrrExplorer() + mock_request = httpx.Request("GET", "https://irrexplorer.nlnog.net/api/prefixes/asn/AS12345") + mock_response = httpx.Response(200, json={}, request=mock_request) + + with patch("httpx.AsyncClient.get") as mock_get: + mock_get.side_effect = [ + httpx.RequestError("Timeout"), # First attempt fails + mock_response, # Second attempt succeeds + ] + result = await explorer.fetch_asn_info("AS12345") + assert isinstance(result, dict) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_prefix_info_with_overlaps() -> None: + """Test prefix info display with overlapping prefixes.""" + display = IrrDisplay() + test_data = [ + create_basic_prefix_info(prefix="192.0.2.0/16", categoryOverall="info", rpkiRoutes=[]), + create_basic_prefix_info(prefix="192.0.2.0/24", categoryOverall="danger", rpkiRoutes=[]), + ] + await display.display_prefix_info(test_data) + + +def test_get_rpki_status() -> None: + """Test RPKI status extraction from prefix info.""" + display = IrrDisplay() + + # Test with RPKI routes present + prefix_with_routes = {"rpkiRoutes": [{"rpkiStatus": "VALID"}]} + + # Test with RPKI routes present + prefix_with_routes = {"rpkiRoutes": [{"rpkiStatus": "VALID"}]} + assert display.get_rpki_status(prefix_with_routes) == "VALID" + + # Test with no RPKI routes + prefix_without_routes: dict[str, list[Any]] = {"rpkiRoutes": []} + assert display.get_rpki_status(prefix_without_routes) == "UNKNOWN" + + # Test with missing rpkiRoutes key + prefix_missing_routes: dict[str, Any] = {} + assert display.get_rpki_status(prefix_missing_routes) == "UNKNOWN" diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..09b1476 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,78 @@ +"""Test cases for main module.""" + +from unittest.mock import patch + +import pytest +import typer +from click.core import Command, Context +from typer.testing import CliRunner + +from irrexplorer_cli.main import app, asn, prefix + +runner = CliRunner() + + +def test_exit_with_version() -> None: + """Test exit with version flag.""" + result = runner.invoke(app, ["--version"]) + assert not result.exit_code + + +def test_exit_with_help() -> None: + """Test exit with help display.""" + result = runner.invoke(app, ["prefix"]) + assert "Usage" in result.stdout + assert not result.exit_code + + +def test_prefix_command_without_context() -> None: + """Test prefix command without context.""" + with patch("typer.Context", return_value=None): + result = runner.invoke(app, ["prefix", ""]) + assert not result.exit_code + + +def test_invalid_asn_format() -> None: + """Test exit with invalid ASN format.""" + result = runner.invoke(app, ["asn", "invalid-asn"]) + assert "Error: Invalid ASN format" in result.stdout + assert result.exit_code == 1 + + +def test_asn_format_conversion() -> None: + """Test ASN format conversion.""" + result = runner.invoke(app, ["asn", "12345"]) + assert not result.exit_code + + +def test_prefix_command_with_context() -> None: + """Test prefix command with context.""" + result = runner.invoke(app, ["prefix"]) + assert not result.exit_code + assert "Usage" in result.stdout + + +def test_asn_command_with_context() -> None: + """Test ASN command with context.""" + result = runner.invoke(app, ["asn"]) + assert not result.exit_code + assert "Usage" in result.stdout + + +def test_asn_command_without_context() -> None: + """Test ASN command without context.""" + with patch("typer.Context", return_value=None): + result = runner.invoke(app, ["asn", ""]) + assert not result.exit_code + + +def test_prefix_direct_no_args() -> None: + """Test prefix function directly with no arguments.""" + with pytest.raises(typer.Exit): + prefix("", "", Context(Command("test"))) + + +def test_asn_direct_no_args() -> None: + """Test asn function directly with no arguments.""" + with pytest.raises(typer.Exit): + asn("", "", Context(Command("test"))) diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..d359124 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,50 @@ +"""Test suite for data models.""" + +from typing import Dict, List + +from irrexplorer_cli.models import AsResponse, AsSets, IrrRoute, PrefixInfo +from tests.fixtures import COMMON_PREFIX_INFO, COMMON_RPKI_ROUTE + + +def test_prefix_info_model() -> None: + """Test PrefixInfo model validation and instantiation.""" + data = COMMON_PREFIX_INFO.copy() + data.update( + { + "rir": "RIPE NCC", + "bgpOrigins": [202196], + "categoryOverall": "warning", + "messages": [{"category": "warning", "text": "Test message"}], + } + ) + + prefix_info = PrefixInfo.model_validate(data) + assert prefix_info.prefix == "192.0.2.0/24" # Updated to match fixture + assert prefix_info.rir == "RIPE NCC" + assert prefix_info.bgpOrigins == [202196] + assert prefix_info.categoryOverall == "warning" + assert len(prefix_info.rpkiRoutes) == 1 + assert len(prefix_info.messages) == 1 + assert prefix_info.irrRoutes == {} + + +def test_irr_route_model() -> None: + """Test IrrRoute model validation.""" + irr_route = IrrRoute.model_validate(COMMON_RPKI_ROUTE) + assert irr_route.rpkiStatus == "VALID" + assert irr_route.asn == 12345 + + +def test_as_response_model() -> None: + """Test AsResponse model validation.""" + data: Dict[str, List[PrefixInfo]] = {"directOrigin": [], "overlaps": []} + response = AsResponse.model_validate(data) + assert isinstance(response.directOrigin, list) + assert isinstance(response.overlaps, list) + + +def test_as_sets_model() -> None: + """Test AsSets model validation.""" + data: Dict[str, Dict[str, List[str]]] = {"setsPerIrr": {"RIPE": ["AS-TEST"]}} + sets = AsSets.model_validate(data) + assert "RIPE" in sets.setsPerIrr diff --git a/tests/test_queries.py b/tests/test_queries.py new file mode 100644 index 0000000..3a8de7f --- /dev/null +++ b/tests/test_queries.py @@ -0,0 +1,50 @@ +""" Tests for the queries module. """ + +import json +from typing import Any, Dict, List +from unittest.mock import patch + +import httpx +import pytest + +from irrexplorer_cli.irrexplorer import IrrExplorer +from irrexplorer_cli.queries import async_asn_query, process_overlaps + + +@pytest.mark.asyncio +async def test_asn_query_json_output() -> None: + """Test ASN query with JSON output.""" + mock_results: Dict[str, List[Any]] = {"directOrigin": [], "overlaps": []} + mock_sets: Dict[str, Dict[str, Any]] = {"setsPerIrr": {}} + + with ( + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info", return_value=mock_results), + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_sets", return_value=mock_sets), + patch("builtins.print") as mock_print, + ): + await async_asn_query("AS12345", "json") + mock_print.assert_called_with(json.dumps({"asn_info": mock_results, "as_sets": mock_sets}, indent=2), end="\n") + + +@pytest.mark.asyncio +async def test_asn_query_csv_output() -> None: + """Test ASN query with CSV output.""" + mock_results: Dict[str, List[Any]] = {"directOrigin": [], "overlaps": []} + mock_sets: Dict[str, Dict[str, Any]] = {"setsPerIrr": {}} + + with ( + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info", return_value=mock_results), + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_sets", return_value=mock_sets), + patch("builtins.print") as mock_print, + ): + await async_asn_query("AS12345", "csv") + mock_print.assert_any_call("Type,ASN,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages", end="") + + +@pytest.mark.asyncio +async def test_process_overlaps_error_handling() -> None: + """Test error handling in process_overlaps function.""" + explorer = IrrExplorer() + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info", side_effect=httpx.HTTPError("Test error")): + await process_overlaps(explorer, "192.0.2.0/24") + await explorer.close()