From 83bb925e1ff6e360d24db878a86adcb9d896785a Mon Sep 17 00:00:00 2001 From: kiraum Date: Thu, 19 Dec 2024 23:46:41 +0100 Subject: [PATCH] feat(irrexplorer-cli): init --- .github/dependabot.yml | 11 ++ .github/workflows/build_and_publish.yml | 38 ++++ .github/workflows/linter.yml | 30 +++ .pre-commit-config.yaml | 52 +++++ CHANGELOG.md | 24 +++ README.md | 91 ++++++++- irrexplorer_cli/__init__.py | 0 irrexplorer_cli/helpers.py | 123 ++++++++++++ irrexplorer_cli/irrexplorer.py | 247 ++++++++++++++++++++++++ irrexplorer_cli/main.py | 78 ++++++++ irrexplorer_cli/models.py | 76 ++++++++ irrexplorer_cli/py.typed | 0 irrexplorer_cli/queries.py | 73 +++++++ pyproject.toml | 73 +++++++ requirements.lock | 124 ++++++++++++ tests/__init__.py | 0 tests/fixtures.py | 46 +++++ tests/test_cli.py | 111 +++++++++++ tests/test_helpers.py | 144 ++++++++++++++ tests/test_irrexplorer.py | 199 +++++++++++++++++++ tests/test_main.py | 78 ++++++++ tests/test_models.py | 50 +++++ tests/test_queries.py | 50 +++++ 23 files changed, 1717 insertions(+), 1 deletion(-) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/build_and_publish.yml create mode 100644 .github/workflows/linter.yml create mode 100644 .pre-commit-config.yaml create mode 100644 CHANGELOG.md create mode 100644 irrexplorer_cli/__init__.py create mode 100644 irrexplorer_cli/helpers.py create mode 100644 irrexplorer_cli/irrexplorer.py create mode 100644 irrexplorer_cli/main.py create mode 100644 irrexplorer_cli/models.py create mode 100644 irrexplorer_cli/py.typed create mode 100644 irrexplorer_cli/queries.py create mode 100644 pyproject.toml create mode 100644 requirements.lock create mode 100644 tests/__init__.py create mode 100644 tests/fixtures.py create mode 100644 tests/test_cli.py create mode 100644 tests/test_helpers.py create mode 100644 tests/test_irrexplorer.py create mode 100644 tests/test_main.py create mode 100644 tests/test_models.py create mode 100644 tests/test_queries.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..9d866e3 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file + +version: 2 +updates: + - package-ecosystem: "pip" # See documentation for possible values + directory: "/" # Location of package manifests + schedule: + interval: "weekly" diff --git a/.github/workflows/build_and_publish.yml b/.github/workflows/build_and_publish.yml new file mode 100644 index 0000000..4342d6d --- /dev/null +++ b/.github/workflows/build_and_publish.yml @@ -0,0 +1,38 @@ +name: Publish to PyPI + +on: + push: + tags: + - 'v*' + +jobs: + publish: + runs-on: ubuntu-latest + permissions: + id-token: write + + steps: + - uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.13' + + - name: Install UV and build tools + run: | + pip install --upgrade uv + uv pip install build + + - name: Verify CHANGELOG + run: | + if ! grep -q "## \[$(git describe --tags --abbrev=0 | sed 's/v//')\]" CHANGELOG.md; then + echo "CHANGELOG.md not updated for this version" + exit 1 + fi + + - name: Build package + run: uv build + + - name: Publish to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml new file mode 100644 index 0000000..b723e3a --- /dev/null +++ b/.github/workflows/linter.yml @@ -0,0 +1,30 @@ +name: Test (linter/formatter/coverage) + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.13"] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + pip install --upgrade uv + uv pip sync --system --break-system-packages requirements.lock + + - name: Install package + run: | + pip install -e . + + - name: Run all linters and formatters + run: | + pre-commit run --all-files diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000..37a07e9 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,52 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: 'v5.0.0' + hooks: + - id: check-yaml + - id: end-of-file-fixer + - id: trailing-whitespace + - id: check-toml + - id: check-added-large-files + +- repo: local + hooks: + - id: isort + name: isort + entry: isort + language: system + types: [python] + + - id: black + name: black + entry: black + language: python + types_or: [python, pyi] + + - id: ruff + name: ruff + entry: ruff check + language: python + types_or: [python, pyi] + + - id: mypy + name: mypy + entry: mypy + language: python + types_or: [python, pyi] + require_serial: true + + - id: pylint + name: pylint + entry: pylint + language: system + types: [python] + args: + - --rcfile=pyproject.toml + + - id: pytest + name: pytest + entry: pytest + language: system + types: [python] + pass_filenames: false + args: ['--cov=irrexplorer_cli', '--cov-fail-under=100', 'tests/'] diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..ee0f083 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,24 @@ +# Changelog +All notable changes to irrexplorer-cli will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.0.1] - 2024-12-21 +### Added +- Initial development release +- Command-line interface for IRRexplorer.net queries +- Prefix information lookup functionality +- ASN details lookup functionality +- Multiple output formats (json, csv, text) +- Async support for efficient data retrieval +- Integration with IRRexplorer v2 service +- Support for Python 3.13+ +- Documentation and usage examples + +### Dependencies +- httpx for HTTP requests +- typer for CLI interface +- rich for text formatting + +[0.0.1]: https://github.com/kiraum/irrexplorer-cli/releases/tag/v0.0.1 diff --git a/README.md b/README.md index 14fba05..c8aa935 100644 --- a/README.md +++ b/README.md @@ -1 +1,90 @@ -# irrexplorer-cli \ No newline at end of file +# IRRexplorer CLI + +A command-line interface to query and explore IRR & BGP data from IRRexplorer.net in real-time. + +## Overview + +IRRexplorer CLI provides a simple way to access and analyze Internet Routing Registry (IRR) and BGP data through the command line. It interfaces with the IRRexplorer v2 service to help network operators and administrators debug routing data and verify filtering strategies. + +## Features + +- Query prefix information +- Lookup ASN details +- Real-time data access from IRRexplorer.net +- Easy-to-use command-line interface +- Async support for efficient data retrieval + +## Installation + +```bash +pip install irrexplorer-cli +``` + +## Usage + +Query Prefix Information +```bash +irrexplorer prefix 192.0.2.0/24 +``` + +Query ASN Information +```bash +irrexplorer asn AS64496 +``` + +The `-f` or `--format` flag allows you to specify the output format: + +* `json`: Output results in JSON format +* `csv`: Output results in CSV format +* Default format is human-readable text + +## Requirements + +* Python 3.13+ +* httpx +* typer +* rich + +## Development + +1. Clone the repository: +```bash +git clone https://github.com/kiraum/irrexplorer-cli.git +``` + +2. Create/activate venv: +```bash +python3 -m venv venv +. venv/bin/activate +``` + +3. Install dependencies: +```bash +pip install --upgrade uv +uv pip sync requirements.lock +``` + +4. Run pre-commit tests before to push: +```bash +pre-commit run --all-files +``` + +## Data Sources + +The CLI tool queries data from IRRexplorer.net, which includes: + +* IRR objects and relations (route(6) and as-sets) +* RPKI ROAs and validation status +* BGP origins from DFZ +* RIRstats + +## Contributing +Contributions are welcome! Please feel free to submit a Pull Request. + +## License + +See [LICENSE](LICENSE) file for details. + +## Credits + +This tool interfaces with IRRexplorer v2, a project maintained by Stichting NLNOG and DashCare BV. diff --git a/irrexplorer_cli/__init__.py b/irrexplorer_cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/irrexplorer_cli/helpers.py b/irrexplorer_cli/helpers.py new file mode 100644 index 0000000..4fffd4a --- /dev/null +++ b/irrexplorer_cli/helpers.py @@ -0,0 +1,123 @@ +"""Helper functions for the CLI.""" + +import ipaddress +import re +from typing import Any, Dict, List + +from irrexplorer_cli.models import PrefixInfo, PrefixResult + + +def validate_prefix_format(prefix_input: str) -> bool: + """Validate IPv4 or IPv6 prefix format.""" + try: + ipaddress.ip_network(prefix_input) + return True + except ValueError: + return False + + +def validate_asn_format(asn_input: str) -> bool: + """Validate ASN format.""" + asn_pattern = r"^(?:AS|as)?(\d+)$" + match = re.match(asn_pattern, asn_input) + if match: + asn_number = int(match.group(1)) + return 0 <= asn_number <= 4294967295 + return False + + +def format_prefix_result(result: PrefixInfo, prefix_type: str) -> str: + """Format a single prefix result for CSV output.""" + prefix_result = PrefixResult( + prefix=result.prefix, + categoryOverall=result.categoryOverall, + rir=result.rir, + rpkiRoutes=result.rpkiRoutes, + bgpOrigins=result.bgpOrigins, + irrRoutes=result.irrRoutes, + messages=result.messages, + ) + + rpki_status = "NOT_FOUND" + if prefix_result.rpkiRoutes: + rpki_status = prefix_result.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(asn) for asn in prefix_result.bgpOrigins) + + irr_routes = [] + for db, routes in prefix_result.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + + messages = "|".join(msg.text for msg in prefix_result.messages) + + return ( + f"{prefix_type},{prefix_result.prefix},{prefix_result.categoryOverall}," + f"{prefix_result.rir},{rpki_status},{bgp_origins},{irr_routes_str},{messages}" + ) + + +def format_direct_origins(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None: + """Format and print direct origin prefixes.""" + for pfx_dict in results.get("directOrigin", []): + pfx = PrefixInfo(**pfx_dict) + rpki_status = "NOT_FOUND" + if pfx.rpkiRoutes: + rpki_status = pfx.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins) + irr_routes = [] + for db, routes in pfx.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + messages = "|".join(msg.text for msg in pfx.messages) + + print( + f"\nDIRECT,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir}," + f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}", + end="", + ) + + +def format_overlapping_prefixes(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None: + """Format and print overlapping prefixes.""" + for pfx_dict in results.get("overlaps", []): + pfx = PrefixInfo(**pfx_dict) + rpki_status = "NOT_FOUND" + if pfx.rpkiRoutes: + rpki_status = pfx.rpkiRoutes[0].rpkiStatus + + bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins) + irr_routes = [] + for db, routes in pfx.irrRoutes.items(): + for route in routes: + irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}") + irr_routes_str = "|".join(irr_routes) + messages = "|".join(msg.text for msg in pfx.messages) + + print( + f"\nOVERLAP,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir}," + f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}", + end="", + ) + + +def format_as_sets(as_number: str, sets_data: Dict[str, Dict[str, List[str]]]) -> None: + """Format and print AS sets.""" + if sets_data and sets_data.get("setsPerIrr"): + for irr, sets in sets_data["setsPerIrr"].items(): + for as_set in sets: + print(f"\nSET,{as_number},{as_set},{irr},N/A,N/A,N/A,N/A,N/A", end="") + + +async def find_least_specific_prefix(direct_overlaps: List[PrefixInfo]) -> str | None: + """Find the least specific prefix from the overlaps.""" + least_specific = None + for info in direct_overlaps: + if "/" in info.prefix: + _, mask = info.prefix.split("/") + if least_specific is None or int(mask) < int(least_specific.split("/")[1]): + least_specific = info.prefix + return least_specific diff --git a/irrexplorer_cli/irrexplorer.py b/irrexplorer_cli/irrexplorer.py new file mode 100644 index 0000000..2cef09d --- /dev/null +++ b/irrexplorer_cli/irrexplorer.py @@ -0,0 +1,247 @@ +"""Core functionality for IRR Explorer CLI.""" + +from typing import Any, Dict, List, Optional, cast + +import backoff +import httpx +from rich.columns import Columns +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich.text import Text + +from irrexplorer_cli.helpers import find_least_specific_prefix + +from .models import PrefixInfo + + +class IrrExplorer: + """IRR Explorer API client for prefix information retrieval.""" + + def __init__(self, base_url: str = "https://irrexplorer.nlnog.net") -> None: + """Initialize IRR Explorer client with base URL.""" + self.base_url = base_url + self.client = httpx.AsyncClient() + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_prefix_info(self, prefix: str) -> List[PrefixInfo]: + """Fetch prefix information from IRR Explorer API.""" + url = f"{self.base_url}/api/prefixes/prefix/{prefix}" + response = await self.client.get(url) + response.raise_for_status() + return [PrefixInfo(**item) for item in response.json()] + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_asn_info(self, asn: str) -> Dict[str, Any]: + """Fetch prefix information for an AS number.""" + url = f"https://irrexplorer.nlnog.net/api/prefixes/asn/{asn}" + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + return cast(Dict[str, Any], response.json()) + + @backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30) + async def fetch_asn_sets(self, asn: str) -> Dict[str, Any]: + """Fetch AS sets information for an AS number.""" + url = f"{self.base_url}/api/sets/member-of/{asn}" + response = await self.client.get(url) + response.raise_for_status() + return cast(Dict[str, Any], response.json()) + + async def close(self) -> None: + """Close the HTTP client connection.""" + await self.client.aclose() + + +class IrrDisplay: + """Display handler for IRR Explorer prefix information.""" + + def __init__(self) -> None: + """Initialize display handler with Rich console.""" + self.console = Console() + + async def create_prefix_panel(self, info: PrefixInfo) -> Panel: + """Create Rich panel with prefix information.""" + table = Table(show_header=True, header_style="bold cyan", expand=True) + table.add_column("Property") + table.add_column("Value") + + # Add basic info asynchronously + await self.add_basic_info(table, info) + await self.add_bgp_origins(table, info) + await self.add_rpki_routes(table, info) + await self.add_irr_routes(table, info) + await self.add_status(table, info) + await self.add_messages(table, info) + + return Panel( + table, + title=f"[bold]{info.prefix}[/bold]", + expand=False, + border_style=await self.get_status_style(info.categoryOverall), + ) + + async def sort_and_group_panels(self, prefix_infos: List[PrefixInfo]) -> List[Panel]: + """Sort and group prefix information panels by status category.""" + status_groups: Dict[str, List[Panel]] = {"success": [], "warning": [], "error": [], "danger": [], "info": []} + + sorted_infos = sorted(prefix_infos, key=lambda x: (-int(x.prefix.split("/")[1]), x.categoryOverall)) + + for info in sorted_infos: + panel = await self.create_prefix_panel(info) + status_groups[info.categoryOverall].append(panel) + + return [ + panel for status in ["success", "warning", "error", "danger", "info"] for panel in status_groups[status] + ] + + async def add_basic_info(self, table: Table, info: PrefixInfo) -> None: + """Add basic prefix information to the display table.""" + table.add_row("Prefix", info.prefix) + table.add_row("RIR", info.rir) + + async def add_bgp_origins(self, table: Table, info: PrefixInfo) -> None: + """Add BGP origin information to the display table.""" + bgp_origins = ", ".join(f"AS{asn}" for asn in info.bgpOrigins) if info.bgpOrigins else "None" + table.add_row("BGP Origins", bgp_origins) + + async def add_rpki_routes(self, table: Table, info: PrefixInfo) -> None: + """Add RPKI route information to the display table.""" + if info.rpkiRoutes: + rpki_rows = [ + f"AS{route.asn} (MaxLength: {route.rpkiMaxLength}, Status: {route.rpkiStatus})" + for route in info.rpkiRoutes + ] + table.add_row("RPKI Routes", "\n".join(rpki_rows)) + + async def add_irr_routes(self, table: Table, info: PrefixInfo) -> None: + """Add IRR route information to the display table.""" + if info.irrRoutes: + irr_rows = [ + f"{db}: AS{route.asn} ({route.rpkiStatus})" for db, routes in info.irrRoutes.items() for route in routes + ] + if irr_rows: + table.add_row("IRR Routes", "\n".join(irr_rows)) + + async def add_status(self, table: Table, info: PrefixInfo) -> None: + """Add status information to the display table.""" + status_style = await self.get_status_style(info.categoryOverall) + table.add_row("Status", Text(info.categoryOverall, style=status_style)) + + async def add_messages(self, table: Table, info: PrefixInfo) -> None: + """Add message information to the display table.""" + if info.messages: + messages = "\n".join(f"• {msg.text}" for msg in info.messages) + table.add_row("Messages", messages) + + async def get_status_style(self, category: str) -> str: + """Get Rich style color based on status category.""" + return {"success": "green", "warning": "yellow", "error": "red", "danger": "red", "info": "blue"}.get( + category, "white" + ) + + async def display_prefix_info(self, direct_overlaps: List[PrefixInfo]) -> None: + """Display prefix information in Rich panels.""" + await self.display_direct_overlaps(direct_overlaps) + least_specific = await find_least_specific_prefix(direct_overlaps) + if least_specific: + await self.display_all_overlaps(least_specific) + + async def display_direct_overlaps(self, direct_overlaps: List[PrefixInfo]) -> None: + """Display directly overlapping prefixes.""" + direct_panels = await self.sort_and_group_panels(direct_overlaps) + direct_columns = Columns(direct_panels, equal=True, expand=True) + self.console.print( + Panel( + direct_columns, + title=f"[bold]Directly overlapping prefixes of {direct_overlaps[0].prefix}[/bold]", + expand=False, + ) + ) + + async def display_all_overlaps(self, least_specific: str) -> None: + """Display all overlaps for least specific prefix.""" + try: + explorer = IrrExplorer() + all_overlaps = await explorer.fetch_prefix_info(least_specific) + all_panels = await self.sort_and_group_panels(all_overlaps) + all_columns = Columns(all_panels, equal=True, expand=True) + + self.console.print("\n") + self.console.print( + Panel( + all_columns, + title=f"[bold]All overlaps of least specific match {least_specific}[/bold]", + expand=False, + ) + ) + await explorer.close() + except (httpx.HTTPError, ValueError, RuntimeError) as e: + self.console.print(f"[red]Error fetching overlaps for {least_specific}: {str(e)}[/red]") + finally: + await explorer.close() + + def get_rpki_status(self, prefix: Dict[str, Any]) -> str: + """Extract RPKI status from prefix info.""" + if prefix.get("rpkiRoutes"): + return cast(str, prefix["rpkiRoutes"][0]["rpkiStatus"]) + return "UNKNOWN" + + async def display_asn_info( + self, data: Dict[str, Any], asn: str, sets_data: Optional[Dict[str, Any]] = None + ) -> None: + """Display AS query results with rich formatting.""" + await self.display_direct_origins(data, asn) + await self.display_overlaps(data, asn) + self.display_as_sets(sets_data, asn) + + async def display_direct_origins(self, data: Dict[str, Any], asn: str) -> None: + """Display direct origin prefixes.""" + if data.get("directOrigin"): + direct_infos = [PrefixInfo(**prefix) for prefix in data["directOrigin"]] + direct_panels = await self.sort_and_group_panels(direct_infos) + direct_columns = Columns(direct_panels, equal=True, expand=True) + self.console.print( + Panel( + direct_columns, + title=f"[bold]Prefixes directly originated by {asn}[/bold]", + expand=False, + ) + ) + + async def display_overlaps(self, data: Dict[str, Any], asn: str) -> None: + """Display overlapping prefixes.""" + if data.get("overlaps"): + overlap_infos = [PrefixInfo(**prefix) for prefix in data["overlaps"]] + overlap_panels = await self.sort_and_group_panels(overlap_infos) + if overlap_panels: + self.console.print("\n") + overlap_columns = Columns(overlap_panels, equal=True, expand=True) + self.console.print( + Panel( + overlap_columns, + title=f"[bold]Overlapping prefixes related to {asn}[/bold]", + expand=False, + ) + ) + self.console.print("\n") + + def display_as_sets(self, sets_data: Optional[Dict[str, Any]], asn: str) -> None: + """Display AS sets information.""" + if sets_data and sets_data.get("setsPerIrr"): + sets_panels = [] + for irr, sets in sets_data["setsPerIrr"].items(): + table = Table(show_header=False, expand=True) + for as_set in sets: + table.add_row(as_set) + panel = Panel(table, title=f"[bold]{irr}[/bold]", border_style="blue") + sets_panels.append(panel) + + sets_columns = Columns(sets_panels, equal=True, expand=True) + self.console.print( + Panel( + sets_columns, + title=f"[bold]AS Sets including {asn}[/bold]", + expand=False, + ) + ) diff --git a/irrexplorer_cli/main.py b/irrexplorer_cli/main.py new file mode 100644 index 0000000..1e571e4 --- /dev/null +++ b/irrexplorer_cli/main.py @@ -0,0 +1,78 @@ +"""Command-line interface for IRR Explorer queries.""" + +import asyncio +from importlib.metadata import version + +import typer +from click import Context +from rich.console import Console + +from irrexplorer_cli.helpers import validate_asn_format, validate_prefix_format +from irrexplorer_cli.queries import async_asn_query, async_prefix_query + +__version__ = version("irrexplorer-cli") + +CTX_OPTION = typer.Option(None, hidden=True) +app = typer.Typer( + help="CLI tool to query IRR Explorer data for prefix information", + no_args_is_help=True, + context_settings={"help_option_names": ["-h", "--help"]}, +) +console = Console() + + +def version_display(display_version: bool) -> None: + """Display version information and exit.""" + if display_version: + print(f"[bold]IRR Explorer CLI[/bold] version: {__version__}") + raise typer.Exit() + + +@app.callback() +def callback( + _: bool = typer.Option(None, "--version", "-v", callback=version_display, is_eager=True), +) -> None: + """Query IRR Explorer for prefix information.""" + + +@app.command(no_args_is_help=True) +def prefix( + prefix_query: str = typer.Argument(None, help="Prefix to query (e.g., 193.0.0.0/21)"), + output_format: str = typer.Option(None, "--format", "-f", help="Output format (json or csv)"), + ctx: Context = CTX_OPTION, +) -> None: + """Query IRR Explorer for prefix information.""" + if not prefix_query: + if ctx: + typer.echo(ctx.get_help()) + raise typer.Exit() + + if not validate_prefix_format(prefix_query): + typer.echo(f"Error: Invalid prefix format: {prefix_query}") + raise typer.Exit(1) + + asyncio.run(async_prefix_query(prefix_query, output_format)) + + +@app.command() +def asn( + asn_query: str = typer.Argument(None, help="AS number to query (e.g., AS2111, as2111, or 2111)"), + output_format: str = typer.Option(None, "--format", "-f", help="Output format (json or csv)"), + ctx: Context = CTX_OPTION, +) -> None: + """Query IRR Explorer for AS number information.""" + if not asn_query: + if ctx: + typer.echo(ctx.get_help()) + raise typer.Exit() + + if not validate_asn_format(asn_query): + typer.echo(f"Error: Invalid ASN format: {asn_query}") + raise typer.Exit(1) + + if not asn_query.upper().startswith("AS"): + asn_query = f"AS{asn_query}" + else: + asn_query = f"AS{asn_query[2:]}" + + asyncio.run(async_asn_query(asn_query, output_format)) diff --git a/irrexplorer_cli/models.py b/irrexplorer_cli/models.py new file mode 100644 index 0000000..f4b9091 --- /dev/null +++ b/irrexplorer_cli/models.py @@ -0,0 +1,76 @@ +"""Data models for IRR Explorer API responses.""" + +from typing import Dict, List, Optional + +from pydantic import BaseModel + + +class BaseRoute(BaseModel): + """Base model for route information.""" + + rpkiStatus: str + rpkiMaxLength: Optional[int] + asn: int + rpslText: str + rpslPk: str + + +class RpkiRoute(BaseRoute): + """RPKI route information model.""" + + +class IrrRoute(BaseRoute): + """IRR route information model.""" + + +class Message(BaseModel): + """Message model for API responses.""" + + text: str + category: str + + +class PrefixInfo(BaseModel): + """Prefix information model containing route and status details.""" + + prefix: str + rir: str + bgpOrigins: List[int] + rpkiRoutes: List[RpkiRoute] + irrRoutes: Dict[str, List[IrrRoute]] + categoryOverall: str + messages: List[Message] + prefixSortKey: str + goodnessOverall: int + + +class AsResponse(BaseModel): + """Response model for AS queries.""" + + directOrigin: List[PrefixInfo] + overlaps: List[PrefixInfo] + + +class PrefixResult(BaseModel): + """Prefix query result information.""" + + prefix: str + categoryOverall: str + rir: str + rpkiRoutes: List[RpkiRoute] + bgpOrigins: List[int] + irrRoutes: Dict[str, List[IrrRoute]] + messages: List[Message] + + +class AsSets(BaseModel): + """AS Sets information.""" + + setsPerIrr: Dict[str, List[str]] + + +class AsnResult(BaseModel): + """ASN query result information.""" + + directOrigin: List[PrefixResult] + overlaps: List[PrefixResult] diff --git a/irrexplorer_cli/py.typed b/irrexplorer_cli/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/irrexplorer_cli/queries.py b/irrexplorer_cli/queries.py new file mode 100644 index 0000000..f239ca3 --- /dev/null +++ b/irrexplorer_cli/queries.py @@ -0,0 +1,73 @@ +"""Query functions for the CLI tool.""" + +import json +from typing import Optional + +import httpx + +from irrexplorer_cli.helpers import ( + find_least_specific_prefix, + format_as_sets, + format_direct_origins, + format_overlapping_prefixes, + format_prefix_result, +) +from irrexplorer_cli.irrexplorer import IrrDisplay, IrrExplorer + + +async def process_overlaps(explorer: IrrExplorer, least_specific: str) -> None: + """Process and print overlapping prefixes.""" + try: + all_overlaps = await explorer.fetch_prefix_info(least_specific) + for result in all_overlaps: + print(format_prefix_result(result, "OVERLAP")) + except (httpx.HTTPError, ValueError, RuntimeError): + pass + + +async def async_prefix_query(pfx: str, output_format: Optional[str] = None) -> None: + """Execute asynchronous prefix query and display results.""" + explorer = IrrExplorer() + display = IrrDisplay() + try: + direct_overlaps = await explorer.fetch_prefix_info(pfx) + + if output_format == "json": + json_data = [result.model_dump() for result in direct_overlaps] + print(json.dumps(json_data, indent=2)) + elif output_format == "csv": + print("Type,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages") + + for result in direct_overlaps: + print(format_prefix_result(result, "DIRECT")) + + least_specific = await find_least_specific_prefix(direct_overlaps) + if least_specific: + await process_overlaps(explorer, least_specific) + else: + await display.display_prefix_info(direct_overlaps) + finally: + await explorer.close() + + +async def async_asn_query(as_number: str, output_format: Optional[str] = None) -> None: + """Execute asynchronous ASN query and display results.""" + explorer = IrrExplorer() + display = IrrDisplay() + try: + results = await explorer.fetch_asn_info(as_number) + sets_data = await explorer.fetch_asn_sets(as_number) + + if output_format == "json": + combined_data = {"asn_info": results, "as_sets": sets_data} + print(json.dumps(combined_data, indent=2), end="\n") + elif output_format == "csv": + print("Type,ASN,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages", end="") + format_direct_origins(as_number, results) + format_overlapping_prefixes(as_number, results) + format_as_sets(as_number, sets_data) + print() + else: + await display.display_asn_info(results, as_number, sets_data) + finally: + await explorer.close() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..2560c6e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,73 @@ +[project] +name = "irrexplorer-cli" +version = "0.0.1" +description = "A command-line interface to query and explore IRR & BGP data from IRRexplorer.net" +authors = [{name = "kiraum", email = "tfgoncalves@xpto.it" }] +dependencies = [ + "httpx", + "rich", + "typer", + "pydantic", + "backoff", + "asyncio" +] +requires-python = ">=3.13" + +[project.optional-dependencies] +dev = [ + "pytest", + "black", + "ruff", + "mypy", + "pytest-asyncio", + "respx", + "pytest-cov", + "isort", + "pylint", + "pre-commit" +] + +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[tool.hatch.build] +packages = ["irrexplorer_cli"] +include = [ + "irrexplorer_cli/py.typed", +] + +[tool.isort] +profile = "black" +multi_line_output = 3 +line_length = 120 + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = ["E", "F", "I", "B"] + +[tool.mypy] +python_version = "3.13" +strict = true +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true +check_untyped_defs = true + +[tool.black] +line-length = 120 + +[project.scripts] +irrexplorer = "irrexplorer_cli.main:app" + +[tool.pytest.ini_options] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" + +[tool.pylint] +max-line-length = 120 +disable = [] +enable = "all" +good-names = ["i", "j", "k", "ex", "Run", "_", "id", "f"] diff --git a/requirements.lock b/requirements.lock new file mode 100644 index 0000000..9bcbf66 --- /dev/null +++ b/requirements.lock @@ -0,0 +1,124 @@ +# This file was autogenerated by uv via the following command: +# uv pip compile --extra dev pyproject.toml -o requirements.lock +annotated-types==0.7.0 + # via pydantic +anyio==4.7.0 + # via httpx +astroid==3.3.7 + # via pylint +asyncio==3.4.3 + # via irrexplorer-cli (pyproject.toml) +backoff==2.2.1 + # via irrexplorer-cli (pyproject.toml) +black==24.10.0 + # via irrexplorer-cli (pyproject.toml) +certifi==2024.12.14 + # via + # httpcore + # httpx +cfgv==3.4.0 + # via pre-commit +click==8.1.8 + # via + # black + # typer +coverage==7.6.9 + # via pytest-cov +dill==0.3.9 + # via pylint +distlib==0.3.9 + # via virtualenv +filelock==3.16.1 + # via virtualenv +h11==0.14.0 + # via httpcore +httpcore==1.0.7 + # via httpx +httpx==0.28.1 + # via + # irrexplorer-cli (pyproject.toml) + # respx +identify==2.6.3 + # via pre-commit +idna==3.10 + # via + # anyio + # httpx +iniconfig==2.0.0 + # via pytest +isort==5.13.2 + # via + # irrexplorer-cli (pyproject.toml) + # pylint +markdown-it-py==3.0.0 + # via rich +mccabe==0.7.0 + # via pylint +mdurl==0.1.2 + # via markdown-it-py +mypy==1.14.0 + # via irrexplorer-cli (pyproject.toml) +mypy-extensions==1.0.0 + # via + # black + # mypy +nodeenv==1.9.1 + # via pre-commit +packaging==24.2 + # via + # black + # pytest +pathspec==0.12.1 + # via black +platformdirs==4.3.6 + # via + # black + # pylint + # virtualenv +pluggy==1.5.0 + # via pytest +pre-commit==4.0.1 + # via irrexplorer-cli (pyproject.toml) +pydantic==2.10.4 + # via irrexplorer-cli (pyproject.toml) +pydantic-core==2.27.2 + # via pydantic +pygments==2.18.0 + # via rich +pylint==3.3.2 + # via irrexplorer-cli (pyproject.toml) +pytest==8.3.4 + # via + # irrexplorer-cli (pyproject.toml) + # pytest-asyncio + # pytest-cov +pytest-asyncio==0.25.0 + # via irrexplorer-cli (pyproject.toml) +pytest-cov==6.0.0 + # via irrexplorer-cli (pyproject.toml) +pyyaml==6.0.2 + # via pre-commit +respx==0.22.0 + # via irrexplorer-cli (pyproject.toml) +rich==13.9.4 + # via + # irrexplorer-cli (pyproject.toml) + # typer +ruff==0.8.4 + # via irrexplorer-cli (pyproject.toml) +shellingham==1.5.4 + # via typer +sniffio==1.3.1 + # via anyio +tomlkit==0.13.2 + # via pylint +typer==0.15.1 + # via irrexplorer-cli (pyproject.toml) +typing-extensions==4.12.2 + # via + # mypy + # pydantic + # pydantic-core + # typer +virtualenv==20.28.0 + # via pre-commit diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/fixtures.py b/tests/fixtures.py new file mode 100644 index 0000000..1996d2c --- /dev/null +++ b/tests/fixtures.py @@ -0,0 +1,46 @@ +""" Fixtures for tests """ + +from typing import Any + +from irrexplorer_cli.models import PrefixInfo + +COMMON_SETS_DATA = {"setsPerIrr": {"RIPE": ["AS-TEST"], "ARIN": ["AS-TEST2"]}} +COMMON_RPKI_ROUTE = { + "rpkiStatus": "VALID", + "rpkiMaxLength": 24, + "asn": 12345, + "rpslText": "route: 192.0.2.0/24", + "rpslPk": "192.0.2.0/24AS12345/ML24", +} + +COMMON_PREFIX_INFO = { + "prefix": "192.0.2.0/24", + "categoryOverall": "success", + "rir": "RIPE", + "rpkiRoutes": [COMMON_RPKI_ROUTE], + "bgpOrigins": [12345], + "irrRoutes": {}, + "messages": [], + "prefixSortKey": "1", + "goodnessOverall": 1, +} + +COMMON_ASN_DATA = { + "directOrigin": [COMMON_PREFIX_INFO], + "overlaps": [], +} + +COMMON_SETS_DATA = { + "setsPerIrr": { + "RIPE": ["AS-TEST1", "AS-TEST2"], + "ARIN": ["AS-TEST3"], + "AFRINIC": ["AS-TEST4", "AS-TEST5", "AS-TEST6"], + } +} + + +def create_basic_prefix_info(**kwargs: Any) -> PrefixInfo: + """Create a basic PrefixInfo object.""" + data = COMMON_PREFIX_INFO.copy() + data.update(kwargs) + return PrefixInfo.model_validate(data) diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..7ac9213 --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,111 @@ +"""Test suite for CLI functionality.""" + +from unittest.mock import patch + +import httpx +import pytest +from typer.testing import CliRunner + +from irrexplorer_cli.irrexplorer import IrrExplorer +from irrexplorer_cli.main import app + +runner = CliRunner() + + +def test_query_command_http_error() -> None: + """Test HTTP error handling in query command.""" + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info") as mock_fetch: + mock_fetch.side_effect = httpx.HTTPError("Test error") + result = runner.invoke(app, ["prefix", "5.57.21.0/24"], catch_exceptions=True) + assert result.exit_code + + +@pytest.mark.asyncio +async def test_explorer_close_error() -> None: + """Test error handling during explorer client closure.""" + explorer = IrrExplorer() + with patch.object(explorer.client, "aclose") as mock_close: + mock_close.side_effect = RuntimeError("Close error") + try: + await explorer.close() + except RuntimeError as e: + assert str(e) == "Close error" + + +def test_callback_without_version() -> None: + """Test callback execution without version flag.""" + result = runner.invoke(app) + assert not result.exit_code + + +def test_query_empty_prefix() -> None: + """Test query command with empty prefix""" + result = runner.invoke(app, ["prefix", ""]) + assert not result.exit_code + + +def test_asn_command() -> None: + """Test ASN query command.""" + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info") as mock_fetch: + mock_fetch.return_value = {"directOrigin": [], "overlaps": []} + result = runner.invoke(app, ["asn", "AS202196"]) + assert not result.exit_code + + +def test_json_output_format() -> None: + """Test JSON output format.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "json"]) + assert not result.exit_code + + +def test_csv_output_format() -> None: + """Test CSV output format.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "csv"]) + assert not result.exit_code + + +def test_query_command() -> None: + """Test prefix query command execution.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24"]) + assert not result.exit_code + + +def test_query_command_invalid_prefix() -> None: + """Test query command with invalid prefix.""" + result = runner.invoke(app, ["prefix", "invalid"]) + assert "Error: Invalid prefix format" in result.stdout + + +def test_version_command() -> None: + """Test version command output.""" + result = runner.invoke(app, ["--version"]) + assert not result.exit_code + assert "IRR Explorer CLI" in result.stdout + + +def test_help_command() -> None: + """Test help command display.""" + result = runner.invoke(app, ["--help"]) + assert not result.exit_code + assert "Usage" in result.stdout + assert "Options" in result.stdout + + +def test_query_without_prefix() -> None: + """Test query command without prefix argument.""" + result = runner.invoke(app, ["query"]) + assert "Usage" in result.stdout + + +def test_format_option() -> None: + """Test format option handling.""" + result = runner.invoke(app, ["prefix", "5.57.21.0/24", "--format", "json"]) + assert not result.exit_code + + +def test_asn_query_command() -> None: + """Test ASN query command execution.""" + with patch("irrexplorer_cli.main.async_asn_query", return_value=None) as mock_query: + result = runner.invoke(app, ["asn", "AS202196"]) + assert not result.exit_code + mock_query.assert_called_once_with("AS202196", None) diff --git a/tests/test_helpers.py b/tests/test_helpers.py new file mode 100644 index 0000000..9886b1b --- /dev/null +++ b/tests/test_helpers.py @@ -0,0 +1,144 @@ +"""Test suite for helper functions.""" + +from typing import Any, Dict, List + +import pytest + +from irrexplorer_cli.helpers import ( + find_least_specific_prefix, + format_as_sets, + format_direct_origins, + format_overlapping_prefixes, + format_prefix_result, + validate_asn_format, + validate_prefix_format, +) +from tests.fixtures import ( + COMMON_ASN_DATA, + COMMON_PREFIX_INFO, + COMMON_RPKI_ROUTE, + COMMON_SETS_DATA, + create_basic_prefix_info, +) + + +def test_validate_prefix_format() -> None: + """Test prefix format validation.""" + assert validate_prefix_format("192.0.2.0/24") is True + assert validate_prefix_format("2001:db8::/32") is True + assert validate_prefix_format("invalid") is False + assert validate_prefix_format("256.256.256.256/24") is False + + +def test_validate_asn_format() -> None: + """Test ASN format validation.""" + assert validate_asn_format("AS12345") is True + assert validate_asn_format("12345") is True + assert validate_asn_format("as12345") is True + assert validate_asn_format("invalid") is False + assert validate_asn_format("AS4294967296") is False # Too large + + +def test_format_prefix_result() -> None: + """Test prefix result formatting.""" + prefix_info = create_basic_prefix_info( + messages=[{"category": "info", "text": "Test message"}], irrRoutes={"RIPE": []} + ) + + result = format_prefix_result(prefix_info, "DIRECT") + assert isinstance(result, str) + assert "192.0.2.0/24" in result + assert "VALID" in result + + +@pytest.mark.asyncio +async def test_find_least_specific_prefix() -> None: + """Test finding least specific prefix.""" + overlaps = [ + create_basic_prefix_info(prefix="192.0.2.0/24"), + create_basic_prefix_info(prefix="192.0.2.0/23"), + create_basic_prefix_info(prefix="192.0.2.0/25"), + ] + result = await find_least_specific_prefix(overlaps) + assert result == "192.0.2.0/23" + + +def test_format_as_sets() -> None: + """Test AS sets formatting.""" + sets_data = {"setsPerIrr": {"RIPE": ["AS-TEST1", "AS-TEST2"]}} + format_as_sets("AS12345", sets_data) # Test output formatting + + +def test_format_direct_origins_with_empty_results() -> None: + """Test direct origins formatting with empty results.""" + empty_results: Dict[str, List[Any]] = {"directOrigin": []} + format_direct_origins("AS12345", empty_results) + + +def test_format_overlapping_prefixes_with_complex_data() -> None: + """Test overlapping prefixes with multiple IRR routes.""" + complex_prefix_info = COMMON_PREFIX_INFO.copy() + complex_prefix_info.update( + { + "bgpOrigins": [12345, 67890], + "irrRoutes": {"RIPE": [COMMON_RPKI_ROUTE]}, + "messages": [ + {"category": "info", "text": "Test message 1"}, + {"category": "warning", "text": "Test message 2"}, + ], + } + ) + + complex_data = {"overlaps": [complex_prefix_info]} + + format_overlapping_prefixes("AS12345", complex_data) + + +@pytest.mark.asyncio +async def test_find_least_specific_prefix_empty() -> None: + """Test finding least specific prefix with empty list.""" + result = await find_least_specific_prefix([]) + assert result is None + + +def test_format_direct_origins_with_multiple_routes() -> None: + """Test direct origins formatting with multiple routes.""" + multi_route_prefix = COMMON_PREFIX_INFO.copy() + multi_route_prefix.update( + { + "rpkiRoutes": [], # Test no RPKI routes case + "bgpOrigins": [12345, 67890], + "irrRoutes": { + "RIPE": [COMMON_RPKI_ROUTE], + "ARIN": [ + { + "rpkiStatus": "INVALID", + "rpkiMaxLength": 24, + "asn": 67890, + "rpslText": "route: 192.0.2.0/24", + "rpslPk": "192.0.2.0/24AS67890/ML24", + } + ], + }, + } + ) + + test_data = {"directOrigin": [multi_route_prefix]} + + format_direct_origins("AS12345", test_data) + + +def test_format_as_sets_with_multiple_irrs() -> None: + """Test AS sets formatting with multiple IRR databases.""" + format_as_sets("AS12345", COMMON_SETS_DATA) + + +def test_format_as_sets_empty() -> None: + """Test AS sets formatting with empty data.""" + format_as_sets("AS12345", {"setsPerIrr": {}}) + format_as_sets("AS12345", {}) + + +def test_format_direct_origins_with_rpki_routes() -> None: + """Test direct origins formatting with RPKI routes.""" + format_direct_origins("AS12345", COMMON_ASN_DATA) diff --git a/tests/test_irrexplorer.py b/tests/test_irrexplorer.py new file mode 100644 index 0000000..cb5aa23 --- /dev/null +++ b/tests/test_irrexplorer.py @@ -0,0 +1,199 @@ +"""Test suite for IRR Explorer core functionality.""" + +from typing import Any +from unittest.mock import AsyncMock, patch + +import httpx +import pytest + +from irrexplorer_cli.irrexplorer import IrrDisplay, IrrExplorer +from irrexplorer_cli.models import PrefixInfo +from tests.fixtures import COMMON_ASN_DATA, COMMON_PREFIX_INFO, COMMON_SETS_DATA, create_basic_prefix_info + + +@pytest.mark.asyncio +async def test_fetch_prefix_info() -> None: + """Test successful prefix information retrieval.""" + explorer = IrrExplorer() + result = await explorer.fetch_prefix_info("5.57.21.0/24") + assert isinstance(result, list) + assert all(isinstance(item, PrefixInfo) for item in result) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_prefix_info() -> None: + """Test prefix information display formatting.""" + display = IrrDisplay() + test_data = [create_basic_prefix_info(prefix="5.57.21.0/24", rpkiRoutes=[])] + await display.display_prefix_info(test_data) + + +@pytest.mark.asyncio +async def test_create_prefix_panel() -> None: + """Test panel creation with prefix information.""" + display = IrrDisplay() + test_info = create_basic_prefix_info(prefix="5.57.21.0/24", rpkiRoutes=[]) + panel = await display.create_prefix_panel(test_info) + assert panel is not None + + +@pytest.mark.asyncio +async def test_display_prefix_info_least_specific_error() -> None: + """Test error handling for least specific prefix display.""" + display = IrrDisplay() + test_data = [create_basic_prefix_info(prefix="5.57.0.0/16", rpkiRoutes=[])] + + with patch("irrexplorer_cli.irrexplorer.IrrExplorer") as mock_explorer_class: + mock_instance = mock_explorer_class.return_value + mock_instance.fetch_prefix_info = AsyncMock(side_effect=ValueError("Test error")) + mock_instance.close = AsyncMock() + + with patch.object(display.console, "print") as mock_print: + await display.display_prefix_info(test_data) + mock_print.assert_called_with("[red]Error fetching overlaps for 5.57.0.0/16: Test error[/red]") + + +@pytest.mark.asyncio +async def test_fetch_asn_sets() -> None: + """Test AS sets information retrieval.""" + explorer = IrrExplorer() + result = await explorer.fetch_asn_sets("AS202196") + assert isinstance(result, dict) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_status_categories() -> None: + """Test status category display styles.""" + display = IrrDisplay() + styles = { + "success": await display.get_status_style("success"), + "warning": await display.get_status_style("warning"), + "error": await display.get_status_style("error"), + } + assert all(isinstance(style, str) for style in styles.values()) + + +@pytest.mark.asyncio +async def test_asn_info_display() -> None: + """Test ASN info display with complete data.""" + display = IrrDisplay() + await display.display_asn_info(COMMON_ASN_DATA, "AS12345", COMMON_SETS_DATA) + + +@pytest.mark.asyncio +async def test_display_overlaps() -> None: + """Test overlaps display functionality.""" + display = IrrDisplay() + + # Create data using the fixture but override specific fields + overlap_info = COMMON_PREFIX_INFO.copy() + overlap_info.update({"categoryOverall": "warning", "rpkiRoutes": [], "irrRoutes": {"RIPE": []}}) + + data = {"overlaps": [overlap_info]} + + await display.display_overlaps(data, "AS12345") + + +@pytest.mark.asyncio +async def test_sort_and_group_panels() -> None: + """Test panel sorting and grouping.""" + display = IrrDisplay() + + prefix_infos = [ + create_basic_prefix_info(prefix="192.0.2.0/24", categoryOverall="success", rpkiRoutes=[]), + create_basic_prefix_info(prefix="192.0.2.0/25", categoryOverall="warning", rpkiRoutes=[]), + ] + + panels = await display.sort_and_group_panels(prefix_infos) + assert len(panels) > 0 + + +@pytest.mark.asyncio +async def test_fetch_asn_info_error() -> None: + """Test ASN info fetch error handling.""" + explorer = IrrExplorer() + with patch("httpx.AsyncClient.get") as mock_get: + mock_get.side_effect = httpx.RequestError("Connection error") + with pytest.raises(httpx.RequestError): + await explorer.fetch_asn_info("AS12345") + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_direct_origins_with_errors() -> None: + """Test direct origins display with error conditions.""" + display = IrrDisplay() + + error_prefix = COMMON_PREFIX_INFO.copy() + error_prefix.update( + { + "categoryOverall": "error", + "rpkiRoutes": [], + "bgpOrigins": [], + "messages": [{"category": "error", "text": "Test error"}], + "goodnessOverall": 0, + } + ) + + data = {"directOrigin": [error_prefix]} + + await display.display_direct_origins(data, "AS12345") + + +@pytest.mark.asyncio +async def test_display_all_overlaps_error() -> None: + """Test all overlaps display with error handling.""" + display = IrrDisplay() + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info") as mock_fetch: + mock_fetch.side_effect = httpx.HTTPError("Test error") + await display.display_all_overlaps("192.0.2.0/24") + + +@pytest.mark.asyncio +async def test_fetch_asn_info_with_backoff() -> None: + """Test ASN info fetch with backoff retry.""" + explorer = IrrExplorer() + mock_request = httpx.Request("GET", "https://irrexplorer.nlnog.net/api/prefixes/asn/AS12345") + mock_response = httpx.Response(200, json={}, request=mock_request) + + with patch("httpx.AsyncClient.get") as mock_get: + mock_get.side_effect = [ + httpx.RequestError("Timeout"), # First attempt fails + mock_response, # Second attempt succeeds + ] + result = await explorer.fetch_asn_info("AS12345") + assert isinstance(result, dict) + await explorer.close() + + +@pytest.mark.asyncio +async def test_display_prefix_info_with_overlaps() -> None: + """Test prefix info display with overlapping prefixes.""" + display = IrrDisplay() + test_data = [ + create_basic_prefix_info(prefix="192.0.2.0/16", categoryOverall="info", rpkiRoutes=[]), + create_basic_prefix_info(prefix="192.0.2.0/24", categoryOverall="danger", rpkiRoutes=[]), + ] + await display.display_prefix_info(test_data) + + +def test_get_rpki_status() -> None: + """Test RPKI status extraction from prefix info.""" + display = IrrDisplay() + + # Test with RPKI routes present + prefix_with_routes = {"rpkiRoutes": [{"rpkiStatus": "VALID"}]} + + # Test with RPKI routes present + prefix_with_routes = {"rpkiRoutes": [{"rpkiStatus": "VALID"}]} + assert display.get_rpki_status(prefix_with_routes) == "VALID" + + # Test with no RPKI routes + prefix_without_routes: dict[str, list[Any]] = {"rpkiRoutes": []} + assert display.get_rpki_status(prefix_without_routes) == "UNKNOWN" + + # Test with missing rpkiRoutes key + prefix_missing_routes: dict[str, Any] = {} + assert display.get_rpki_status(prefix_missing_routes) == "UNKNOWN" diff --git a/tests/test_main.py b/tests/test_main.py new file mode 100644 index 0000000..09b1476 --- /dev/null +++ b/tests/test_main.py @@ -0,0 +1,78 @@ +"""Test cases for main module.""" + +from unittest.mock import patch + +import pytest +import typer +from click.core import Command, Context +from typer.testing import CliRunner + +from irrexplorer_cli.main import app, asn, prefix + +runner = CliRunner() + + +def test_exit_with_version() -> None: + """Test exit with version flag.""" + result = runner.invoke(app, ["--version"]) + assert not result.exit_code + + +def test_exit_with_help() -> None: + """Test exit with help display.""" + result = runner.invoke(app, ["prefix"]) + assert "Usage" in result.stdout + assert not result.exit_code + + +def test_prefix_command_without_context() -> None: + """Test prefix command without context.""" + with patch("typer.Context", return_value=None): + result = runner.invoke(app, ["prefix", ""]) + assert not result.exit_code + + +def test_invalid_asn_format() -> None: + """Test exit with invalid ASN format.""" + result = runner.invoke(app, ["asn", "invalid-asn"]) + assert "Error: Invalid ASN format" in result.stdout + assert result.exit_code == 1 + + +def test_asn_format_conversion() -> None: + """Test ASN format conversion.""" + result = runner.invoke(app, ["asn", "12345"]) + assert not result.exit_code + + +def test_prefix_command_with_context() -> None: + """Test prefix command with context.""" + result = runner.invoke(app, ["prefix"]) + assert not result.exit_code + assert "Usage" in result.stdout + + +def test_asn_command_with_context() -> None: + """Test ASN command with context.""" + result = runner.invoke(app, ["asn"]) + assert not result.exit_code + assert "Usage" in result.stdout + + +def test_asn_command_without_context() -> None: + """Test ASN command without context.""" + with patch("typer.Context", return_value=None): + result = runner.invoke(app, ["asn", ""]) + assert not result.exit_code + + +def test_prefix_direct_no_args() -> None: + """Test prefix function directly with no arguments.""" + with pytest.raises(typer.Exit): + prefix("", "", Context(Command("test"))) + + +def test_asn_direct_no_args() -> None: + """Test asn function directly with no arguments.""" + with pytest.raises(typer.Exit): + asn("", "", Context(Command("test"))) diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 0000000..d359124 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,50 @@ +"""Test suite for data models.""" + +from typing import Dict, List + +from irrexplorer_cli.models import AsResponse, AsSets, IrrRoute, PrefixInfo +from tests.fixtures import COMMON_PREFIX_INFO, COMMON_RPKI_ROUTE + + +def test_prefix_info_model() -> None: + """Test PrefixInfo model validation and instantiation.""" + data = COMMON_PREFIX_INFO.copy() + data.update( + { + "rir": "RIPE NCC", + "bgpOrigins": [202196], + "categoryOverall": "warning", + "messages": [{"category": "warning", "text": "Test message"}], + } + ) + + prefix_info = PrefixInfo.model_validate(data) + assert prefix_info.prefix == "192.0.2.0/24" # Updated to match fixture + assert prefix_info.rir == "RIPE NCC" + assert prefix_info.bgpOrigins == [202196] + assert prefix_info.categoryOverall == "warning" + assert len(prefix_info.rpkiRoutes) == 1 + assert len(prefix_info.messages) == 1 + assert prefix_info.irrRoutes == {} + + +def test_irr_route_model() -> None: + """Test IrrRoute model validation.""" + irr_route = IrrRoute.model_validate(COMMON_RPKI_ROUTE) + assert irr_route.rpkiStatus == "VALID" + assert irr_route.asn == 12345 + + +def test_as_response_model() -> None: + """Test AsResponse model validation.""" + data: Dict[str, List[PrefixInfo]] = {"directOrigin": [], "overlaps": []} + response = AsResponse.model_validate(data) + assert isinstance(response.directOrigin, list) + assert isinstance(response.overlaps, list) + + +def test_as_sets_model() -> None: + """Test AsSets model validation.""" + data: Dict[str, Dict[str, List[str]]] = {"setsPerIrr": {"RIPE": ["AS-TEST"]}} + sets = AsSets.model_validate(data) + assert "RIPE" in sets.setsPerIrr diff --git a/tests/test_queries.py b/tests/test_queries.py new file mode 100644 index 0000000..3a8de7f --- /dev/null +++ b/tests/test_queries.py @@ -0,0 +1,50 @@ +""" Tests for the queries module. """ + +import json +from typing import Any, Dict, List +from unittest.mock import patch + +import httpx +import pytest + +from irrexplorer_cli.irrexplorer import IrrExplorer +from irrexplorer_cli.queries import async_asn_query, process_overlaps + + +@pytest.mark.asyncio +async def test_asn_query_json_output() -> None: + """Test ASN query with JSON output.""" + mock_results: Dict[str, List[Any]] = {"directOrigin": [], "overlaps": []} + mock_sets: Dict[str, Dict[str, Any]] = {"setsPerIrr": {}} + + with ( + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info", return_value=mock_results), + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_sets", return_value=mock_sets), + patch("builtins.print") as mock_print, + ): + await async_asn_query("AS12345", "json") + mock_print.assert_called_with(json.dumps({"asn_info": mock_results, "as_sets": mock_sets}, indent=2), end="\n") + + +@pytest.mark.asyncio +async def test_asn_query_csv_output() -> None: + """Test ASN query with CSV output.""" + mock_results: Dict[str, List[Any]] = {"directOrigin": [], "overlaps": []} + mock_sets: Dict[str, Dict[str, Any]] = {"setsPerIrr": {}} + + with ( + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_info", return_value=mock_results), + patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_asn_sets", return_value=mock_sets), + patch("builtins.print") as mock_print, + ): + await async_asn_query("AS12345", "csv") + mock_print.assert_any_call("Type,ASN,Prefix,Category,RIR,RPKI_Status,BGP_Origins,IRR_Routes,Messages", end="") + + +@pytest.mark.asyncio +async def test_process_overlaps_error_handling() -> None: + """Test error handling in process_overlaps function.""" + explorer = IrrExplorer() + with patch("irrexplorer_cli.irrexplorer.IrrExplorer.fetch_prefix_info", side_effect=httpx.HTTPError("Test error")): + await process_overlaps(explorer, "192.0.2.0/24") + await explorer.close()