Skip to content

Commit

Permalink
feat(irrexplorer-cli): init
Browse files Browse the repository at this point in the history
  • Loading branch information
kiraum committed Dec 20, 2024
1 parent 3f390db commit 27aad49
Show file tree
Hide file tree
Showing 16 changed files with 801 additions and 1 deletion.
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file

version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
63 changes: 63 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
name: Test (linter/formatter)

on: [push]

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.13"]

steps:
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v3
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
pip install --upgrade uv
uv pip sync --system --break-system-packages requirements.lock
- name: Check YAML files
run: |
pre-commit run check-yaml --all-files
- name: Check file endings
run: |
pre-commit run end-of-file-fixer --all-files
- name: Check trailing whitespace
run: |
pre-commit run trailing-whitespace --all-files
- name: Check TOML files
run: |
pre-commit run check-toml --all-files
- name: Check large files
run: |
pre-commit run check-added-large-files --all-files
- name: Run isort
run: |
isort --check --diff $(git ls-files '*.py')
- name: Run black
run: |
black --check --diff $(git ls-files '*.py')
- name: Run ruff
run: |
ruff check $(git ls-files '*.py')
- name: Run mypy
run: |
mypy $(git ls-files '*.py')
- name: Run pylint
run: |
pylint --rcfile=pyproject.toml $(git ls-files '*.py')
44 changes: 44 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: 'v4.5.0'
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- id: check-toml
- id: check-added-large-files

- repo: local
hooks:
- id: isort
name: isort
entry: isort
language: system
types: [python]

- id: black
name: black
entry: black
language: python
types_or: [python, pyi]

- id: ruff
name: ruff
entry: ruff check
language: python
types_or: [python, pyi]

- id: mypy
name: mypy
entry: mypy
language: python
types_or: [python, pyi]
require_serial: true

- id: pylint
name: pylint
entry: pylint
language: system
types: [python]
args:
- --rcfile=pyproject.toml
22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,21 @@
# irrexplorer-cli
# irrexplorer-cli

Here's how to compile and install the package using uv:

uv pip install --editable ".[dev]"

Copied

Execute

For a production build without development dependencies:

uv pip install .




pytest tests/ -v --cov=irrexplorer_cli


TODO: Update README
Empty file added irrexplorer_cli/__init__.py
Empty file.
161 changes: 161 additions & 0 deletions irrexplorer_cli/irrexplorer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""Core functionality for IRR Explorer CLI."""

from typing import Dict, List

import backoff
import httpx
from rich.columns import Columns
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text

from .models import PrefixInfo


class IrrExplorer:
"""IRR Explorer API client for prefix information retrieval."""

def __init__(self, base_url: str = "https://irrexplorer.nlnog.net") -> None:
"""Initialize IRR Explorer client with base URL."""
self.base_url = base_url
self.client = httpx.AsyncClient()

@backoff.on_exception(backoff.expo, (httpx.HTTPError, httpx.RequestError), max_tries=3, max_time=30)
async def fetch_prefix_info(self, prefix: str) -> List[PrefixInfo]:
"""Fetch prefix information from IRR Explorer API."""
url = f"{self.base_url}/api/prefixes/prefix/{prefix}"
response = await self.client.get(url)
response.raise_for_status()
return [PrefixInfo(**item) for item in response.json()]

async def close(self) -> None:
"""Close the HTTP client connection."""
await self.client.aclose()


class IrrDisplay:
"""Display handler for IRR Explorer prefix information."""

def __init__(self) -> None:
"""Initialize display handler with Rich console."""
self.console = Console()

async def create_prefix_panel(self, info: PrefixInfo) -> Panel:
"""Create Rich panel with prefix information."""
table = Table(show_header=True, header_style="bold cyan", expand=True)
table.add_column("Property")
table.add_column("Value")

# Add basic info asynchronously
await self.add_basic_info(table, info)
await self.add_bgp_origins(table, info)
await self.add_rpki_routes(table, info)
await self.add_irr_routes(table, info)
await self.add_status(table, info)
await self.add_messages(table, info)

return Panel(
table,
title=f"[bold]{info.prefix}[/bold]",
expand=False,
border_style=await self.get_status_style(info.categoryOverall),
)

async def sort_and_group_panels(self, prefix_infos: List[PrefixInfo]) -> List[Panel]:
"""Sort and group prefix information panels by status category."""
status_groups: Dict[str, List[Panel]] = {"success": [], "warning": [], "error": [], "danger": [], "info": []}

sorted_infos = sorted(prefix_infos, key=lambda x: (-int(x.prefix.split("/")[1]), x.categoryOverall))

for info in sorted_infos:
panel = await self.create_prefix_panel(info)
status_groups[info.categoryOverall].append(panel)

return [
panel for status in ["success", "warning", "error", "danger", "info"] for panel in status_groups[status]
]

async def add_basic_info(self, table: Table, info: PrefixInfo) -> None:
"""Add basic prefix information to the display table."""
table.add_row("Prefix", info.prefix)
table.add_row("RIR", info.rir)

async def add_bgp_origins(self, table: Table, info: PrefixInfo) -> None:
"""Add BGP origin information to the display table."""
bgp_origins = ", ".join(f"AS{asn}" for asn in info.bgpOrigins) if info.bgpOrigins else "None"
table.add_row("BGP Origins", bgp_origins)

async def add_rpki_routes(self, table: Table, info: PrefixInfo) -> None:
"""Add RPKI route information to the display table."""
if info.rpkiRoutes:
rpki_rows = [
f"AS{route.asn} (MaxLength: {route.rpkiMaxLength}, Status: {route.rpkiStatus})"
for route in info.rpkiRoutes
]
table.add_row("RPKI Routes", "\n".join(rpki_rows))

async def add_irr_routes(self, table: Table, info: PrefixInfo) -> None:
"""Add IRR route information to the display table."""
if info.irrRoutes:
irr_rows = [
f"{db}: AS{route.asn} ({route.rpkiStatus})" for db, routes in info.irrRoutes.items() for route in routes
]
if irr_rows:
table.add_row("IRR Routes", "\n".join(irr_rows))

async def add_status(self, table: Table, info: PrefixInfo) -> None:
"""Add status information to the display table."""
status_style = await self.get_status_style(info.categoryOverall)
table.add_row("Status", Text(info.categoryOverall, style=status_style))

async def add_messages(self, table: Table, info: PrefixInfo) -> None:
"""Add message information to the display table."""
if info.messages:
messages = "\n".join(f"• {msg.text}" for msg in info.messages)
table.add_row("Messages", messages)

async def get_status_style(self, category: str) -> str:
"""Get Rich style color based on status category."""
return {"success": "green", "warning": "yellow", "error": "red", "danger": "red", "info": "blue"}.get(
category, "white"
)

async def display_prefix_info(self, direct_overlaps: List[PrefixInfo]) -> None:
"""Display prefix information in Rich panels."""
direct_panels = await self.sort_and_group_panels(direct_overlaps)

direct_columns = Columns(direct_panels, equal=True, expand=True)
self.console.print(
Panel(
direct_columns,
title=f"[bold]Directly overlapping prefixes of {direct_overlaps[0].prefix}[/bold]",
expand=False,
)
)

least_specific = None
for info in direct_overlaps:
if "/" in info.prefix:
_, mask = info.prefix.split("/")
if least_specific is None or int(mask) < int(least_specific.split("/")[1]):
least_specific = info.prefix

if least_specific:
try:
explorer = IrrExplorer()
all_overlaps = await explorer.fetch_prefix_info(least_specific)
all_panels = await self.sort_and_group_panels(all_overlaps)
all_columns = Columns(all_panels, equal=True, expand=True)

self.console.print("\n")
self.console.print(
Panel(
all_columns,
title=f"[bold]All overlaps of least specific match {least_specific}[/bold]",
expand=False,
)
)
await explorer.close()
except (httpx.HTTPError, ValueError, RuntimeError) as e:
self.console.print(f"[red]Error fetching overlaps for {least_specific}: {e}[/red]")
70 changes: 70 additions & 0 deletions irrexplorer_cli/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Command-line interface for IRR Explorer queries."""

import asyncio
import json
from importlib.metadata import version
from typing import Optional

import typer
from rich.console import Console

from irrexplorer_cli.irrexplorer import IrrDisplay, IrrExplorer

__version__ = version("irrexplorer-cli")

app = typer.Typer(
help="CLI tool to query IRR Explorer data for prefix information",
no_args_is_help=True,
context_settings={"help_option_names": ["-h", "--help"]},
)
console = Console()


def version_display(display_version: bool) -> None:
"""Display version information and exit."""
if display_version:
print(f"[bold]IRR Explorer CLI[/bold] version: {__version__}")
raise typer.Exit()


@app.callback()
def callback(
_: bool = typer.Option(None, "--version", "-v", callback=version_display, is_eager=True),
) -> None:
"""Query IRR Explorer for prefix information."""


@app.command(no_args_is_help=True)
def query(
ctx: typer.Context,
prefix: str = typer.Argument(
None,
help="IP prefix to query (e.g., 5.57.21.0/24)",
),
output_format: Optional[str] = typer.Option(
None,
"--format",
"-f",
help="Output format (table, json)",
),
) -> None:
"""Query IRR Explorer for prefix information."""
if not prefix:
ctx.get_help()
raise typer.Exit()
asyncio.run(async_query(prefix, output_format))


async def async_query(prefix: str, output_format: Optional[str] = None) -> None:
"""Execute asynchronous prefix query and display results."""
explorer = IrrExplorer()
display = IrrDisplay()
try:
results = await explorer.fetch_prefix_info(prefix)
if output_format == "json":
json_data = [result.model_dump() for result in results]
print(json.dumps(json_data, indent=2))
else:
await display.display_prefix_info(results)
finally:
await explorer.close()
Loading

0 comments on commit 27aad49

Please sign in to comment.