diff --git a/backend/src/predicTCR_server/app.py b/backend/src/predicTCR_server/app.py
index 07df9d4..0822a1e 100644
--- a/backend/src/predicTCR_server/app.py
+++ b/backend/src/predicTCR_server/app.py
@@ -13,10 +13,13 @@
from flask_jwt_extended import JWTManager
from flask_cors import cross_origin
from predicTCR_server.logger import get_logger
+from predicTCR_server.utils import timestamp_now
from predicTCR_server.model import (
db,
Sample,
User,
+ Job,
+ Status,
Settings,
add_new_user,
add_new_runner_user,
@@ -281,6 +284,16 @@ def admin_users():
)
return jsonify(users=[user.as_dict() for user in users])
+ @app.route("/api/admin/jobs", methods=["GET"])
+ @jwt_required()
+ def admin_jobs():
+ if not current_user.is_admin:
+ return jsonify(message="Admin account required"), 400
+ jobs = (
+ db.session.execute(db.select(Job).order_by(db.desc(Job.id))).scalars().all()
+ )
+ return jsonify(jobs=[job.as_dict() for job in jobs])
+
@app.route("/api/admin/runner_token", methods=["GET"])
@jwt_required()
def admin_runner_token():
@@ -305,7 +318,17 @@ def runner_request_job():
sample_id = request_job()
if sample_id is None:
return jsonify(message="No job available"), 204
- return {"sample_id": sample_id}
+ new_job = Job(
+ id=None,
+ sample_id=sample_id,
+ timestamp_start=timestamp_now(),
+ timestamp_end=0,
+ status=Status.RUNNING,
+ error_message="",
+ )
+ db.session.add(new_job)
+ db.session.commit()
+ return {"job_id": new_job.id, "sample_id": sample_id}
@app.route("/api/runner/result", methods=["POST"])
@cross_origin()
@@ -317,6 +340,9 @@ def runner_result():
sample_id = form_as_dict.get("sample_id", None)
if sample_id is None:
return jsonify(message="Missing key: sample_id"), 400
+ job_id = form_as_dict.get("job_id", None)
+ if job_id is None:
+ return jsonify(message="Missing key: job_id"), 400
success = form_as_dict.get("success", None)
if success is None or success.lower() not in ["true", "false"]:
logger.info(" -> missing success key")
@@ -328,12 +354,14 @@ def runner_result():
return jsonify(message="Result has success=True but no file"), 400
runner_hostname = form_as_dict.get("runner_hostname", "")
logger.info(
- f"Result upload for '{sample_id}' from runner {current_user.email} / {runner_hostname}"
+ f"Job '{job_id}' uploaded result for '{sample_id}' from runner {current_user.email} / {runner_hostname}"
)
- error_message = form_as_dict.get("error_message", None)
- if error_message is not None:
+ error_message = form_as_dict.get("error_message", "")
+ if error_message != "":
logger.info(f" -> error message: {error_message}")
- message, code = process_result(sample_id, success, zipfile)
+ message, code = process_result(
+ int(job_id), int(sample_id), success, error_message, zipfile
+ )
return jsonify(message=message), code
with app.app_context():
@@ -341,6 +369,7 @@ def runner_result():
if db.session.get(Settings, 1) is None:
db.session.add(
Settings(
+ id=None,
default_personal_submission_quota=10,
default_personal_submission_interval_mins=30,
global_quota=1000,
diff --git a/backend/src/predicTCR_server/model.py b/backend/src/predicTCR_server/model.py
index 827d44c..258868c 100644
--- a/backend/src/predicTCR_server/model.py
+++ b/backend/src/predicTCR_server/model.py
@@ -2,12 +2,14 @@
import re
import flask
-from enum import Enum
+import enum
import argon2
import pathlib
from flask_sqlalchemy import SQLAlchemy
+from sqlalchemy.orm import DeclarativeBase, MappedAsDataclass, Mapped, mapped_column
from werkzeug.datastructures import FileStorage
from sqlalchemy.inspection import inspect
+from sqlalchemy import Integer, String, Boolean, Enum
from dataclasses import dataclass
from predicTCR_server.email import send_email
from predicTCR_server.settings import predicTCR_url
@@ -20,12 +22,17 @@
decode_password_reset_token,
)
-db = SQLAlchemy()
+
+class Base(DeclarativeBase, MappedAsDataclass):
+ pass
+
+
+db = SQLAlchemy(model_class=Base)
ph = argon2.PasswordHasher()
logger = get_logger()
-class Status(str, Enum):
+class Status(str, enum.Enum):
QUEUED = "queued"
RUNNING = "running"
COMPLETED = "completed"
@@ -34,34 +41,45 @@ class Status(str, Enum):
@dataclass
class Settings(db.Model):
- id: int = db.Column(db.Integer, primary_key=True)
- default_personal_submission_quota: int = db.Column(db.Integer, nullable=False)
- default_personal_submission_interval_mins: int = db.Column(
- db.Integer, nullable=False
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ default_personal_submission_quota: Mapped[int] = mapped_column(
+ Integer, nullable=False
+ )
+ default_personal_submission_interval_mins: Mapped[int] = mapped_column(
+ Integer, nullable=False
)
- global_quota: int = db.Column(db.Integer, nullable=False)
- tumor_types: str = db.Column(db.String, nullable=False)
- sources: str = db.Column(db.String, nullable=False)
- csv_required_columns: str = db.Column(db.String, nullable=False)
+ global_quota: Mapped[int] = mapped_column(Integer, nullable=False)
+ tumor_types: Mapped[str] = mapped_column(String, nullable=False)
+ sources: Mapped[str] = mapped_column(String, nullable=False)
+ csv_required_columns: Mapped[str] = mapped_column(String, nullable=False)
def as_dict(self):
- return {
- c: getattr(self, c)
- for c in inspect(self).attrs.keys()
- if c != "password_hash"
- }
+ return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
+
+
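+# A Job records a single runner execution of a Sample: start/end timestamps, status and any error message.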
+@dataclass
+class Job(db.Model):
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ sample_id: Mapped[int] = mapped_column(Integer, nullable=False)
+ timestamp_start: Mapped[int] = mapped_column(Integer, nullable=False)
+ timestamp_end: Mapped[int] = mapped_column(Integer, nullable=False)
+ status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
+ error_message: Mapped[str] = mapped_column(String, nullable=False)
+
+ def as_dict(self):
+ return {c: getattr(self, c) for c in inspect(self).attrs.keys()}
@dataclass
class Sample(db.Model):
- id: int = db.Column(db.Integer, primary_key=True)
- email: str = db.Column(db.String(256), nullable=False)
- name: str = db.Column(db.String(128), nullable=False)
- tumor_type: str = db.Column(db.String(128), nullable=False)
- source: str = db.Column(db.String(128), nullable=False)
- timestamp: int = db.Column(db.Integer, nullable=False)
- status: Status = db.Column(db.Enum(Status), nullable=False)
- has_results_zip: bool = db.Column(db.Boolean, nullable=False)
+ id: Mapped[int] = mapped_column(Integer, primary_key=True)
+ email: Mapped[str] = mapped_column(String(256), nullable=False)
+ name: Mapped[str] = mapped_column(String(128), nullable=False)
+ tumor_type: Mapped[str] = mapped_column(String(128), nullable=False)
+ source: Mapped[str] = mapped_column(String(128), nullable=False)
+ timestamp: Mapped[int] = mapped_column(Integer, nullable=False)
+ status: Mapped[Status] = mapped_column(Enum(Status), nullable=False)
+ has_results_zip: Mapped[bool] = mapped_column(Boolean, nullable=False)
def _base_path(self) -> pathlib.Path:
data_path = flask.current_app.config["PREDICTCR_DATA_PATH"]
@@ -79,17 +97,17 @@ def result_file_path(self) -> pathlib.Path:
@dataclass
class User(db.Model):
- id: int = db.Column(db.Integer, primary_key=True)
- email: str = db.Column(db.Text, nullable=False, unique=True)
- password_hash: str = db.Column(db.Text, nullable=False)
- activated: bool = db.Column(db.Boolean, nullable=False)
- enabled: bool = db.Column(db.Boolean, nullable=False)
- quota: int = db.Column(db.Integer, nullable=False)
- submission_interval_minutes: int = db.Column(db.Integer, nullable=False)
- last_submission_timestamp: int = db.Column(db.Integer, nullable=False)
- is_admin: bool = db.Column(db.Boolean, nullable=False)
- is_runner: bool = db.Column(db.Boolean, nullable=False)
- full_results: bool = db.Column(db.Boolean, nullable=False)
+    id: Mapped[int] = mapped_column(Integer, primary_key=True)
+    email: Mapped[str] = mapped_column(String, nullable=False, unique=True)
+    password_hash: Mapped[str] = mapped_column(String, nullable=False)
+    activated: Mapped[bool] = mapped_column(Boolean, nullable=False)
+    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False)
+    quota: Mapped[int] = mapped_column(Integer, nullable=False)
+    submission_interval_minutes: Mapped[int] = mapped_column(Integer, nullable=False)
+    last_submission_timestamp: Mapped[int] = mapped_column(Integer, nullable=False)
+    is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False)
+    is_runner: Mapped[bool] = mapped_column(Boolean, nullable=False)
+    full_results: Mapped[bool] = mapped_column(Boolean, nullable=False)
def set_password_nocheck(self, new_password: str):
self.password_hash = ph.hash(new_password)
@@ -145,17 +163,26 @@ def request_job() -> int | None:
def process_result(
- sample_id: str, success: bool, result_zip_file: FileStorage | None
+ job_id: int,
+ sample_id: int,
+ success: bool,
+ error_message: str,
+ result_zip_file: FileStorage | None,
) -> tuple[str, int]:
- sample = db.session.execute(
- db.select(Sample).filter_by(id=sample_id)
- ).scalar_one_or_none()
+ sample = db.session.get(Sample, sample_id)
if sample is None:
logger.warning(f" --> Unknown sample id {sample_id}")
return f"Unknown sample id {sample_id}", 400
+ job = db.session.get(Job, job_id)
+ if job is None:
+ logger.warning(f" --> Unknown job id {job_id}")
+ return f"Unknown job id {job_id}", 400
+ job.timestamp_end = timestamp_now()
if success is False:
sample.has_results_zip = False
sample.status = Status.FAILED
+ job.status = Status.FAILED
+ job.error_message = error_message
db.session.commit()
return "Result processed", 200
if result_zip_file is None:
@@ -165,6 +192,7 @@ def process_result(
result_zip_file.save(sample.result_file_path())
sample.has_results_zip = True
sample.status = Status.COMPLETED
+ job.status = Status.COMPLETED
db.session.commit()
return "Result processed", 200
@@ -244,6 +272,7 @@ def add_new_user(email: str, password: str, is_admin: bool) -> tuple[str, int]:
try:
db.session.add(
User(
+ id=None,
email=email,
password_hash=ph.hash(password),
activated=False,
@@ -282,6 +311,7 @@ def add_new_runner_user() -> User | None:
runner_name = f"runner{runner_number}"
db.session.add(
User(
+ id=None,
email=runner_name,
password_hash="",
activated=False,
@@ -419,6 +449,7 @@ def add_new_sample(
settings = db.session.get(Settings, 1)
settings.global_quota -= 1
new_sample = Sample(
+ id=None,
email=email,
name=name,
tumor_type=tumor_type,
diff --git a/backend/tests/helpers/flask_test_utils.py b/backend/tests/helpers/flask_test_utils.py
index 405cb5d..a79e1b9 100644
--- a/backend/tests/helpers/flask_test_utils.py
+++ b/backend/tests/helpers/flask_test_utils.py
@@ -14,6 +14,7 @@ def add_test_users(app):
email = f"{name}@abc.xy"
db.session.add(
User(
+ id=None,
email=email,
password_hash=ph.hash(name),
activated=True,
@@ -46,6 +47,7 @@ def add_test_samples(app, data_path: pathlib.Path):
with open(f"{ref_dir}/input.{input_file_type}", "w") as f:
f.write(input_file_type)
new_sample = Sample(
+ id=None,
email="user@abc.xy",
name=name,
tumor_type=f"tumor_type{sample_id}",
diff --git a/backend/tests/test_app.py b/backend/tests/test_app.py
index dfddbfd..bc6edc9 100644
--- a/backend/tests/test_app.py
+++ b/backend/tests/test_app.py
@@ -210,12 +210,13 @@ def test_result_invalid(client):
assert "No results available" in response.json["message"]
-def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int):
+def _upload_result(client, result_zipfile: pathlib.Path, job_id: int, sample_id: int):
headers = _get_auth_headers(client, "runner@abc.xy", "runner")
with open(result_zipfile, "rb") as f:
response = client.post(
"/api/runner/result",
data={
+ "job_id": job_id,
"sample_id": sample_id,
"success": True,
"file": (io.BytesIO(f.read()), result_zipfile.name),
@@ -225,19 +226,57 @@ def _upload_result(client, result_zipfile: pathlib.Path, sample_id: int):
return response
-def test_result_valid(client, result_zipfile):
- headers = _get_auth_headers(client, "user@abc.xy", "user")
- sample_id = 1
- assert _upload_result(client, result_zipfile, sample_id).status_code == 200
+def test_runner_valid_success(client, result_zipfile):
+ headers = _get_auth_headers(client, "runner@abc.xy", "runner")
+ # request job
+ request_job_response = client.post(
+ "/api/runner/request_job",
+ json={"runner_hostname": "me"},
+ headers=headers,
+ )
+ assert request_job_response.status_code == 200
+ assert request_job_response.json == {"sample_id": 1, "job_id": 1}
+ # upload successful result
+ assert _upload_result(client, result_zipfile, 1, 1).status_code == 200
response = client.post(
"/api/result",
- json={"sample_id": sample_id},
- headers=headers,
+ json={"sample_id": 1},
+ headers=_get_auth_headers(client, "user@abc.xy", "user"),
)
assert response.status_code == 200
assert len(response.data) > 1
+def test_runner_valid_failure(client):
+ headers = _get_auth_headers(client, "runner@abc.xy", "runner")
+ # request job
+ request_job_response = client.post(
+ "/api/runner/request_job",
+ json={"runner_hostname": "me"},
+ headers=headers,
+ )
+ assert request_job_response.status_code == 200
+ assert request_job_response.json == {"sample_id": 1, "job_id": 1}
+ # upload failure result
+ result_response = client.post(
+ "/api/runner/result",
+ data={
+ "job_id": 1,
+ "sample_id": 1,
+ "success": False,
+ "error_message": "Something went wrong",
+ },
+ headers=headers,
+ )
+ assert result_response.status_code == 200
+ response = client.post(
+ "/api/result",
+ json={"sample_id": 1},
+ headers=_get_auth_headers(client, "user@abc.xy", "user"),
+ )
+ assert response.status_code == 400
+
+
def test_admin_samples_valid(client):
headers = _get_auth_headers(client, "admin@abc.xy", "admin")
response = client.get("/api/admin/samples", headers=headers)
@@ -288,12 +327,6 @@ def test_admin_users_valid(client):
assert "users" in response.json
-def test_runner_result_valid(client, result_zipfile):
- response = _upload_result(client, result_zipfile, 1)
- assert response.status_code == 200
- assert "result processed" in response.json["message"].lower()
-
-
def test_admin_update_user_valid(client):
headers = _get_auth_headers(client, "admin@abc.xy", "admin")
user = client.get("/api/admin/users", headers=headers).json["users"][0]
diff --git a/frontend/src/components/JobsTable.vue b/frontend/src/components/JobsTable.vue
new file mode 100644
index 0000000..063fea7
--- /dev/null
+++ b/frontend/src/components/JobsTable.vue
@@ -0,0 +1,62 @@
+<script setup lang="ts">
+// (script setup body not preserved in this excerpt)
+</script>
+
+<template>
+  <table>
+    <thead>
+      <tr>
+        <th>Id</th>
+        <th>SampleId</th>
+        <th>Start</th>
+        <th>Runtime</th>
+        <th>Status</th>
+        <th>Error message</th>
+      </tr>
+    </thead>
+    <tbody>
+      <tr v-for="job in jobs" :key="job.id">
+        <td>{{ job.id }}</td>
+        <td>{{ job.sample_id }}</td>
+        <td>
+          {{ new Date(job.timestamp_start * 1000).toISOString() }}
+        </td>
+        <td>{{ (job.timestamp_end - job.timestamp_start) / 60 }}m</td>
+        <td>{{ job.status }}</td>
+        <td>{{ job.error_message }}</td>
+      </tr>
+    </tbody>
+  </table>
+</template>
diff --git a/frontend/src/utils/types.ts b/frontend/src/utils/types.ts
index 6dab111..9a61ff2 100644
--- a/frontend/src/utils/types.ts
+++ b/frontend/src/utils/types.ts
@@ -31,3 +31,12 @@ export type Settings = {
sources: string;
csv_required_columns: string;
};
+
+export type Job = {
+ id: number;
+ sample_id: number;
+ timestamp_start: number;
+ timestamp_end: number;
+ status: string;
+ error_message: string;
+};
diff --git a/frontend/src/views/AdminView.vue b/frontend/src/views/AdminView.vue
index 8ae35c4..57455e2 100644
--- a/frontend/src/views/AdminView.vue
+++ b/frontend/src/views/AdminView.vue
@@ -3,6 +3,7 @@ import SamplesTable from "@/components/SamplesTable.vue";
import SettingsTable from "@/components/SettingsTable.vue";
import UsersTable from "@/components/UsersTable.vue";
import ListComponent from "@/components/ListComponent.vue";
+import JobsTable from "@/components/JobsTable.vue";
import ListItem from "@/components/ListItem.vue";
import { FwbButton } from "flowbite-vue";
import { ref } from "vue";
@@ -72,6 +73,9 @@ get_samples();
+      <JobsTable />
diff --git a/runner/docker-compose.yml b/runner/docker-compose.yml
index 2a8778b..1d3f060 100644
--- a/runner/docker-compose.yml
+++ b/runner/docker-compose.yml
@@ -10,9 +10,11 @@ services:
deploy:
mode: replicated
replicas: ${PREDICTCR_RUNNER_JOBS:-1}
+ restart: always
networks:
- predictcr-network
networks:
predictcr-network:
name: predictcr
+ external: true
diff --git a/runner/src/predicTCR_runner/runner.py b/runner/src/predicTCR_runner/runner.py
index 806b6f7..3bebc26 100644
--- a/runner/src/predicTCR_runner/runner.py
+++ b/runner/src/predicTCR_runner/runner.py
@@ -16,9 +16,13 @@ def __init__(self, api_url: str, jwt_token: str, poll_interval: int = 5):
self.poll_interval = poll_interval
self.runner_hostname = os.environ.get("HOSTNAME", "unknown")
self.logger = logging.getLogger(__name__)
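+        # id of the job and sample currently being processed; set by _request_job()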
+ self.job_id: int | None = None
+ self.sample_id: int | None = None
- def _request_job(self) -> int | None:
+ def _request_job(self) -> bool:
self.logger.debug(f"Requesting job from {self.api_url}...")
+ self.job_id = None
+ self.sample_id = None
response = requests.post(
url=f"{self.api_url}/runner/request_job",
json={"runner_hostname": self.runner_hostname},
@@ -27,23 +31,28 @@ def _request_job(self) -> int | None:
)
if response.status_code == 204:
self.logger.debug(" -> no job available.")
- return None
+ return False
elif response.status_code == 200:
- sample_id = response.json().get("sample_id", None)
- self.logger.debug(f" -> sample id {sample_id} available.")
- return sample_id
+ self.job_id = response.json().get("job_id", None)
+ self.sample_id = response.json().get("sample_id", None)
+ self.logger.debug(
+ f" -> job id {self.job_id} for sample id {self.sample_id}."
+ )
+            if self.job_id is not None and self.sample_id is not None:
+                return True
+            return False
else:
self.logger.error(
f"request_job failed with {response.status_code}: {response.content}"
)
- return None
+ return False
- def _report_job_failed(self, sample_id: int, message: str):
- self.logger.info(f"...job failed for sample id {sample_id}.")
+ def _report_job_failed(self, message: str):
+ self.logger.info(f"...job {self.job_id} failed for sample id {self.sample_id}.")
response = requests.post(
url=f"{self.api_url}/runner/result",
data={
- "sample_id": sample_id,
+ "job_id": self.job_id,
+ "sample_id": self.sample_id,
"runner_id": self.runner_hostname,
"success": "false",
"error_message": message,
@@ -54,16 +63,17 @@ def _report_job_failed(self, sample_id: int, message: str):
if response.status_code != 200:
self.logger.error(f"result with {response.status_code}: {response.content}")
- def _upload_result(self, sample_id: int, result_file: str):
+ def _upload_result(self, result_file: str):
self.logger.info(
- f"...job finished for sample id {sample_id}, uploading {result_file}..."
+ f"...job {self.job_id} finished for sample id {self.sample_id}, uploading {result_file}..."
)
with open(result_file) as result_file:
response = requests.post(
url=f"{self.api_url}/runner/result",
files={"file": result_file},
data={
- "sample_id": sample_id,
+ "job_id": self.job_id,
+ "sample_id": self.sample_id,
"runner_hostname": self.runner_hostname,
"success": True,
},
@@ -73,14 +83,16 @@ def _upload_result(self, sample_id: int, result_file: str):
if response.status_code != 200:
self.logger.error(f"Failed to upload result: {response.content}")
- def _run_job(self, sample_id: int):
- self.logger.info(f"Starting job for sample id {sample_id}...")
+ def _run_job(self):
+ self.logger.info(
+ f"Starting job {self.job_id} for sample id {self.sample_id}..."
+ )
self.logger.debug("Downloading input files...")
with tempfile.TemporaryDirectory(delete=False) as tmpdir:
for input_file_type in ["h5", "csv"]:
response = requests.post(
url=f"{self.api_url}/input_{input_file_type}_file",
- json={"sample_id": sample_id},
+ json={"sample_id": self.sample_id},
headers=self.auth_header,
timeout=30,
)
@@ -89,7 +101,6 @@ def _run_job(self, sample_id: int):
f"Failed to download {input_file_type}: {response.content}"
)
return self._report_job_failed(
- sample_id,
f"Failed to download {input_file_type} on {self.runner_hostname}",
)
input_file_name = f"input.{input_file_type}"
@@ -104,20 +115,20 @@ def _run_job(self, sample_id: int):
self.logger.debug(f" - running {tmpdir}/scripts.sh...")
subprocess.run(["sh", "./script.sh"], cwd=tmpdir, check=True)
self.logger.debug(f" ...{tmpdir}/script.sh finished.")
- self._upload_result(sample_id, f"{tmpdir}/result.zip")
+ self._upload_result(f"{tmpdir}/result.zip")
except Exception as e:
self.logger.exception(e)
- self.logger.error(f"Failed to run job for sample {sample_id}: {e}")
+ self.logger.error(
+ f"Failed to run job {self.job_id} for sample {self.sample_id}: {e}"
+ )
return self._report_job_failed(
- sample_id,
f"Error during job execution on {self.runner_hostname}: {e}",
)
def start(self):
self.logger.info(f"Polling {self.api_url} for jobs...")
while True:
- job_id = self._request_job()
- if job_id is not None:
- self._run_job(job_id)
+ if self._request_job():
+ self._run_job()
else:
time.sleep(self.poll_interval)
diff --git a/runner/tests/test_runner.py b/runner/tests/test_runner.py
index f044eaa..7e7ddb8 100644
--- a/runner/tests/test_runner.py
+++ b/runner/tests/test_runner.py
@@ -4,6 +4,8 @@
def test_runner_request_job(requests_mock):
requests_mock.post("http://api/runner/request_job", status_code=204)
runner = Runner(api_url="http://api", jwt_token="abc")
- assert runner._request_job() is None
- requests_mock.post("http://api/runner/request_job", json={"sample_id": 44})
- assert runner._request_job() == 44
+ assert runner._request_job() is False
+ requests_mock.post(
+ "http://api/runner/request_job", json={"job_id": 22, "sample_id": 44}
+ )
+ assert runner._request_job() is True