From 373de17ffb5ea7ecafedbef92df0843e8185b7b0 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 26 Sep 2024 13:55:43 -0700 Subject: [PATCH 1/4] check to make sure helpers need to be killed before sending kill signal --- sidecar/app/query/ipa.py | 31 ++++++++++++++++++------------- sidecar/app/query/status.py | 8 ++++++++ 2 files changed, 26 insertions(+), 13 deletions(-) diff --git a/sidecar/app/query/ipa.py b/sidecar/app/query/ipa.py index 4963c8c..0ff304d 100644 --- a/sidecar/app/query/ipa.py +++ b/sidecar/app/query/ipa.py @@ -34,14 +34,25 @@ def send_kill_signals(self): for helper in settings.helpers.values(): if helper.role == self.role: continue - finish_url = urlunparse( + status_url = urlunparse( helper.sidecar_url._replace( - scheme="https", path=f"/stop/kill/{self.query_id}" + scheme="https", path=f"/start/{self.query_id}/status" ), ) + r = httpx.get(status_url).json() + status = Status.from_json(r) + self.logger.info( + f"Helper {helper.role} status for query {self.query_id}: {status.name}" + ) + if status < Status.COMPLETE: + finish_url = urlunparse( + helper.sidecar_url._replace( + scheme="https", path=f"/stop/kill/{self.query_id}" + ), + ) - r = httpx.post(finish_url) - self.logger.info(f"sent post request: {r.text}") + r = httpx.post(finish_url) + self.logger.info(f"sent post request: {r.text}") def crash(self): super().crash() @@ -261,17 +272,11 @@ def run(self): ) while True: r = httpx.get(url).json() - status = r.get("status") + status = Status.from_json(r) match status: - case Status.IN_PROGRESS.name: + case Status.IN_PROGRESS: break - case Status.KILLED.name: - self.success = False - return - case Status.NOT_FOUND.name: - self.success = False - return - case Status.CRASHED.name: + case Status.KILLED | Status.NOT_FOUND | Status.CRASHED: self.success = False return diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py index 14014fc..9768cc6 100644 --- a/sidecar/app/query/status.py +++ b/sidecar/app/query/status.py @@ -20,6 +20,14 @@ class Status(IntEnum): KILLED = auto() CRASHED = auto() + @classmethod + def from_json(cls, response: dict[str, str]): + status_str = response.get("status", "") + try: + return cls[status_str] + except ValueError: + return cls.UNKNOWN + StatusChangeEvent = NamedTuple( "StatusChangeEvent", [("status", Status), ("timestamp", float)] From bbfa40967909790497bbbdc8593eab26d7797737 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 26 Sep 2024 14:40:09 -0700 Subject: [PATCH 2/4] add test for new Status.from_json classmethod --- sidecar/app/query/status.py | 2 +- sidecar/tests/app/query/test_status.py | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/sidecar/app/query/status.py b/sidecar/app/query/status.py index 9768cc6..81b3f9e 100644 --- a/sidecar/app/query/status.py +++ b/sidecar/app/query/status.py @@ -25,7 +25,7 @@ def from_json(cls, response: dict[str, str]): status_str = response.get("status", "") try: return cls[status_str] - except ValueError: + except KeyError: return cls.UNKNOWN diff --git a/sidecar/tests/app/query/test_status.py b/sidecar/tests/app/query/test_status.py index 4a35c4e..a77afcd 100644 --- a/sidecar/tests/app/query/test_status.py +++ b/sidecar/tests/app/query/test_status.py @@ -113,3 +113,19 @@ def test_status_history_status_event_json( "start_time": now, "end_time": now2, } + + +def test_status_from_json(): + # matching string + status = Status.from_json({"status": "STARTING"}) + assert status == Status.STARTING + status = Status.from_json({"status": "UNKNOWN"}) + assert status == Status.UNKNOWN + + # non-mathcing string + status = Status.from_json({"status": "not-a-status"}) + assert status == Status.UNKNOWN + + # empty json + status = Status.from_json({}) + assert status == Status.UNKNOWN From 1b0f4d916d9458a8e924c083330c1b56a198bce2 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 26 Sep 2024 16:10:04 -0700 Subject: [PATCH 3/4] refactor repeated url building, move status/kill/finish methods to Helper class --- pyproject.toml | 2 + sidecar/app/helpers.py | 69 +++++++++++++++++++++- sidecar/app/query/ipa.py | 79 +++++++++----------------- sidecar/app/settings.py | 4 ++ sidecar/tests/app/query/test_status.py | 26 ++++----- 5 files changed, 112 insertions(+), 68 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 3e240fd..3a90c07 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,8 @@ py-modules = ["sidecar",] profile = "black" py_version=39 skip_glob=["server"] +extra_standard_library = ["tomllib"] + [tool.pylint.format] max-line-length = "88" diff --git a/sidecar/app/helpers.py b/sidecar/app/helpers.py index 0686aef..608762c 100644 --- a/sidecar/app/helpers.py +++ b/sidecar/app/helpers.py @@ -1,12 +1,16 @@ +import tomllib from dataclasses import dataclass from enum import IntEnum +from json import JSONDecodeError from pathlib import Path -from urllib.parse import ParseResult, urlparse +from urllib.parse import ParseResult, urlparse, urlunparse -import tomllib +import httpx from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey from cryptography.x509 import load_pem_x509_certificate +from .query.step import Status + class Role(IntEnum): COORDINATOR = 0 @@ -22,6 +26,67 @@ class Helper: sidecar_url: ParseResult public_key: EllipticCurvePublicKey + def query_status_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/start/{query_id}/status" + ), + ) + ) + + def query_finish_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/stop/finish/{query_id}" + ), + ) + ) + + def query_kill_url(self, query_id: str) -> str: + return str( + urlunparse( + self.sidecar_url._replace( + scheme="https", path=f"/stop/kill/{query_id}" + ), + ) + ) + + def get_current_query_status(self, query_id: str) -> Status: + try: + r = httpx.get(self.query_kill_url(query_id)) + except httpx.RequestError: + return Status.UNKNOWN + try: + j = r.json() + except JSONDecodeError: + return Status.UNKNOWN + + return Status.from_json(j) + + def kill_query(self, query_id: str) -> str: + status = self.get_current_query_status(query_id) + if status >= Status.COMPLETE: + return ( + f"not sending kill signal. helper {self.role} " + f"already has status {status}" + ) + r = httpx.post(self.query_kill_url(query_id)) + return f"sent kill signal for query({query_id}) to helper {self.role}: {r.text}" + + def finish_query(self, query_id: str) -> str: + status = self.get_current_query_status(query_id) + if status >= Status.COMPLETE: + return ( + f"not sending finish signal. helper {self.role} " + f"already has status {status}" + ) + r = httpx.post(self.query_finish_url(query_id)) + return ( + f"sent finish signal for query({query_id}) to helper {self.role}: {r.text}" + ) + def load_helpers_from_network_config(network_config_path: Path) -> dict[Role, Helper]: with network_config_path.open("rb") as f: diff --git a/sidecar/app/query/ipa.py b/sidecar/app/query/ipa.py index 0ff304d..5a7cf12 100644 --- a/sidecar/app/query/ipa.py +++ b/sidecar/app/query/ipa.py @@ -5,12 +5,9 @@ from enum import StrEnum from pathlib import Path from typing import ClassVar -from urllib.parse import urlunparse -import httpx import loguru -from ..helpers import Role from ..local_paths import Paths from ..settings import get_settings from .base import Query @@ -31,28 +28,9 @@ class IPAQuery(Query): def send_kill_signals(self): self.logger.info("sending kill signals") settings = get_settings() - for helper in settings.helpers.values(): - if helper.role == self.role: - continue - status_url = urlunparse( - helper.sidecar_url._replace( - scheme="https", path=f"/start/{self.query_id}/status" - ), - ) - r = httpx.get(status_url).json() - status = Status.from_json(r) - self.logger.info( - f"Helper {helper.role} status for query {self.query_id}: {status.name}" - ) - if status < Status.COMPLETE: - finish_url = urlunparse( - helper.sidecar_url._replace( - scheme="https", path=f"/stop/kill/{self.query_id}" - ), - ) - - r = httpx.post(finish_url) - self.logger.info(f"sent post request: {r.text}") + for helper in settings.other_helpers: + response = helper.kill_query(self.query_id) + self.logger.info(response) def crash(self): super().crash() @@ -259,26 +237,31 @@ def build_from_query(cls, query: IPAQuery): def run(self): settings = get_settings() - sidecar_urls = [ - helper.sidecar_url - for helper in settings.helpers.values() - if helper.role != Role.COORDINATOR - ] - for sidecar_url in sidecar_urls: - url = urlunparse( - sidecar_url._replace( - scheme="https", path=f"/start/{self.query_id}/status" - ), - ) + for helper in settings.other_helpers: + max_unknonwn_status_wait_time = 100 + current_unknown_status_wait_time = 0 + loop_wait_time = 1 while True: - r = httpx.get(url).json() - status = Status.from_json(r) + status = helper.get_current_query_status(self.query_id) match status: case Status.IN_PROGRESS: break case Status.KILLED | Status.NOT_FOUND | Status.CRASHED: self.success = False return + case Status.STARTING | Status.COMPILING | Status.WAITING_TO_START: + # keep waiting while it's in a startup state + continue + case Status.UNKNOWN | Status.NOT_FOUND: + # eventually fail if the status is unknown or not found + # for ~100 seconds + current_unknown_status_wait_time += loop_wait_time + if ( + current_unknown_status_wait_time + >= max_unknonwn_status_wait_time + ): + self.success = False + return time.sleep(1) time.sleep(3) # allow enough time for the command to start @@ -357,24 +340,16 @@ class IPACoordinatorQuery(IPAQuery): IPACoordinatorStartStep, ] - def send_terminate_signals(self): - self.logger.info("sending terminate signals") + def send_finish_signals(self): + self.logger.info("sending finish signals") settings = get_settings() - for helper in settings.helpers.values(): - if helper.role == self.role: - continue - finish_url = urlunparse( - helper.sidecar_url._replace( - scheme="https", path=f"/stop/finish/{self.query_id}" - ), - ) - - r = httpx.post(finish_url) - self.logger.info(f"sent post request: {finish_url}: {r.text}") + for helper in settings.other_helpers: + resp = helper.finish_query(self.query_id) + self.logger.info(resp) def finish(self): super().finish() - self.send_terminate_signals() + self.send_finish_signals() @dataclass(kw_only=True) diff --git a/sidecar/app/settings.py b/sidecar/app/settings.py index 912e896..44bd19b 100644 --- a/sidecar/app/settings.py +++ b/sidecar/app/settings.py @@ -62,6 +62,10 @@ def helper(self) -> Helper: def helpers(self) -> dict[Role, Helper]: return self._helpers + @property + def other_helpers(self) -> list[Helper]: + return [helper for helper in self._helpers.values() if helper.role != self.role] + @property def status_dir_path(self) -> Path: return self.root_path / Path("status") diff --git a/sidecar/tests/app/query/test_status.py b/sidecar/tests/app/query/test_status.py index a77afcd..72c87c8 100644 --- a/sidecar/tests/app/query/test_status.py +++ b/sidecar/tests/app/query/test_status.py @@ -115,17 +115,15 @@ def test_status_history_status_event_json( } -def test_status_from_json(): - # matching string - status = Status.from_json({"status": "STARTING"}) - assert status == Status.STARTING - status = Status.from_json({"status": "UNKNOWN"}) - assert status == Status.UNKNOWN - - # non-mathcing string - status = Status.from_json({"status": "not-a-status"}) - assert status == Status.UNKNOWN - - # empty json - status = Status.from_json({}) - assert status == Status.UNKNOWN +@pytest.mark.parametrize( + "json_input,expected_status", + [ + ({"status": "STARTING"}, Status.STARTING), + ({"status": "UNKNOWN"}, Status.UNKNOWN), + ({"status": "not-a-status"}, Status.UNKNOWN), + ({}, Status.UNKNOWN), # Empty JSON + ({"other_key": "value"}, Status.UNKNOWN), # Missing "status" key + ], +) +def test_status_from_json(json_input, expected_status): + assert Status.from_json(json_input) == expected_status From 8a9dcb21fe060f1a00190c6ced70e2886648c1e9 Mon Sep 17 00:00:00 2001 From: Erik Taubeneck Date: Thu, 26 Sep 2024 20:55:20 -0700 Subject: [PATCH 4/4] Update sidecar/app/helpers.py --- sidecar/app/helpers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sidecar/app/helpers.py b/sidecar/app/helpers.py index 608762c..c4a5b6e 100644 --- a/sidecar/app/helpers.py +++ b/sidecar/app/helpers.py @@ -55,7 +55,7 @@ def query_kill_url(self, query_id: str) -> str: def get_current_query_status(self, query_id: str) -> Status: try: - r = httpx.get(self.query_kill_url(query_id)) + r = httpx.get(self.query_status_url(query_id)) except httpx.RequestError: return Status.UNKNOWN try: