From f860b28a3e07f6c85aab843113f06f3bbfeb6b06 Mon Sep 17 00:00:00 2001 From: Matthias Veit Date: Mon, 13 Nov 2023 09:21:06 +0100 Subject: [PATCH] [feat] Emit messages to the collect-done queue --- collect_coordinator/__main__.py | 17 +++++++--- collect_coordinator/job_coordinator.py | 42 +++++++++++++++++++----- requirements-dev.txt | 44 +++++++++++++------------- requirements.txt | 12 +++---- tests/worker_queue_test.py | 2 ++ 5 files changed, 76 insertions(+), 41 deletions(-) diff --git a/collect_coordinator/__main__.py b/collect_coordinator/__main__.py index 7f0d302..4b69f38 100644 --- a/collect_coordinator/__main__.py +++ b/collect_coordinator/__main__.py @@ -29,6 +29,7 @@ from arq.connections import RedisSettings from kubernetes_asyncio import config from kubernetes_asyncio.client import ApiClient +from redis.asyncio import Redis from collect_coordinator.api import Api from collect_coordinator.job_coordinator import KubernetesJobCoordinator @@ -73,17 +74,23 @@ async def load_kube_config() -> None: async def on_start() -> None: await load_kube_config() - arq_redis = await create_pool( - replace( - RedisSettings.from_dsn(deps.redis_worker_url), ssl_ca_certs=args.ca_cert, password=args.redis_password - ) + redis_args = dict(ssl_ca_certs=args.ca_cert) if args.redis_url_nodb.startswith("rediss") else {} + arq_redis_settings = replace( + RedisSettings.from_dsn(deps.redis_worker_url), password=args.redis_password, **redis_args ) + arq_redis = deps.add("arq_redis", await create_pool(arq_redis_settings)) + redis = deps.add( + "redis", + Redis.from_url(deps.redis_event_url, decode_responses=True, password=args.redis_password, **redis_args), + ) + api_client = deps.add("api_client", ApiClient(pool_threads=10)) coordinator = deps.add( "job_coordinator", KubernetesJobCoordinator( coordinator_id=hostname, - redis=arq_redis, + redis=redis, + arq_redis=arq_redis, api_client=api_client, namespace=args.namespace, max_parallel=args.max_parallel_jobs, diff --git a/collect_coordinator/job_coordinator.py b/collect_coordinator/job_coordinator.py index 17e258b..738cba0 100644 --- a/collect_coordinator/job_coordinator.py +++ b/collect_coordinator/job_coordinator.py @@ -23,6 +23,7 @@ from contextlib import suppress from enum import Enum from typing import Dict, Any, Optional, List, Tuple +from fixcloudutils.asyncio import stop_running_task from arq.connections import ArqRedis from attr import evolve @@ -31,6 +32,7 @@ from fixcloudutils.asyncio.timed import timed from fixcloudutils.service import Service from fixcloudutils.types import Json +from fixcloudutils.redis.event_stream import RedisStreamPublisher from kubernetes_asyncio import client as k8s from kubernetes_asyncio.client import ( V1Volume, @@ -51,6 +53,7 @@ from kubernetes_asyncio.client.api_client import ApiClient from kubernetes_asyncio.watch import Watch from prometheus_client import Counter +from redis.asyncio import Redis log = logging.getLogger("collect.coordinator") @@ -149,14 +152,15 @@ class KubernetesJobCoordinator(JobCoordinator): def __init__( self, coordinator_id: str, - redis: ArqRedis, + redis: Redis, + arq_redis: ArqRedis, api_client: ApiClient, namespace: str, max_parallel: int, env: Dict[str, str], ) -> None: self.coordinator_id = coordinator_id - self.redis = redis + self.arq_redis = arq_redis self.api_client = api_client self.batch = k8s.BatchV1Api(api_client) self.namespace = namespace @@ -167,6 +171,7 @@ def __init__( self.job_queue_lock = asyncio.Lock() self.job_queue: List[Tuple[JobDefinition, Future[bool]]] = [] self.watcher: Optional[asyncio.Task[Any]] = None + self.collect_done_publisher = RedisStreamPublisher(redis, "collect-events", "collect-coordinator") async def start_job(self, definition: JobDefinition) -> Future[bool]: async with self.job_queue_lock: @@ -264,8 +269,15 @@ async def __mark_future_done(self, job: RunningJob, error_message: Optional[str] job.future.set_result(True) else: job.future.set_exception(RuntimeError("Job failed!")) + # increment prometheus counter succ_str = "success" if success else "failed" JobRuns.labels(coordinator_id=self.coordinator_id, image=job.definition.image, success=succ_str).inc() + # emit a message + await self.__done_event(success, job.definition.id, error_message) + + async def __done_event(self, success: bool, job_id: str, error: Optional[str] = None) -> None: + kind = "job-finished" if success else "job-failed" + await self.collect_done_publisher.publish(kind, {"job_id": job_id, "error": error}) async def __reconcile(self) -> None: res = await self.batch.list_namespaced_job( @@ -277,6 +289,10 @@ async def __reconcile(self) -> None: ref = JobReference.from_job(item) running_jobs[ref.name] = RunningJob(definition=definition, ref=ref, future=Future()) async with self.running_jobs_lock: + # all jobs not in the running list are done + for k, v in self.running_jobs.items(): + if k not in running_jobs: + await self.__mark_future_done(v) self.running_jobs = running_jobs async def __clean_done_jobs(self) -> None: @@ -289,6 +305,16 @@ async def __clean_done_jobs(self) -> None: if ref.status.is_done(): await self.__delete_job(ref) self.running_jobs.pop(ref.name, None) + job_def = JobDefinition.from_job(item) + await self.__done_event(ref.status == JobStatus.succeeded, job_def.id) + + async def __watch_jobs_continuously(self) -> None: + while True: + try: + await self.__reconcile() + await self.__watch_jobs() + except Exception as ex: + log.exception(f"Error while watching jobs: {ex}") async def __watch_jobs(self) -> None: watch = Watch() @@ -298,6 +324,7 @@ async def __watch_jobs(self) -> None: label_selector=f"app=collect-coordinator,coordinator-id={self.coordinator_id}", ): try: + log.info(f"Job changed: {event}") change_type = event["type"] # ADDED, MODIFIED, DELETED job = event["object"] name = job.metadata.name @@ -336,16 +363,14 @@ async def __delete_job(self, ref: JobReference) -> None: await self.batch.delete_namespaced_job(name=ref.name, namespace=self.namespace, propagation_policy="Foreground") async def start(self) -> Any: + await self.collect_done_publisher.start() await self.__clean_done_jobs() await self.__reconcile() - self.watcher = asyncio.create_task(self.__watch_jobs()) + self.watcher = asyncio.create_task(self.__watch_jobs_continuously()) async def stop(self) -> None: - if self.watcher and not self.watcher.done(): - self.watcher.cancel() - with suppress(asyncio.CancelledError): - await self.watcher - self.watcher = None + await stop_running_task(self.watcher) + self.watcher = None for name, running_job in self.running_jobs.items(): # The process stops and the job is still running. # Choices: @@ -361,3 +386,4 @@ async def stop(self) -> None: # For simplicity, we choose c) for now. log.info(f"Tear down. Job is still running. Mark job {name} as failed.") await self.__mark_future_done(running_job, "Coordinator stopped. Job is still running.") + await self.collect_done_publisher.stop() diff --git a/requirements-dev.txt b/requirements-dev.txt index c950265..d386051 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,37 +1,37 @@ -aiohttp==3.8.5 +aiohttp==3.8.6 aiosignal==1.3.1 arq==0.25.0 -astroid==3.0.0 +astroid==3.0.1 async-timeout==4.0.3 attrs==23.1.0 bitmath==1.3.3.1 -black==23.9.1 +black==23.11.0 build==1.0.3 -cachetools==5.3.1 +cachetools==5.3.2 cattrs==23.1.2 certifi==2023.7.22 cffi==1.16.0 chardet==5.2.0 -charset-normalizer==3.3.0 +charset-normalizer==3.3.2 click==8.1.7 colorama==0.4.6 coverage[toml]==7.3.2 -cryptography==41.0.4 +cryptography==41.0.5 dill==0.3.7 distlib==0.3.7 -filelock==3.12.4 -fixcloudutils[prometheus,redis]==1.7.1 +filelock==3.13.1 +fixcloudutils[prometheus,redis]==1.10.0 flake8==6.1.0 frozenlist==1.4.0 hiredis==2.2.3 -hypothesis==6.87.1 +hypothesis==6.88.4 idna==3.4 iniconfig==2.0.0 isort==5.12.0 -kubernetes-asyncio==27.6.0 +kubernetes-asyncio==28.2.0 mccabe==0.7.0 multidict==6.0.4 -mypy==1.5.1 +mypy==1.7.0 mypy-extensions==1.0.0 packaging==23.2 pathspec==0.11.2 @@ -39,14 +39,14 @@ pep8-naming==0.13.3 pip-tools==7.3.0 platformdirs==3.11.0 pluggy==1.3.0 -prometheus-client==0.17.1 -pycodestyle==2.11.0 +prometheus-client==0.18.0 +pycodestyle==2.11.1 pycparser==2.21 pyflakes==3.1.0 -pylint==3.0.0 +pylint==3.0.2 pyproject-api==1.6.1 pyproject-hooks==1.0.0 -pytest==7.4.2 +pytest==7.4.3 pytest-asyncio==0.21.1 pytest-cov==4.1.0 python-dateutil==2.8.2 @@ -54,16 +54,16 @@ pyyaml==6.0.1 redis[hiredis]==5.0.1 six==1.16.0 sortedcontainers==2.4.0 -tomlkit==0.12.1 +tomlkit==0.12.2 tox==4.11.3 -types-pyopenssl==23.2.0.2 +types-pyopenssl==23.3.0.0 types-pytz==2023.3.1.1 -types-requests==2.31.0.7 -types-setuptools==68.2.0.0 +types-requests==2.31.0.10 +types-setuptools==68.2.0.1 typing-extensions==4.8.0 -urllib3==2.0.6 -virtualenv==20.24.5 -wheel==0.41.2 +urllib3==2.0.7 +virtualenv==20.24.6 +wheel==0.41.3 yarl==1.9.2 # The following packages are considered to be unsafe in a requirements file: diff --git a/requirements.txt b/requirements.txt index 815b652..7b1aa71 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiohttp==3.8.5 +aiohttp==3.8.6 aiosignal==1.3.1 arq==0.25.0 async-timeout==4.0.3 @@ -6,21 +6,21 @@ attrs==23.1.0 bitmath==1.3.3.1 cattrs==23.1.2 certifi==2023.7.22 -charset-normalizer==3.3.0 +charset-normalizer==3.3.2 click==8.1.7 -fixcloudutils==1.7.1 +fixcloudutils==1.10.0 frozenlist==1.4.0 hiredis==2.2.3 idna==3.4 -kubernetes-asyncio==27.6.0 +kubernetes-asyncio==28.2.0 multidict==6.0.4 -prometheus-client==0.17.1 +prometheus-client==0.18.0 python-dateutil==2.8.2 pyyaml==6.0.1 redis==5.0.1 six==1.16.0 typing-extensions==4.8.0 -urllib3==2.0.6 +urllib3==2.0.7 yarl==1.9.2 # The following packages are considered to be unsafe in a requirements file: diff --git a/tests/worker_queue_test.py b/tests/worker_queue_test.py index 9fc8854..f547bb2 100644 --- a/tests/worker_queue_test.py +++ b/tests/worker_queue_test.py @@ -73,6 +73,8 @@ def test_read_job_definition(worker_queue: WorkerQueue, example_definition: Json "/etc/ssl/certs/ca.crt", "--write", ".aws/credentials=AWS_CREDENTIALS", + "--account-id", + "123456789012", "---", "--graphdb-bootstrap-do-not-secure", "--graphdb-server",