Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tests #4

Merged
merged 6 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/tox.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
name: Run tests
on:
push:
tags:
- "*.*.*"
branches:
- main
pull_request:

jobs:
tox:
name: "tox"
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2

- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: '3.11'
architecture: 'x64'

- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install tox wheel flake8 build

- name: Run tests
run: tox
8 changes: 5 additions & 3 deletions fixca/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys
import resotolib.proc
from signal import SIGTERM
from tempfile import TemporaryDirectory
from resotolib.logger import log, setup_logger, add_args as logging_add_args
from resotolib.web import WebServer
Expand All @@ -9,19 +10,20 @@
from .args import parse_args
from .ca import CA, WebApp, CaApp
from threading import Event
from typing import Any


shutdown_event = Event()


def shutdown(event) -> None:
def shutdown(even: Any) -> None:
log.info("Shutting down")
shutdown_event.set()


def main() -> None:
setup_logger("fixca")
args = parse_args([logging_add_args])
args = parse_args([logging_add_args]) # type: ignore
log.info(f"Starting FIX CA on port {args.port}")
resotolib.proc.initializer()
resotolib.proc.parent_pid = os.getpid()
Expand Down Expand Up @@ -63,7 +65,7 @@ def main() -> None:
shutdown_event.wait()
web_server.shutdown()

resotolib.proc.kill_children(resotolib.proc.SIGTERM, ensure_death=True)
resotolib.proc.kill_children(SIGTERM, ensure_death=True)
log.info("Shutdown complete")
sys.exit(0)

Expand Down
5 changes: 3 additions & 2 deletions fixca/args.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
from argparse import ArgumentParser, Namespace
from typing import Callable, List
from resotolib.args import ArgumentParser as ResotoArgumentParser
from typing import Callable, List, Union


def parse_args(add_args: List[Callable]) -> Namespace:
def parse_args(add_args: List[Callable[[ArgumentParser], None]]) -> Namespace:
parser = ArgumentParser(prog="fixca", description="FIX Certification Authority")
parser.add_argument("--psk", dest="psk", help="Pre-shared-key", default=os.environ.get("FIXCA_PSK"))
parser.add_argument(
Expand Down
59 changes: 31 additions & 28 deletions fixca/ca.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from functools import wraps
from prometheus_client.exposition import generate_latest, CONTENT_TYPE_LATEST
from typing import Optional, Dict, Callable, Tuple, Union, Any, List
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey, RSAPublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509.base import Certificate, CertificateSigningRequest
from resotolib.logger import log
from resotolib.x509 import (
Expand All @@ -25,10 +25,10 @@


class CertificateAuthority:
def __init__(self):
self.cert = None
self.__key = None
self.__initialized = False
def __init__(self) -> None:
self.cert: Optional[Certificate] = None
self.__key: Optional[RSAPrivateKey] = None
self.__initialized: bool = False

@staticmethod
def requires_initialized(func: Callable[..., Any]) -> Callable[..., Any]:
Expand All @@ -42,6 +42,7 @@ def wrapper(ca_instance: "CertificateAuthority", *args: Any, **kwargs: Any) -> A

@requires_initialized
def sign(self, csr: CertificateSigningRequest) -> Certificate:
assert self.__key is not None and self.cert is not None
return sign_csr(csr, self.__key, self.cert)

def initialize(self, namespace: str = "cert-manager", secret_name: str = "fix-ca", dummy_ca: bool = False) -> None:
Expand All @@ -62,7 +63,7 @@ def __load_ca_data(
log.info("Loading CA data")
ca_secret = get_secret(namespace=namespace, secret_name=secret_name)

if isinstance(ca_secret, dict) and (not "tls.key" in ca_secret or not "tls.crt" in ca_secret):
if isinstance(ca_secret, dict) and ("tls.key" not in ca_secret or "tls.crt" not in ca_secret):
ca_secret = None
log.error("CA secret is missing key or cert")

Expand Down Expand Up @@ -126,6 +127,7 @@ def store_secret(
include_ca_bundle: bool = False,
) -> None:
log.info(f"Storing certificate {cert_crt.subject.rfc4514_string()} in {namespace}/{secret_name}")
assert self.cert is not None
secret = {
key_cert: cert_to_bytes(cert_crt).decode("utf-8"),
key_key: key_to_bytes(cert_key).decode("utf-8"),
Expand All @@ -143,10 +145,10 @@ def store_secret(


CA: CertificateAuthority = CertificateAuthority()
PSK: Optional[Union[str, Certificate, RSAPublicKey]] = None
PSK: Optional[str] = None


def jwt_check():
def jwt_check() -> None:
headers = cherrypy.request.headers
assert PSK is not None

Expand Down Expand Up @@ -182,8 +184,8 @@ def __init__(
if self.mountpoint not in ("/", ""):
self.config[self.mountpoint] = config

@cherrypy.expose
@cherrypy.tools.allow(methods=["GET"])
@cherrypy.expose # type: ignore
@cherrypy.tools.allow(methods=["GET"]) # type: ignore
def health(self) -> str:
cherrypy.response.headers["Content-Type"] = "text/plain"
unhealthy = [f"- {name}" for name, fn in self.health_conditions.items() if not fn()]
Expand All @@ -195,37 +197,37 @@ def health(self) -> str:
cherrypy.response.headers["Content-Type"] = "text/plain"
return "not ok\r\n\r\n" + "\r\n".join(unhealthy) + "\r\n"

@cherrypy.expose
@cherrypy.tools.allow(methods=["GET"])
@cherrypy.expose # type: ignore
@cherrypy.tools.allow(methods=["GET"]) # type: ignore
def metrics(self) -> bytes:
cherrypy.response.headers["Content-Type"] = CONTENT_TYPE_LATEST
return generate_latest()


class CaApp:
def __init__(self, ca: CertificateAuthority, psk_or_cert: Union[str, Certificate, RSAPublicKey]) -> None:
def __init__(self, ca: CertificateAuthority, psk: str) -> None:
global PSK
self.ca = ca
self.psk_or_cert = psk_or_cert
self.psk = psk
self.config = {"/": {"tools.gzip.on": False}}
PSK = self.psk_or_cert
PSK = self.psk

@cherrypy.expose
@cherrypy.tools.allow(methods=["GET"])
@cherrypy.expose # type: ignore
@cherrypy.tools.allow(methods=["GET"]) # type: ignore
def cert(self) -> bytes:
assert self.psk_or_cert is not None
assert self.psk is not None and self.ca.cert is not None
fingerprint = cert_fingerprint(self.ca.cert)
cherrypy.response.headers["Content-Type"] = "application/x-pem-file"
cherrypy.response.headers["SHA256-Fingerprint"] = fingerprint
cherrypy.response.headers["Content-Disposition"] = 'attachment; filename="fix_root_ca.pem"'
cherrypy.response.headers["Authorization"] = "Bearer " + encode_jwt(
{"sha256_fingerprint": fingerprint}, self.psk_or_cert
{"sha256_fingerprint": fingerprint}, self.psk
)
return cert_to_bytes(self.ca.cert)

@cherrypy.expose
@cherrypy.tools.allow(methods=["POST"])
@cherrypy.tools.jwt_check()
@cherrypy.expose # type: ignore
@cherrypy.tools.allow(methods=["POST"]) # type: ignore
@cherrypy.tools.jwt_check() # type: ignore
def sign(self) -> bytes:
try:
csr = load_csr_from_bytes(cherrypy.request.body.read())
Expand All @@ -242,13 +244,14 @@ def sign(self) -> bytes:
cherrypy.response.headers["Content-Disposition"] = f'attachment; filename="{filename}"'
return cert_to_bytes(crt)

@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.json_in()
@cherrypy.tools.allow(methods=["POST"])
@cherrypy.tools.jwt_check()
def generate(self) -> bytes:
@cherrypy.expose # type: ignore
@cherrypy.tools.json_out() # type: ignore
@cherrypy.tools.json_in() # type: ignore
@cherrypy.tools.allow(methods=["POST"]) # type: ignore
@cherrypy.tools.jwt_check() # type: ignore
def generate(self) -> Dict[str, Any]:
try:
assert self.ca.cert is not None
request_json = cherrypy.request.json
remote_addr = cherrypy.request.remote.ip
include_ca_cert = str_to_bool(request_json.get("include_ca_cert", False))
Expand Down
28 changes: 14 additions & 14 deletions fixca/utils.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
import time
from functools import wraps
from typing import Callable, Any, Tuple, Dict, Union, TypeVar
from typing import Callable, Any, Tuple, Dict, Union


def str_to_bool(s: Union[str, bool]) -> bool:
return str(s).lower() in ("true", "1", "yes")


RT = TypeVar("RT")
def memoize(
ttl: int = 60,
cleanup_interval: int = 600,
time_fn: Callable[[], float] = time.time,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
last_cleanup: float = 0.0
cache: Dict[Tuple[Callable[..., Any], Tuple[Any, ...], frozenset[Tuple[str, Any]]], Tuple[Any, float]] = {}


def memoize(ttl: int = 60, cleanup_interval: int = 600) -> Callable:
state = {"last_cleanup": 0}
cache: Dict[Tuple[Callable, Tuple, frozenset], Tuple[RT, float]] = {}

def decorating_function(user_function: Callable[..., RT]) -> Callable[..., RT]:
def decorating_function(user_function: Callable[..., Any]) -> Callable[..., Any]:
@wraps(user_function)
def wrapper(*args: Any, **kwargs: Any) -> RT:
nonlocal cache
now = time.time()
def wrapper(*args: Any, **kwargs: Any) -> Any:
now = time_fn()
key = (user_function, args, frozenset(kwargs.items()))
if key in cache:
result, timestamp = cache[key]
Expand All @@ -28,11 +28,11 @@ def wrapper(*args: Any, **kwargs: Any) -> RT:
result = user_function(*args, **kwargs)
cache[key] = (result, now)

nonlocal state
if now - state["last_cleanup"] > cleanup_interval:
nonlocal last_cleanup
if now - last_cleanup > cleanup_interval:
for k in [k for k, v in cache.items() if now - v[1] >= ttl]:
cache.pop(k)
state["last_cleanup"] = now
last_cleanup = now

return result

Expand Down
4 changes: 4 additions & 0 deletions genreq.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash
pip-compile --resolver=backtracking --upgrade --allow-unsafe --no-header --unsafe-package n/a --output-file requirements.txt
pip-compile --extra test --resolver=backtracking --upgrade --allow-unsafe --no-header --unsafe-package n/a --output-file requirements-test.txt

Loading
Loading