[feat] Emit messages to the collect-done queue
aquamatthias committed Nov 13, 2023
1 parent b1fdf96 commit f860b28
Showing 5 changed files with 76 additions and 41 deletions.
17 changes: 12 additions & 5 deletions collect_coordinator/__main__.py
@@ -29,6 +29,7 @@
 from arq.connections import RedisSettings
 from kubernetes_asyncio import config
 from kubernetes_asyncio.client import ApiClient
+from redis.asyncio import Redis

 from collect_coordinator.api import Api
 from collect_coordinator.job_coordinator import KubernetesJobCoordinator
@@ -73,17 +74,23 @@ async def load_kube_config() -> None:

     async def on_start() -> None:
         await load_kube_config()
-        arq_redis = await create_pool(
-            replace(
-                RedisSettings.from_dsn(deps.redis_worker_url), ssl_ca_certs=args.ca_cert, password=args.redis_password
-            )
+        redis_args = dict(ssl_ca_certs=args.ca_cert) if args.redis_url_nodb.startswith("rediss") else {}
+        arq_redis_settings = replace(
+            RedisSettings.from_dsn(deps.redis_worker_url), password=args.redis_password, **redis_args
+        )
+        arq_redis = deps.add("arq_redis", await create_pool(arq_redis_settings))
+        redis = deps.add(
+            "redis",
+            Redis.from_url(deps.redis_event_url, decode_responses=True, password=args.redis_password, **redis_args),
+        )

         api_client = deps.add("api_client", ApiClient(pool_threads=10))
         coordinator = deps.add(
             "job_coordinator",
             KubernetesJobCoordinator(
                 coordinator_id=hostname,
-                redis=arq_redis,
+                redis=redis,
+                arq_redis=arq_redis,
                 api_client=api_client,
                 namespace=args.namespace,
                 max_parallel=args.max_parallel_jobs,
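The `redis_args` guard above passes `ssl_ca_certs` only when the Redis URL uses the TLS scheme (`rediss://`), which plain `redis://` connections do not accept; the same dict is then reused for both the arq pool settings and the event-stream client. A minimal standalone sketch of that pattern (URL, CA path, and password below are hypothetical placeholders, not values from this repository):

```python
from dataclasses import replace

from arq.connections import RedisSettings

redis_url = "rediss://redis.example.com:6379/5"  # hypothetical DSN
ca_cert = "/etc/ssl/certs/ca.crt"                # hypothetical CA bundle

# Pass TLS options only for "rediss" URLs; plain "redis" URLs reject them.
redis_args = dict(ssl_ca_certs=ca_cert) if redis_url.startswith("rediss") else {}
settings = replace(RedisSettings.from_dsn(redis_url), password="s3cret", **redis_args)
```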
42 changes: 34 additions & 8 deletions collect_coordinator/job_coordinator.py
@@ -23,6 +23,7 @@
 from contextlib import suppress
 from enum import Enum
 from typing import Dict, Any, Optional, List, Tuple
+from fixcloudutils.asyncio import stop_running_task

 from arq.connections import ArqRedis
 from attr import evolve
@@ -31,6 +32,7 @@
 from fixcloudutils.asyncio.timed import timed
 from fixcloudutils.service import Service
 from fixcloudutils.types import Json
+from fixcloudutils.redis.event_stream import RedisStreamPublisher
 from kubernetes_asyncio import client as k8s
 from kubernetes_asyncio.client import (
     V1Volume,
@@ -51,6 +53,7 @@
 from kubernetes_asyncio.client.api_client import ApiClient
 from kubernetes_asyncio.watch import Watch
 from prometheus_client import Counter
+from redis.asyncio import Redis

 log = logging.getLogger("collect.coordinator")

@@ -149,14 +152,15 @@ class KubernetesJobCoordinator(JobCoordinator):
     def __init__(
         self,
         coordinator_id: str,
-        redis: ArqRedis,
+        redis: Redis,
+        arq_redis: ArqRedis,
         api_client: ApiClient,
         namespace: str,
         max_parallel: int,
         env: Dict[str, str],
     ) -> None:
         self.coordinator_id = coordinator_id
-        self.redis = redis
+        self.arq_redis = arq_redis
         self.api_client = api_client
         self.batch = k8s.BatchV1Api(api_client)
         self.namespace = namespace
@@ -167,6 +171,7 @@ def __init__(
         self.job_queue_lock = asyncio.Lock()
         self.job_queue: List[Tuple[JobDefinition, Future[bool]]] = []
         self.watcher: Optional[asyncio.Task[Any]] = None
+        self.collect_done_publisher = RedisStreamPublisher(redis, "collect-events", "collect-coordinator")

     async def start_job(self, definition: JobDefinition) -> Future[bool]:
         async with self.job_queue_lock:
@@ -264,8 +269,15 @@ async def __mark_future_done(self, job: RunningJob, error_message: Optional[str]
             job.future.set_result(True)
         else:
             job.future.set_exception(RuntimeError("Job failed!"))
+        # increment prometheus counter
         succ_str = "success" if success else "failed"
         JobRuns.labels(coordinator_id=self.coordinator_id, image=job.definition.image, success=succ_str).inc()
+        # emit a message
+        await self.__done_event(success, job.definition.id, error_message)
+
+    async def __done_event(self, success: bool, job_id: str, error: Optional[str] = None) -> None:
+        kind = "job-finished" if success else "job-failed"
+        await self.collect_done_publisher.publish(kind, {"job_id": job_id, "error": error})

     async def __reconcile(self) -> None:
         res = await self.batch.list_namespaced_job(
@@ -277,6 +289,10 @@ async def __reconcile(self) -> None:
             ref = JobReference.from_job(item)
             running_jobs[ref.name] = RunningJob(definition=definition, ref=ref, future=Future())
         async with self.running_jobs_lock:
+            # all jobs not in the running list are done
+            for k, v in self.running_jobs.items():
+                if k not in running_jobs:
+                    await self.__mark_future_done(v)
             self.running_jobs = running_jobs

     async def __clean_done_jobs(self) -> None:
@@ -289,6 +305,16 @@ async def __clean_done_jobs(self) -> None:
             if ref.status.is_done():
                 await self.__delete_job(ref)
                 self.running_jobs.pop(ref.name, None)
+                job_def = JobDefinition.from_job(item)
+                await self.__done_event(ref.status == JobStatus.succeeded, job_def.id)
+
+    async def __watch_jobs_continuously(self) -> None:
+        while True:
+            try:
+                await self.__reconcile()
+                await self.__watch_jobs()
+            except Exception as ex:
+                log.exception(f"Error while watching jobs: {ex}")

     async def __watch_jobs(self) -> None:
         watch = Watch()
@@ -298,6 +324,7 @@ async def __watch_jobs(self) -> None:
             label_selector=f"app=collect-coordinator,coordinator-id={self.coordinator_id}",
         ):
             try:
+                log.info(f"Job changed: {event}")
                 change_type = event["type"]  # ADDED, MODIFIED, DELETED
                 job = event["object"]
                 name = job.metadata.name
@@ -336,16 +363,14 @@ async def __delete_job(self, ref: JobReference) -> None:
         await self.batch.delete_namespaced_job(name=ref.name, namespace=self.namespace, propagation_policy="Foreground")

     async def start(self) -> Any:
+        await self.collect_done_publisher.start()
         await self.__clean_done_jobs()
         await self.__reconcile()
-        self.watcher = asyncio.create_task(self.__watch_jobs())
+        self.watcher = asyncio.create_task(self.__watch_jobs_continuously())

     async def stop(self) -> None:
-        if self.watcher and not self.watcher.done():
-            self.watcher.cancel()
-            with suppress(asyncio.CancelledError):
-                await self.watcher
-            self.watcher = None
+        await stop_running_task(self.watcher)
+        self.watcher = None
         for name, running_job in self.running_jobs.items():
             # The process stops and the job is still running.
             # Choices:
@@ -361,3 +386,4 @@ async def stop(self) -> None:
             # For simplicity, we choose c) for now.
             log.info(f"Tear down. Job is still running. Mark job {name} as failed.")
             await self.__mark_future_done(running_job, "Coordinator stopped. Job is still running.")
+        await self.collect_done_publisher.stop()
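With the publisher wired into the coordinator, every finished or failed job now produces a `job-finished` or `job-failed` message on the `collect-events` stream. For orientation, here is a minimal consumer sketch tailing that stream with plain `redis-py`; the field layout of the published envelope is an assumption here, not quoted from `fixcloudutils`:

```python
import asyncio

from redis.asyncio import Redis


async def consume_collect_events(redis: Redis) -> None:
    """Print collect-coordinator events as they arrive."""
    last_id = "$"  # only messages published after we connect
    while True:
        # Block for up to 5 seconds waiting for new stream entries.
        replies = await redis.xread({"collect-events": last_id}, block=5000)
        for _stream, messages in replies:
            for message_id, fields in messages:
                last_id = message_id
                # `fields` carries the serialized event, e.g. job_id and error.
                print(message_id, fields)


if __name__ == "__main__":
    client = Redis.from_url("redis://localhost:6379", decode_responses=True)
    asyncio.run(consume_collect_events(client))
```

A production consumer would read through a consumer group (`XGROUP CREATE` plus `XREADGROUP`) so that events are not lost across restarts; the sketch above only tails new messages.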
44 changes: 22 additions & 22 deletions requirements-dev.txt
@@ -1,69 +1,69 @@
-aiohttp==3.8.5
+aiohttp==3.8.6
 aiosignal==1.3.1
 arq==0.25.0
-astroid==3.0.0
+astroid==3.0.1
 async-timeout==4.0.3
 attrs==23.1.0
 bitmath==1.3.3.1
-black==23.9.1
+black==23.11.0
 build==1.0.3
-cachetools==5.3.1
+cachetools==5.3.2
 cattrs==23.1.2
 certifi==2023.7.22
 cffi==1.16.0
 chardet==5.2.0
-charset-normalizer==3.3.0
+charset-normalizer==3.3.2
 click==8.1.7
 colorama==0.4.6
 coverage[toml]==7.3.2
-cryptography==41.0.4
+cryptography==41.0.5
 dill==0.3.7
 distlib==0.3.7
-filelock==3.12.4
-fixcloudutils[prometheus,redis]==1.7.1
+filelock==3.13.1
+fixcloudutils[prometheus,redis]==1.10.0
 flake8==6.1.0
 frozenlist==1.4.0
 hiredis==2.2.3
-hypothesis==6.87.1
+hypothesis==6.88.4
 idna==3.4
 iniconfig==2.0.0
 isort==5.12.0
-kubernetes-asyncio==27.6.0
+kubernetes-asyncio==28.2.0
 mccabe==0.7.0
 multidict==6.0.4
-mypy==1.5.1
+mypy==1.7.0
 mypy-extensions==1.0.0
 packaging==23.2
 pathspec==0.11.2
 pep8-naming==0.13.3
 pip-tools==7.3.0
 platformdirs==3.11.0
 pluggy==1.3.0
-prometheus-client==0.17.1
-pycodestyle==2.11.0
+prometheus-client==0.18.0
+pycodestyle==2.11.1
 pycparser==2.21
 pyflakes==3.1.0
-pylint==3.0.0
+pylint==3.0.2
 pyproject-api==1.6.1
 pyproject-hooks==1.0.0
-pytest==7.4.2
+pytest==7.4.3
 pytest-asyncio==0.21.1
 pytest-cov==4.1.0
 python-dateutil==2.8.2
 pyyaml==6.0.1
 redis[hiredis]==5.0.1
 six==1.16.0
 sortedcontainers==2.4.0
-tomlkit==0.12.1
+tomlkit==0.12.2
 tox==4.11.3
-types-pyopenssl==23.2.0.2
+types-pyopenssl==23.3.0.0
 types-pytz==2023.3.1.1
-types-requests==2.31.0.7
-types-setuptools==68.2.0.0
+types-requests==2.31.0.10
+types-setuptools==68.2.0.1
 typing-extensions==4.8.0
-urllib3==2.0.6
-virtualenv==20.24.5
-wheel==0.41.2
+urllib3==2.0.7
+virtualenv==20.24.6
+wheel==0.41.3
 yarl==1.9.2

 # The following packages are considered to be unsafe in a requirements file:
12 changes: 6 additions & 6 deletions requirements.txt
@@ -1,26 +1,26 @@
-aiohttp==3.8.5
+aiohttp==3.8.6
 aiosignal==1.3.1
 arq==0.25.0
 async-timeout==4.0.3
 attrs==23.1.0
 bitmath==1.3.3.1
 cattrs==23.1.2
 certifi==2023.7.22
-charset-normalizer==3.3.0
+charset-normalizer==3.3.2
 click==8.1.7
-fixcloudutils==1.7.1
+fixcloudutils==1.10.0
 frozenlist==1.4.0
 hiredis==2.2.3
 idna==3.4
-kubernetes-asyncio==27.6.0
+kubernetes-asyncio==28.2.0
 multidict==6.0.4
-prometheus-client==0.17.1
+prometheus-client==0.18.0
 python-dateutil==2.8.2
 pyyaml==6.0.1
 redis==5.0.1
 six==1.16.0
 typing-extensions==4.8.0
-urllib3==2.0.6
+urllib3==2.0.7
 yarl==1.9.2

 # The following packages are considered to be unsafe in a requirements file:
2 changes: 2 additions & 0 deletions tests/worker_queue_test.py
@@ -73,6 +73,8 @@ def test_read_job_definition(worker_queue: WorkerQueue, example_definition: Json
"/etc/ssl/certs/ca.crt",
"--write",
".aws/credentials=AWS_CREDENTIALS",
"--account-id",
"123456789012",
"---",
"--graphdb-bootstrap-do-not-secure",
"--graphdb-server",
