diff --git a/pyproject.toml b/pyproject.toml index 3e240fd..3a90c07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,8 @@ py-modules = ["sidecar",] profile = "black" py_version=39 skip_glob=["server"] +extra_standard_library = ["tomllib"] + [tool.pylint.format] max-line-length = "88" diff --git a/sidecar/app/helpers.py b/sidecar/app/helpers.py index 0686aef..c4a5b6e 100644 --- a/sidecar/app/helpers.py +++ b/sidecar/app/helpers.py @@ -1,12 +1,16 @@ +import tomllib from dataclasses import dataclass from enum import IntEnum +from json import JSONDecodeError from pathlib import Path -from urllib.parse import ParseResult, urlparse +from urllib.parse import ParseResult, urlparse, urlunparse -import tomllib +import httpx from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from cryptography.x509 import load_pem_x509_certificate +from .query.step import Status + class Role(IntEnum): COORDINATOR = 0 @@ -22,6 +26,67 @@ class Helper: sidecar_url: ParseResult public_key: EllipticCurvePublicKey + def query_status_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/start/{query_id}/status" + ), + ) + ) + + def query_finish_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/stop/finish/{query_id}" + ), + ) + ) + + def query_kill_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/stop/kill/{query_id}" + ), + ) + ) + + def get_current_query_status(self, query_id: str) -> Status: + try: + r = httpx.get(self.query_status_url(query_id)) + except httpx.RequestError: + return Status.UNKNOWN + try: + j = r.json() + except JSONDecodeError: + return Status.UNKNOWN + + return Status.from_json(j) + + def kill_query(self, query_id: str) -> str: + status = self.get_current_query_status(query_id) + if status >= Status.COMPLETE: + return ( + f"not sending kill signal. helper {self.role} " + f"already has status {status}" + ) + r = httpx.post(self.query_kill_url(query_id)) + return f"sent kill signal for query({query_id}) to helper {self.role}: {r.text}" + + def finish_query(self, query_id: str) -> str: + status = self.get_current_query_status(query_id) + if status >= Status.COMPLETE: + return ( + f"not sending finish signal. helper {self.role} " + f"already has status {status}" + ) + r = httpx.post(self.query_finish_url(query_id)) + return ( + f"sent finish signal for query({query_id}) to helper {self.role}: {r.text}" + ) + def load_helpers_from_network_config(network_config_path: Path) -> dict[Role, Helper]: with network_config_path.open("rb") as f: diff --git a/sidecar/app/query/ipa.py b/sidecar/app/query/ipa.py index 4963c8c..5a7cf12 100644 --- a/sidecar/app/query/ipa.py +++ b/sidecar/app/query/ipa.py @@ -5,12 +5,9 @@ from enum import StrEnum from pathlib import Path from typing import ClassVar -from urllib.parse import urlunparse -import httpx import loguru -from ..helpers import Role from ..local_paths import Paths from ..settings import get_settings from .base import Query @@ -31,17 +28,9 @@ class IPAQuery(Query): def send_kill_signals(self): self.logger.info("sending kill signals") settings = get_settings() - for helper in settings.helpers.values(): - if helper.role == self.role: - continue - finish_url = urlunparse( - helper.sidecar_url._replace( - scheme="https", path=f"/stop/kill/{self.query_id}" - ), - ) - - r = httpx.post(finish_url) - self.logger.info(f"sent post request: {r.text}") + for helper in settings.other_helpers: + response = helper.kill_query(self.query_id) + self.logger.info(response) def crash(self): super().crash() @@ -248,32 +237,31 @@ def build_from_query(cls, query: IPAQuery): def run(self): settings = get_settings() - sidecar_urls = [ - helper.sidecar_url - for helper in settings.helpers.values() - if helper.role != Role.COORDINATOR - ] - for sidecar_url in sidecar_urls: - url = urlunparse( - sidecar_url._replace( - scheme="https", path=f"/start/{self.query_id}/status" - ), - ) + for helper in settings.other_helpers: + max_unknonwn_status_wait_time = 100 + current_unknown_status_wait_time = 0 + loop_wait_time = 1 while True: - r = httpx.get(url).json() - status = r.get("status") + status = helper.get_current_query_status(self.query_id) match status: - case Status.IN_PROGRESS.name: + case Status.IN_PROGRESS: break - case Status.KILLED.name: - self.success = False - return - case Status.NOT_FOUND.name: - self.success = False - return - case Status.CRASHED.name: + case Status.KILLED | Status.NOT_FOUND | Status.CRASHED: self.success = False return + case Status.STARTING | Status.COMPILING | Status.WAITING_TO_START: + # keep waiting while it's in a startup state + continue + case Status.UNKNOWN | Status.NOT_FOUND: + # eventually fail if the status is unknown or not found + # for ~100 seconds + current_unknown_status_wait_time += loop_wait_time + if ( + current_unknown_status_wait_time + >= max_unknonwn_status_wait_time + ): + self.success = False + return time.sleep(1) time.sleep(3) # allow enough time for the command to start @@ -352,24 +340,16 @@ class IPACoordinatorQuery(IPAQuery): IPACoordinatorStartStep, ] - def send_terminate_signals(self): - self.logger.info("sending terminate signals") + def send_finish_signals(self): + self.logger.info("sending finish signals") settings = get_settings() - for helper in settings.helpers.values(): - if helper.role == self.role: - continue - finish_url = urlunparse( - helper.sidecar_url._replace( - scheme="https", path=f"/stop/finish/{self.query_id}" - ), - ) - - r = httpx.post(finish_url) - self.logger.info(f"sent post request: {finish_url}: {r.text}") + for helper in settings.other_helpers: + resp = helper.finish_query(self.query_id) + self.logger.info(resp) def finish(self): super().finish() - self.send_terminate_signals() + self.send_finish_signals() @dataclass(kw_only=True) diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py index 14014fc..81b3f9e 100644 --- a/sidecar/app/query/status.py +++ b/sidecar/app/query/status.py @@ -20,6 +20,14 @@ class Status(IntEnum): KILLED = auto() CRASHED = auto() + @classmethod + def from_json(cls, response: dict[str, str]): + status_str = response.get("status", "") + try: + return cls[status_str] + except KeyError: + return cls.UNKNOWN + StatusChangeEvent = NamedTuple( "StatusChangeEvent", [("status", Status), ("timestamp", float)] diff --git a/sidecar/app/settings.py b/sidecar/app/settings.py index 912e896..44bd19b 100644 --- a/sidecar/app/settings.py +++ b/sidecar/app/settings.py @@ -62,6 +62,10 @@ def helper(self) -> Helper: def helpers(self) -> dict[Role, Helper]: return self._helpers + @property + def other_helpers(self) -> list[Helper]: + return [helper for helper in self._helpers.values() if helper.role != self.role] + @property def status_dir_path(self) -> Path: return self.root_path / Path("status") diff --git a/sidecar/tests/app/query/test_status.py b/sidecar/tests/app/query/test_status.py index 4a35c4e..72c87c8 100644 --- a/sidecar/tests/app/query/test_status.py +++ b/sidecar/tests/app/query/test_status.py @@ -113,3 +113,17 @@ def test_status_history_status_event_json( "start_time": now, "end_time": now2, } + + +@pytest.mark.parametrize( + "json_input,expected_status", + [ + ({"status": "STARTING"}, Status.STARTING), + ({"status": "UNKNOWN"}, Status.UNKNOWN), + ({"status": "not-a-status"}, Status.UNKNOWN), + ({}, Status.UNKNOWN), # Empty JSON + ({"other_key": "value"}, Status.UNKNOWN), # Missing "status" key + ], +) +def test_status_from_json(json_input, expected_status): + assert Status.from_json(json_input) == expected_status