Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(prepro): Allow nextclade metadata fields to be per_segment #2108

Merged
merged 9 commits into from
Jun 10, 2024
7 changes: 6 additions & 1 deletion kubernetes/loculus/templates/_preprocessingFromValues.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@
{{- if and $use_segments .per_segment }}
{{- range $segment := $segments }}
{{ printf "%s_%s :" $currentItem.name $segment}}
{{- if $currentItem.type }}
{{- if or $currentItem.type $currentItem.per_segment}}
args:
{{- if $currentItem.per_segment }}
segment: {{ $segment }}
{{- end }}
{{- if $currentItem.type }}
type: {{ $currentItem.type }}
{{- end }}
{{- end }}
{{- if $currentItem.preprocessing }}
{{- if hasKey $currentItem.preprocessing "function" }}
Expand Down
23 changes: 16 additions & 7 deletions kubernetes/loculus/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -776,60 +776,69 @@ defaultOrganismConfig: &defaultOrganismConfig
header: "Alignment states and QC metrics"
noInput: true
rangeSearch: true
per_segment: true
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
corneliusroemer marked this conversation as resolved.
Show resolved Hide resolved
inputs: { input: nextclade.totalSubstitutions }
- name: total_inserted_nucs
type: int
header: "Alignment states and QC metrics"
noInput: true
rangeSearch: true
per_segment: true
corneliusroemer marked this conversation as resolved.
Show resolved Hide resolved
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
inputs: { input: nextclade.totalInsertions }
- name: total_deleted_nucs
type: int
header: "Alignment states and QC metrics"
noInput: true
rangeSearch: true
per_segment: true
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
inputs: { input: nextclade.totalDeletions }
- name: total_ambiguous_nucs
type: int
header: "Alignment states and QC metrics"
noInput: true
rangeSearch: true
per_segment: true
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
inputs: { input: "nextclade.totalNonACGTNs" }
- name: total_unknown_nucs
type: int
header: "Alignment states and QC metrics"
noInput: true
rangeSearch: true
per_segment: true
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
inputs: { input: nextclade.totalMissing }
- name: total_frame_shifts
type: int
rangeSearch: true
header: "Alignment states and QC metrics"
noInput: true
per_segment: true
preprocessing:
args: { type: int }
args: { type: int, per_segment: true }
inputs: { input: nextclade.totalFrameShifts }
- name: frame_shifts
header: "Alignment states and QC metrics"
noInput: true
per_segment: true
preprocessing:
args: { per_segment: true }
inputs: { input: nextclade.frameShifts }
- name: completeness
type: float
header: "Alignment states and QC metrics"
noInput: true
per_segment: true
preprocessing:
args: { type: float }
args: { type: float, per_segment: true }
inputs: { input: nextclade.coverage }
website: &website
tableColumns:
Expand Down
2 changes: 2 additions & 0 deletions preprocessing/nextclade/dev_dependencies.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ mypy
ruff
types-PyYAML
types-requests
types-pytz
types-python-dateutil
58 changes: 58 additions & 0 deletions preprocessing/nextclade/src/loculus_preprocessing/backend.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
"""Functions to interface with the backend"""

import dataclasses
import datetime as dt
import json
import logging
import time
from collections.abc import Sequence
from http import HTTPStatus
from pathlib import Path

import jwt
import pytz
import requests

from .config import Config
from .datatypes import (
ProcessedEntry,
)


class JwtCache:
Expand Down Expand Up @@ -55,3 +64,52 @@ def get_jwt(config: Config) -> str:
error_msg = f"Fetching JWT failed with status code {response.status_code}: {response.text}"
logging.error(error_msg)
raise Exception(error_msg)


def fetch_unprocessed_sequences(n: int, config: Config) -> str:
    """Request up to *n* unprocessed sequence entries from the backend.

    Returns the raw response body (NDJSON) on success. When the backend
    answers 422 (nothing available for this pipeline version), sleeps for a
    minute and returns an empty string so the caller can poll again. Any
    other non-OK status raises an Exception carrying the response text.
    """
    endpoint = config.backend_host.rstrip("/") + "/extract-unprocessed-data"
    logging.debug(f"Fetching {n} unprocessed sequences from {endpoint}")
    response = requests.post(
        endpoint,
        data={"numberOfSequenceEntries": n, "pipelineVersion": config.pipeline_version},
        headers={"Authorization": "Bearer " + get_jwt(config)},
        timeout=10,
    )
    if response.ok:
        return response.text
    if response.status_code == HTTPStatus.UNPROCESSABLE_ENTITY:
        # Backend has no work for us yet; back off before the next poll.
        logging.debug(f"{response.text}.\nSleeping for a while.")
        time.sleep(60 * 1)
        return ""
    msg = f"Fetching unprocessed data failed. Status code: {response.status_code}"
    raise Exception(msg, response.text)


def submit_processed_sequences(
    processed: Sequence[ProcessedEntry], dataset_dir: str, config: Config
) -> None:
    """Submit processed sequence entries to the backend as NDJSON.

    Serializes each entry to one JSON line and POSTs the joined payload to
    ``/submit-processed-data``. When ``config.keep_tmp_dir`` is set, the exact
    payload is also written to ``submission_requests.json`` in *dataset_dir*
    for debugging. On a non-OK response the payload is saved to
    ``failed_submission.json`` and a RuntimeError is raised.
    """
    json_strings = [json.dumps(dataclasses.asdict(sequence)) for sequence in processed]
    ndjson_string = "\n".join(json_strings)
    if config.keep_tmp_dir:
        # For debugging: write the submit request payload to submission_requests.json.
        # Fix: previously each object was dumped with json.dump() and no separator,
        # concatenating objects into a file that was neither valid JSON nor NDJSON.
        # Reuse the already-serialized lines and write proper NDJSON instead.
        Path(dataset_dir, "submission_requests.json").write_text(
            ndjson_string, encoding="utf-8"
        )
    url = config.backend_host.rstrip("/") + "/submit-processed-data"
    headers = {
        "Content-Type": "application/x-ndjson",
        "Authorization": "Bearer " + get_jwt(config),
    }
    params = {"pipelineVersion": config.pipeline_version}
    response = requests.post(url, data=ndjson_string, headers=headers, params=params, timeout=10)
    if not response.ok:
        # Keep a copy of the rejected payload so the failure can be reproduced.
        Path("failed_submission.json").write_text(ndjson_string, encoding="utf-8")
        msg = (
            f"Submitting processed data failed. Status code: {response.status_code}\n"
            f"Response: {response.text}\n"
            f"Data sent in request: {ndjson_string[0:1000]}...\n"
        )
        raise RuntimeError(msg)
    logging.info("Processed data submitted successfully")
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
@dataclass
class Config:
organism: str = "mpox"
backend_host: str = None # Set default to None or similar placeholder
backend_host: str = ""
keycloak_host: str = "http://127.0.0.1:8083"
keycloak_user: str = "preprocessing_pipeline"
keycloak_password: str = "preprocessing_pipeline"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

AccessionVersion = str
GeneName = str
SegmentName = str
NucleotideSequence = str
AminoAcidSequence = str
NucleotideInsertion = str
Expand Down Expand Up @@ -66,10 +67,10 @@ class ProcessingSpec:
class UnprocessedAfterNextclade:
inputMetadata: InputMetadata
# Derived metadata produced by Nextclade
nextcladeMetadata: dict[str, Any] | None
unalignedNucleotideSequences: dict[str, NucleotideSequence | None]
alignedNucleotideSequences: dict[str, NucleotideSequence | None]
nucleotideInsertions: dict[str, list[NucleotideInsertion]]
nextcladeMetadata: dict[SegmentName, Any] | None
anna-parker marked this conversation as resolved.
Show resolved Hide resolved
unalignedNucleotideSequences: dict[SegmentName, NucleotideSequence | None]
alignedNucleotideSequences: dict[SegmentName, NucleotideSequence | None]
nucleotideInsertions: dict[SegmentName, list[NucleotideInsertion]]
alignedAminoAcidSequences: dict[GeneName, AminoAcidSequence | None]
aminoAcidInsertions: dict[GeneName, list[AminoAcidInsertion]]

Expand Down
Loading
Loading