Skip to content

Commit

Permalink
feat(irrexplorer-cli): init
Browse files Browse the repository at this point in the history
  • Loading branch information
kiraum committed Dec 21, 2024
1 parent 3f390db commit affeabf
Show file tree
Hide file tree
Showing 20 changed files with 1,807 additions and 1 deletion.
11 changes: 11 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file

version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"
26 changes: 26 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
name: Test (linter/formatter/coverage)

on: [push]

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.13"]

steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Install dependencies
run: |
pip install --upgrade uv
uv pip sync --system --break-system-packages requirements.lock
- name: Run all linters and formatters
run: |
pre-commit run --all-files
52 changes: 52 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: 'v5.0.0'
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- id: check-toml
- id: check-added-large-files

- repo: local
hooks:
- id: isort
name: isort
entry: isort
language: system
types: [python]

- id: black
name: black
entry: black
language: python
types_or: [python, pyi]

- id: ruff
name: ruff
entry: ruff check
language: python
types_or: [python, pyi]

- id: mypy
name: mypy
entry: mypy
language: python
types_or: [python, pyi]
require_serial: true

- id: pylint
name: pylint
entry: pylint
language: system
types: [python]
args:
- --rcfile=pyproject.toml

- id: pytest
name: pytest
entry: pytest
language: system
types: [python]
pass_filenames: false
args: ['--cov=irrexplorer_cli', '--cov-fail-under=100', 'tests/']
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
# irrexplorer-cli
# irrexplorer-cli

Here's how to compile and install the package using uv:

uv pip install --editable ".[dev]"

Copied

Execute

For a production build without development dependencies:

uv pip install .




pytest tests/ -v --cov=irrexplorer_cli


TODO: Update README


pre-commit autoupdate --repo https://github.com/pre-commit/pre-commit-hooks



pytest --cov=irrexplorer_cli --cov-report=term-missing tests/
Empty file added irrexplorer_cli/__init__.py
Empty file.
123 changes: 123 additions & 0 deletions irrexplorer_cli/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Helper functions for the CLI."""

import ipaddress
import re
from typing import Any, Dict, List

from irrexplorer_cli.models import PrefixInfo, PrefixResult


def validate_prefix_format(prefix_input: str) -> bool:
"""Validate IPv4 or IPv6 prefix format."""
try:
ipaddress.ip_network(prefix_input)
return True
except ValueError:
return False


def validate_asn_format(asn_input: str) -> bool:
"""Validate ASN format."""
asn_pattern = r"^(?:AS|as)?(\d+)$"
match = re.match(asn_pattern, asn_input)
if match:
asn_number = int(match.group(1))
return 0 <= asn_number <= 4294967295
return False


def format_prefix_result(result: PrefixInfo, prefix_type: str) -> str:
"""Format a single prefix result for CSV output."""
prefix_result = PrefixResult(
prefix=result.prefix,
categoryOverall=result.categoryOverall,
rir=result.rir,
rpkiRoutes=result.rpkiRoutes,
bgpOrigins=result.bgpOrigins,
irrRoutes=result.irrRoutes,
messages=result.messages,
)

rpki_status = "NOT_FOUND"
if prefix_result.rpkiRoutes:
rpki_status = prefix_result.rpkiRoutes[0].rpkiStatus

bgp_origins = "|".join(str(asn) for asn in prefix_result.bgpOrigins)

irr_routes = []
for db, routes in prefix_result.irrRoutes.items():
for route in routes:
irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}")
irr_routes_str = "|".join(irr_routes)

messages = "|".join(msg.text for msg in prefix_result.messages)

return (
f"{prefix_type},{prefix_result.prefix},{prefix_result.categoryOverall},"
f"{prefix_result.rir},{rpki_status},{bgp_origins},{irr_routes_str},{messages}"
)


def format_direct_origins(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None:
"""Format and print direct origin prefixes."""
for pfx_dict in results.get("directOrigin", []):
pfx = PrefixInfo(**pfx_dict)
rpki_status = "NOT_FOUND"
if pfx.rpkiRoutes:
rpki_status = pfx.rpkiRoutes[0].rpkiStatus

bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins)
irr_routes = []
for db, routes in pfx.irrRoutes.items():
for route in routes:
irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}")
irr_routes_str = "|".join(irr_routes)
messages = "|".join(msg.text for msg in pfx.messages)

print(
f"\nDIRECT,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir},"
f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}",
end="",
)


def format_overlapping_prefixes(as_number: str, results: Dict[str, List[Dict[str, Any]]]) -> None:
"""Format and print overlapping prefixes."""
for pfx_dict in results.get("overlaps", []):
pfx = PrefixInfo(**pfx_dict)
rpki_status = "NOT_FOUND"
if pfx.rpkiRoutes:
rpki_status = pfx.rpkiRoutes[0].rpkiStatus

bgp_origins = "|".join(str(as_number) for as_number in pfx.bgpOrigins)
irr_routes = []
for db, routes in pfx.irrRoutes.items():
for route in routes:
irr_routes.append(f"{db}:AS{route.asn}:{route.rpkiStatus}")
irr_routes_str = "|".join(irr_routes)
messages = "|".join(msg.text for msg in pfx.messages)

print(
f"\nOVERLAP,{as_number},{pfx.prefix},{pfx.categoryOverall},{pfx.rir},"
f"{rpki_status},{bgp_origins},{irr_routes_str},{messages}",
end="",
)


def format_as_sets(as_number: str, sets_data: Dict[str, Dict[str, List[str]]]) -> None:
"""Format and print AS sets."""
if sets_data and sets_data.get("setsPerIrr"):
for irr, sets in sets_data["setsPerIrr"].items():
for as_set in sets:
print(f"\nSET,{as_number},{as_set},{irr},N/A,N/A,N/A,N/A,N/A", end="")


async def find_least_specific_prefix(direct_overlaps: List[PrefixInfo]) -> str | None:
"""Find the least specific prefix from the overlaps."""
least_specific = None
for info in direct_overlaps:
if "/" in info.prefix:
_, mask = info.prefix.split("/")
if least_specific is None or int(mask) < int(least_specific.split("/")[1]):
least_specific = info.prefix
return least_specific
Loading

0 comments on commit affeabf

Please sign in to comment.