Skip to content

Commit

Permalink
[fix] Counter (#35)
Browse files (browse the repository at this point in the history)
  • Loading branch information
aquamatthias authored May 2, 2024
1 parent 7b48e68 commit b56386c
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions collect_coordinator/job_coordinator.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -288,17 +288,17 @@ async def __mark_future_done(self, job: RunningJob, error_message: Optional[str]
job.future.set_result(True)
else:
job.future.set_exception(RuntimeError("Job failed!"))
# increment prometheus counter
succ_str = "success" if success else "failed"
JobRuns.labels(coordinator_id=self.coordinator_id, image=job.definition.image, success=succ_str).inc()
if started_at := job.started_at:
JobCollectionTimes.labels(coordinator_id=self.coordinator_id).observe(perf_now() - started_at)
# emit a message
await self.__done_event(success, job.definition.id, error_message)
await self.__done_event(success, job.definition, error_message)

async def __done_event(self, success: bool, job_id: str, error: Optional[str] = None) -> None:
async def __done_event(self, success: bool, job: JobDefinition, error: Optional[str] = None) -> None:
kind = "job-finished" if success else "job-failed"
await self.collect_done_publisher.publish(kind, {"job_id": job_id, "error": error})
await self.collect_done_publisher.publish(kind, {"job_id": job.id, "error": error})
# increment prometheus counter
succ_str = "success" if success else "failed"
JobRuns.labels(coordinator_id=self.coordinator_id, image=job.image, success=succ_str).inc()

async def __reconcile(self) -> None:
res = await self.batch.list_namespaced_job(
Expand Down Expand Up @@ -327,7 +327,7 @@ async def __clean_done_jobs(self) -> None:
await self.__delete_job(ref)
self.running_jobs.pop(ref.name, None)
job_def = JobDefinition.from_job(item)
await self.__done_event(ref.status == JobStatus.succeeded, job_def.id)
await self.__done_event(ref.status == JobStatus.succeeded, job_def)

async def __watch_jobs_continuously(self) -> None:
while True:
Expand Down

0 comments on commit b56386c

Please sign in to comment.