Skip to content

Commit

Permalink
Bump neuro-config-client from 24.9.0 to 24.12.4 (#1002)
Browse files Browse the repository at this point in the history
  • Loading branch information
dependabot[bot] authored Dec 24, 2024
1 parent 3e3b1f1 commit 8ecae26
Show file tree
Hide file tree
Showing 11 changed files with 468 additions and 185 deletions.
26 changes: 5 additions & 21 deletions platform_monitoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
from collections.abc import AsyncIterator, Awaitable, Callable, Coroutine
from contextlib import AsyncExitStack, asynccontextmanager, suppress
from importlib.metadata import version
from pathlib import Path
from tempfile import mktemp
from typing import Any

import aiobotocore.session
Expand All @@ -33,11 +31,6 @@
from neuro_auth_client.security import AuthScheme, setup_security
from neuro_config_client import ConfigClient
from neuro_logging import init_logging, setup_sentry
from neuro_sdk import (
Client as PlatformApiClient,
Factory as PlatformClientFactory,
JobDescription as Job,
)
from yarl import URL

from .base import JobStats, Telemetry
Expand All @@ -60,6 +53,7 @@
S3LogsService,
s3_client_error,
)
from .platform_api_client import ApiClient, Job
from .user import untrusted_user
from .utils import JobsHelper, KubeHelper, parse_date
from .validators import (
Expand Down Expand Up @@ -638,19 +632,9 @@ async def create_monitoring_app(config: Config) -> aiohttp.web.Application:
@asynccontextmanager
async def create_platform_api_client(
url: URL, token: str, trace_configs: list[aiohttp.TraceConfig] | None = None
) -> AsyncIterator[PlatformApiClient]:
tmp_config = Path(mktemp())
platform_api_factory = PlatformClientFactory(
tmp_config, trace_configs=trace_configs
)
await platform_api_factory.login_with_token(url=url / "api/v1", token=token)
client = None
try:
client = await platform_api_factory.get()
) -> AsyncIterator[ApiClient]:
async with ApiClient(url=url, token=token, trace_configs=trace_configs) as client:
yield client
finally:
if client:
await client.close()


@asynccontextmanager
Expand Down Expand Up @@ -768,7 +752,7 @@ async def create_app(config: Config) -> aiohttp.web.Application:
async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
async with AsyncExitStack() as exit_stack:
logger.info("Initializing Platform API client")
platform_client = await exit_stack.enter_async_context(
platform_api_client = await exit_stack.enter_async_context(
create_platform_api_client(
config.platform_api.url, config.platform_api.token
)
Expand Down Expand Up @@ -822,7 +806,7 @@ async def _init_app(app: aiohttp.web.Application) -> AsyncIterator[None]:
)
jobs_service = JobsService(
config_client=config_client,
jobs_client=platform_client.jobs,
jobs_client=platform_api_client,
kube_client=kube_client,
container_runtime_client_registry=container_runtime_client_registry,
cluster_name=config.cluster_name,
Expand Down
6 changes: 2 additions & 4 deletions platform_monitoring/container_runtime_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ async def attach(
stdout=_bool_to_str(stdout),
stderr=_bool_to_str(stderr),
),
timeout=None, # type: ignore
receive_timeout=None,
timeout=aiohttp.ClientWSTimeout(),
heartbeat=30,
) as ws:
yield ws
Expand Down Expand Up @@ -87,8 +86,7 @@ async def exec(
stdout=_bool_to_str(stdout),
stderr=_bool_to_str(stderr),
),
timeout=None, # type: ignore
receive_timeout=None,
timeout=aiohttp.ClientWSTimeout(),
heartbeat=30,
) as ws:
yield ws
Expand Down
22 changes: 8 additions & 14 deletions platform_monitoring/jobs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import aiohttp
from neuro_config_client import ConfigClient, ResourcePoolType
from neuro_sdk import JobDescription as Job, Jobs as JobsClient

from .config import KubeConfig
from .container_runtime_client import (
Expand All @@ -22,6 +21,7 @@
NodeResources,
Pod,
)
from .platform_api_client import ApiClient, Job
from .user import User
from .utils import KubeHelper, asyncgeneratorcontextmanager

Expand Down Expand Up @@ -53,7 +53,7 @@ def __init__(
self,
*,
config_client: ConfigClient,
jobs_client: JobsClient,
jobs_client: ApiClient,
kube_client: KubeClient,
container_runtime_client_registry: ContainerRuntimeClientRegistry,
cluster_name: str,
Expand All @@ -68,25 +68,19 @@ def __init__(
self._kube_node_pool_label = kube_node_pool_label

async def get(self, job_id: str) -> Job:
return await self._jobs_client.status(job_id)
return await self._jobs_client.get_job(job_id)

def get_jobs_for_log_removal(
self,
) -> AbstractAsyncContextManager[AsyncIterator[Job]]:
return self._jobs_client.list(
) -> AbstractAsyncContextManager[AsyncGenerator[Job, None]]:
return self._jobs_client.get_jobs(
cluster_name=self._cluster_name,
_being_dropped=True,
_logs_removed=False,
being_dropped=True,
logs_removed=False,
)

async def mark_logs_dropped(self, job_id: str) -> None:
url = self._jobs_client._config.api_url / "jobs" / job_id / "drop_progress"
payload = {"logs_removed": True}
auth = await self._jobs_client._config._api_auth()
async with self._jobs_client._core.request(
"POST", url, auth=auth, json=payload
):
pass
await self._jobs_client.mark_job_logs_dropped(job_id)

@asyncgeneratorcontextmanager
async def save(
Expand Down
179 changes: 179 additions & 0 deletions platform_monitoring/platform_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import json
from collections.abc import AsyncGenerator
from contextlib import AbstractAsyncContextManager, aclosing
from dataclasses import dataclass
from enum import StrEnum, auto, unique
from types import TracebackType
from typing import Any

import aiohttp
from multidict import MultiDict
from yarl import URL


class ClientError(Exception):
pass


class IllegalArgumentError(ValueError):
pass


class ResourceNotFoundError(ValueError):
pass


@dataclass(frozen=True)
class Job:
@unique
class Status(StrEnum):
PENDING = auto()
SUSPENDED = auto()
RUNNING = auto()
SUCCEEDED = auto()
FAILED = auto()
CANCELLED = auto()
UNKNOWN = auto()

id: str
uri: URL
status: Status
name: str | None = None


def _create_job(payload: dict[str, Any]) -> Job:
return Job(
id=payload["id"],
status=_create_job_status(payload["history"].get("status", "unknown")),
uri=URL(payload["uri"]),
name=payload.get("name"),
)


def _create_job_status(value: str) -> Job.Status:
try:
return Job.Status(value)
except Exception:
return Job.Status.UNKNOWN


class ApiClient:
_client: aiohttp.ClientSession

def __init__(
self,
url: URL,
token: str | None = None,
timeout: aiohttp.ClientTimeout = aiohttp.client.DEFAULT_TIMEOUT,
trace_configs: list[aiohttp.TraceConfig] | None = None,
):
super().__init__()

self._base_url = url / "api/v1"
self._token = token
self._timeout = timeout
self._trace_configs = trace_configs

async def __aenter__(self) -> "ApiClient":
self._client = self._create_http_client()
return self

async def __aexit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None:
await self.aclose()

def _create_http_client(self) -> aiohttp.ClientSession:
return aiohttp.ClientSession(
headers=self._create_default_headers(),
timeout=self._timeout,
trace_configs=self._trace_configs,
)

async def aclose(self) -> None:
assert self._client
await self._client.close()

def _create_default_headers(self) -> dict[str, str]:
result = {}
if self._token:
result["Authorization"] = f"Bearer {self._token}"
return result

def get_jobs(
self,
*,
cluster_name: str,
being_dropped: bool | None = None,
logs_removed: bool | None = None,
) -> AbstractAsyncContextManager[AsyncGenerator[Job, None]]:
return aclosing(
self._get_jobs(
cluster_name=cluster_name,
being_dropped=being_dropped,
logs_removed=logs_removed,
)
)

async def _get_jobs(
self,
*,
cluster_name: str,
being_dropped: bool | None = None,
logs_removed: bool | None = None,
) -> AsyncGenerator[Job, None]:
headers = {"Accept": "application/x-ndjson"}
params: MultiDict[str] = MultiDict()
params["cluster_name"] = cluster_name
if being_dropped is not None:
params.add("being_dropped", str(being_dropped))
if logs_removed is not None:
params.add("logs_removed", str(logs_removed))
async with self._client.get(
self._base_url / "jobs", headers=headers, params=params
) as response:
await self._raise_for_status(response)
if response.headers.get("Content-Type", "").startswith(
"application/x-ndjson"
):
async for line in response.content:
payload = json.loads(line)
if "error" in payload:
raise Exception(payload["error"])
yield _create_job(payload)
else:
response_json = await response.json()
for j in response_json["jobs"]:
yield _create_job(j)

async def get_job(self, job_id: str) -> Job:
async with self._client.get(self._base_url / "jobs" / job_id) as response:
await self._raise_for_status(response)
response_json = await response.json()
return _create_job(response_json)

async def mark_job_logs_dropped(self, job_id: str) -> None:
async with self._client.post(
self._base_url / "jobs" / job_id / "drop_progress",
json={"logs_removed": True},
) as response:
await self._raise_for_status(response)

async def _raise_for_status(self, response: aiohttp.ClientResponse) -> None:
if response.ok:
return

text = await response.text()
if response.status == 404:
raise ResourceNotFoundError(text)
if 400 <= response.status < 500:
raise IllegalArgumentError(text)

try:
response.raise_for_status()
except aiohttp.ClientResponseError as exc:
msg = f"{str(exc)}, body={text!r}"
raise ClientError(msg) from exc
11 changes: 6 additions & 5 deletions platform_monitoring/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from typing import Any, TypeVar

import iso8601
from neuro_sdk import JobDescription as Job, JobStatus

from .platform_api_client import Job


T_co = TypeVar("T_co", covariant=True)
Expand All @@ -25,13 +26,13 @@ def wrapper(

class JobsHelper:
def is_job_running(self, job: Job) -> bool:
return job.status == JobStatus.RUNNING
return job.status == Job.Status.RUNNING

def is_job_finished(self, job: Job) -> bool:
return job.status in (
JobStatus.SUCCEEDED,
JobStatus.FAILED,
JobStatus.CANCELLED,
Job.Status.SUCCEEDED,
Job.Status.FAILED,
Job.Status.CANCELLED,
)


Expand Down
6 changes: 3 additions & 3 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ classifiers =
packages = find:
install_requires =
aiobotocore==2.16.0
aiohttp[speedups]==3.10.9
aiohttp[speedups]==3.11.3
apolo-sdk==24.12.3
cachetools==5.5.0
docker-image-py==0.1.13
elasticsearch<8.0.0
iso8601==2.1.0
neuro-auth-client==24.8.0
neuro-config-client==24.9.0
neuro-config-client==24.12.4
neuro-logging==24.4.0
neuro-sdk==22.7.1
orjson
trafaret==2.1.1
uvloop
Expand Down
Loading

0 comments on commit 8ecae26

Please sign in to comment.