fix(ingest, multisegmented): Handle segment grouping changes during reingest by also allowing revocation (#2372)

* Add a revocation option for when grouping changes; this is not used yet, and sequences whose grouping has changed are not yet modified.

* Add some basic tests for revise, submit and unchanged

* Add tests as part of a GitHub workflow
anna-parker authored Aug 12, 2024
1 parent 1f7aaf2 commit 699880e
Showing 15 changed files with 415 additions and 38 deletions.
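In outline, compare_hashes.py now sorts re-ingested sequences into a fourth bucket, results/to_revoke.json, which the prepare_files rule and call_loculus.py consume in the new regroup-and-revoke mode. A minimal sketch of the file's shape, assuming a two-segment organism; the fasta id and all accessions below are hypothetical:

# Hypothetical example of results/to_revoke.json: each key is the fasta id of a newly
# grouped multi-segment sequence; the value maps the old Loculus accession(s) to be
# revoked to the joint INSDC accession they were previously grouped under.
to_revoke_example = {
    "submission_group_17": {
        "LOC_000123X": "OZ000001.L/OZ000002.M"
    }
}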
36 changes: 36 additions & 0 deletions .github/workflows/ingest-tests.yaml
@@ -0,0 +1,36 @@
name: ingest-tests
on:
  # test
  pull_request:
    paths:
      - "ingest/**"
      - ".github/workflows/ingest-tests.yaml"
  push:
    branches:
      - main
  workflow_dispatch:
concurrency:
  group: ci-${{ github.ref == 'refs/heads/main' && github.run_id || github.ref }}-ingest-tests
  cancel-in-progress: true
jobs:
  unitTests:
    name: Unit Tests
    runs-on: codebuild-loculus-ci-${{ github.run_id }}-${{ github.run_attempt }}
    timeout-minutes: 15
    steps:
      - uses: actions/checkout@v4
      - name: Set up micromamba
        uses: mamba-org/setup-micromamba@v1
        with:
          environment-file: ingest/environment.yml
          micromamba-version: 'latest'
          init-shell: >-
            bash powershell
          cache-environment: true
          post-cleanup: 'all'
      - name: Run tests
        run: |
          micromamba activate loculus-ingest
          pytest tests/
        shell: micromamba-shell {0}
        working-directory: ingest
33 changes: 33 additions & 0 deletions ingest/Snakefile
@@ -254,6 +254,7 @@ rule get_previous_submissions:
    Produces mapping from INSDC accession to loculus id/version/hash
    insdc_accession:
        loculus_accession: abcd
        joint_accession: xx.seg1/xy.seg2
        versions:
            - version: 1
              hash: abcd
@@ -302,6 +303,7 @@ rule compare_hashes:
    output:
        to_submit="results/to_submit.json",
        to_revise="results/to_revise.json",
        to_revoke="results/to_revoke.json",
        unchanged="results/unchanged.json",
        blocked="results/blocked.json",
        sampled_out="results/sampled_out.json",
@@ -316,6 +318,7 @@ rule compare_hashes:
            --metadata {input.metadata} \
            --to-submit {output.to_submit} \
            --to-revise {output.to_revise} \
            --to-revoke {output.to_revoke} \
            --unchanged {output.unchanged} \
            --output-blocked {output.blocked} \
            --sampled-out-file {output.sampled_out} \
@@ -340,11 +343,14 @@ rule prepare_files:
        ),
        to_submit="results/to_submit.json",
        to_revise="results/to_revise.json",
        to_revoke="results/to_revoke.json",
    output:
        sequences_submit="results/submit_sequences.fasta",
        sequences_revise="results/revise_sequences.fasta",
        sequences_revoke="results/sequences_to_submit_prior_to_revoke.fasta",
        metadata_submit="results/submit_metadata.tsv",
        metadata_revise="results/revise_metadata.tsv",
        metadata_revoke="results/metadata_to_submit_prior_to_revoke.tsv",
    shell:
        """
        python {input.script} \
@@ -353,10 +359,13 @@ rule prepare_files:
            --sequences-path {input.sequences} \
            --to-submit-path {input.to_submit} \
            --to-revise-path {input.to_revise} \
            --to-revoke-path {input.to_revoke} \
            --metadata-submit-path {output.metadata_submit} \
            --metadata-revise-path {output.metadata_revise} \
            --metadata-submit-prior-to-revoke-path {output.metadata_revoke} \
            --sequences-submit-path {output.sequences_submit} \
            --sequences-revise-path {output.sequences_revise} \
            --sequences-submit-prior-to-revoke-path {output.sequences_revoke} \
        """


@@ -405,6 +414,30 @@ rule revise:
        fi
        """

rule regroup_and_revoke:
    input:
        script="scripts/call_loculus.py",
        metadata="results/metadata_to_submit_prior_to_revoke.tsv",
        sequences="results/sequences_to_submit_prior_to_revoke.fasta",
        map="results/to_revoke.json",
        config="results/config.yaml",
    output:
        revoked=touch("results/revoked"),
    params:
        log_level=LOG_LEVEL,
    shell:
        """
        if [ -s {input.metadata} ]; then
            python {input.script} \
                --mode regroup-and-revoke \
                --metadata {input.metadata} \
                --sequences {input.sequences} \
                --revoke-map {input.map} \
                --config-file {input.config} \
                --log-level {params.log_level}
        fi
        """


rule approve:
    input:
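The joint_accession strings used above ("xx.seg1/xy.seg2") concatenate each segment's INSDC accession with its segment name. A hedged sketch of the construction used in both call_loculus.py and compare_hashes.py; the segment names and accessions here are made up:

# Sketch: build a joint accession like "OZ000001.L/OZ000002.M" from per-segment
# INSDC accessions, skipping segments that are absent for this isolate.
nucleotide_sequences = ["L", "M", "S"]  # hypothetical segment names
record = {
    "insdc_accession_base_L": "OZ000001",  # hypothetical accessions
    "insdc_accession_base_M": "OZ000002",
    "insdc_accession_base_S": None,
}
joint_accession = "/".join(
    f"{record[f'insdc_accession_base_{segment}']}.{segment}"
    for segment in nucleotide_sequences
    if record[f"insdc_accession_base_{segment}"]
)
# -> "OZ000001.L/OZ000002.M"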
100 changes: 74 additions & 26 deletions ingest/scripts/call_loculus.py
@@ -209,6 +209,28 @@ def submit_or_revise(
    return response.json()


def regroup_and_revoke(metadata, sequences, map, config: Config, group_id):
    """
    Submit segments in new sequence groups and revoke segments in old (incorrect) groups in Loculus.
    """
    response = submit_or_revise(metadata, sequences, config, group_id, mode="submit")
    new_accessions = response[0]["accession"]  # Will be later added as version comment

    url = f"{organism_url(config)}/revoke"

    to_revoke = json.load(open(map, encoding="utf-8"))

    loc_values = {loc for seq in to_revoke.values() for loc in seq.keys()}
    loculus_accessions = set(loc_values)

    accessions = {"accessions": list(loculus_accessions)}

    response = make_request(HTTPMethod.POST, url, config, json_body=accessions)
    logger.debug(f"revocation response: {response.json()}")

    return response.json()


def approve(config: Config):
    """
    Approve all sequences
@@ -312,32 +334,43 @@ def get_submitted(config: Config):
        original_metadata: dict[str, str] = entry["originalMetadata"]
        hash_value = original_metadata.get("hash", "")
        if config.segmented:
            insdc_accession = "".join([original_metadata[key] for key in insdc_key])
            insdc_accessions = [original_metadata[key] for key in insdc_key]
            joint_accession = "/".join(
                [
                    f"{original_metadata[key]}.{segment}"
                    for key, segment in zip(insdc_key, config.nucleotide_sequences)
                    if original_metadata[key]
                ]
            )
        else:
            insdc_accession = original_metadata.get("insdc_accession_base", "")

        if insdc_accession not in submitted_dict:
            submitted_dict[insdc_accession] = {
                "loculus_accession": loculus_accession,
                "versions": [],
            }
        elif loculus_accession != submitted_dict[insdc_accession]["loculus_accession"]:
            # For now to be forgiving, just move on, but log the error
            # This should not happen in production
            message = (
                f"INSDC accession {insdc_accession} has multiple loculus accessions: "
                f"{loculus_accession} and {submitted_dict[insdc_accession]['loculus_accession']}"
            insdc_accessions = [original_metadata.get("insdc_accession_base", "")]
            joint_accession = original_metadata.get("insdc_accession_base", "")

        for insdc_accession in insdc_accessions:
            if insdc_accession not in submitted_dict:
                submitted_dict[insdc_accession] = {
                    "loculus_accession": loculus_accession,
                    "versions": [],
                    "joint_accession": joint_accession,
                }
            elif loculus_accession != submitted_dict[insdc_accession]["loculus_accession"]:
                # For now to be forgiving, just move on, but log the error
                # This should not happen in production
                message = (
                    f"INSDC accession {insdc_accession} has multiple loculus accessions: "
                    f"{loculus_accession} and {submitted_dict[insdc_accession]['loculus_accession']}"
                )
                logger.error(message)
                continue

            submitted_dict[insdc_accession]["versions"].append(
                {
                    "version": loculus_version,
                    "hash": hash_value,
                    "status": statuses[loculus_accession][loculus_version],
                    "joint_accession": joint_accession,
                }
            )
            logger.error(message)
            continue

        submitted_dict[insdc_accession]["versions"].append(
            {
                "version": loculus_version,
                "hash": hash_value,
                "status": statuses[loculus_accession][loculus_version],
            }
        )

    logger.info(f"Got info on {len(submitted_dict)} previously submitted sequences/accessions")

@@ -358,7 +391,7 @@ def get_submitted(config: Config):
@click.option(
    "--mode",
    required=True,
    type=click.Choice(["submit", "revise", "approve", "get-submitted"]),
    type=click.Choice(["submit", "revise", "approve", "regroup-and-revoke", "get-submitted"]),
)
@click.option(
    "--log-level",
@@ -375,7 +408,12 @@ def get_submitted(config: Config):
    required=False,
    type=click.Path(),
)
def submit_to_loculus(metadata, sequences, mode, log_level, config_file, output):
@click.option(
    "--revoke-map",
    required=False,
    type=click.Path(exists=True),
)
def submit_to_loculus(metadata, sequences, mode, log_level, config_file, output, revoke_map):
    """
    Submit data to Loculus.
    """
Expand Down Expand Up @@ -416,6 +454,16 @@ def record_factory(*args, **kwargs):
logger.info(f"Approved: {len(response)} sequences")
sleep(30)

if mode == "regroup-and-revoke":
try:
group_id = get_or_create_group(config, allow_creation=mode == "submit")
except ValueError as e:
logger.error(f"Aborting {mode} due to error: {e}")
return
logger.info("Submitting new segment groups and revoking old segment groups")
response = regroup_and_revoke(metadata, sequences, revoke_map, config, group_id)
logger.info(f"Revoked: {len(response)} sequence entries of old segment groups")

if mode == "get-submitted":
logger.info("Getting submitted sequences")
response = get_submitted(config)
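For orientation, a hedged sketch of what the new regroup-and-revoke mode does with the revoke map; the endpoint path and payload shape mirror the regroup_and_revoke function above, and the accessions are hypothetical:

import json

# Hypothetical revoke map, as written by compare_hashes.py to results/to_revoke.json
to_revoke = {
    "submission_group_17": {"LOC_000123X": "OZ000001.L/OZ000002.M"},
    "submission_group_18": {"LOC_000456Y": "OZ000003.L/OZ000004.M"},
}

# 1. The regrouped segments are first submitted as brand-new entries (mode="submit").
# 2. Every old Loculus accession in the map is then revoked in a single POST to
#    <organism_url>/revoke with a body of the form {"accessions": [...]}.
old_loculus_accessions = {loc for group in to_revoke.values() for loc in group}
request_body = json.dumps({"accessions": sorted(old_loculus_accessions)})
print(request_body)  # {"accessions": ["LOC_000123X", "LOC_000456Y"]}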
75 changes: 64 additions & 11 deletions ingest/scripts/compare_hashes.py
@@ -35,6 +35,7 @@ def md5_float(string: str) -> float:
@click.option("--metadata", required=True, type=click.Path(exists=True))
@click.option("--to-submit", required=True, type=click.Path())
@click.option("--to-revise", required=True, type=click.Path())
@click.option("--to-revoke", required=True, type=click.Path())
@click.option("--unchanged", required=True, type=click.Path())
@click.option("--sampled-out-file", required=True, type=click.Path())
@click.option("--output-blocked", required=True, type=click.Path())
@@ -51,6 +52,7 @@ def main(
    metadata: str,
    to_submit: str,
    to_revise: str,
    to_revoke: str,
    unchanged: str,
    output_blocked: str,
    sampled_out_file: str,
@@ -85,19 +87,15 @@ def main(
    blocked = defaultdict(dict)  # Mapping for sequences that cannot be updated due to status
    sampled_out = []  # INSDC accessions that were sampled out
    hashes = []  # Hashes of all INSDC accessions, for debugging
    revoke = {}  # Map of new grouping joint insdc accessions to map of previous state
    # i.e. loculus accessions (to be revoked) and their corresponding old joint insdc accessions

    for fasta_id, record in new_metadata.items():
        if config.segmented:
            insdc_keys = [
                f"insdc_accession_base_{segment}" for segment in config.nucleotide_sequences
            ]
        else:
            insdc_keys = ["insdc_accession_base"]
        has_insdc_key = any(record[key] is not None or record[key] for key in insdc_keys)
        if has_insdc_key:
            insdc_accession_base = "".join(
                ["" if record[key] is None else record[key] for key in insdc_keys]
            )
        if not config.segmented:
            insdc_accession_base = record["insdc_accession_base"]
            if not insdc_accession_base:
                msg = "Ingested sequences without INSDC accession base - potential internal error"
                raise ValueError(msg)
            hash_float = md5_float(insdc_accession_base)
            if config.debug_hashes:
                hashes.append(hash_float)
@@ -119,13 +117,68 @@
                    ]
            else:
                noop[fasta_id] = submitted[insdc_accession_base]["loculus_accession"]
            continue

        insdc_keys = [f"insdc_accession_base_{segment}" for segment in config.nucleotide_sequences]
        insdc_accession_base_list = [record[key] for key in insdc_keys if record[key]]
        if len(insdc_accession_base_list) == 0:
            msg = (
                "Ingested multi-segmented sequences without INSDC accession base(s) "
                "- potential internal error"
            )
            raise ValueError(msg)
        insdc_accession_base = "/".join(
            [
                f"{record[key]}.{segment}"
                for key, segment in zip(insdc_keys, config.nucleotide_sequences)
                if record[key]
            ]
        )
        hash_float = md5_float(insdc_accession_base)
        if config.debug_hashes:
            hashes.append(hash_float)
        keep = hash_float <= subsample_fraction
        if not keep:
            sampled_out.append({fasta_id: insdc_accession_base, "hash": hash_float})
            continue
        if all(accession not in submitted for accession in insdc_accession_base_list):
            submit.append(fasta_id)
            continue
        if all(accession in submitted for accession in insdc_accession_base_list) and all(
            submitted[accession]["joint_accession"] == insdc_accession_base
            for accession in insdc_accession_base_list
        ):
            # grouping is the same, can just look at first segment in group
            accession = insdc_accession_base_list[0]
            latest = submitted[accession]["versions"][-1]
            if latest["hash"] != record["hash"]:
                status = latest["status"]
                if status == "APPROVED_FOR_RELEASE":
                    revise[fasta_id] = submitted[accession]["loculus_accession"]
                else:
                    blocked[status][fasta_id] = submitted[accession]["loculus_accession"]
            else:
                noop[fasta_id] = submitted[accession]["loculus_accession"]
            continue
        old_accessions = {}
        for accession in insdc_accession_base_list:
            if accession in submitted:
                old_accessions[submitted[accession]["loculus_accession"]] = submitted[accession][
                    "joint_accession"
                ]
        logger.warn(
            "Grouping has changed. Ingest would like to group INSDC samples: "
            f"{insdc_accession_base}, however these were previously grouped as {old_accessions}"
        )
        revoke[fasta_id] = old_accessions

    outputs = [
        (submit, to_submit, "Sequences to submit"),
        (revise, to_revise, "Sequences to revise"),
        (noop, unchanged, "Unchanged sequences"),
        (blocked, output_blocked, "Blocked sequences"),
        (sampled_out, sampled_out_file, "Sampled out sequences"),
        (revoke, to_revoke, "Sequences to revoke"),
    ]

    if config.debug_hashes:
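A hedged worked example of the new three-way decision in compare_hashes.py, condensed from the logic above; the accessions and segment names are hypothetical:

# Previously, segments OZ000001 and OZ000002 were grouped together under one Loculus entry.
submitted = {
    "OZ000001": {"loculus_accession": "LOC_000123X", "joint_accession": "OZ000001.L/OZ000002.M"},
    "OZ000002": {"loculus_accession": "LOC_000123X", "joint_accession": "OZ000001.L/OZ000002.M"},
}

# On re-ingest, the grouping step now pairs OZ000001 with a different M segment.
segment_accessions = ["OZ000001", "OZ000003"]
new_joint_accession = "OZ000001.L/OZ000003.M"

if all(a not in submitted for a in segment_accessions):
    decision = "submit"            # an entirely new group
elif all(
    a in submitted and submitted[a]["joint_accession"] == new_joint_accession
    for a in segment_accessions
):
    decision = "revise-or-noop"    # same grouping as before; fall back to the hash comparison
else:
    decision = "revoke"            # grouping changed: submit the new group, revoke the old entries

print(decision)  # revoke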
2 changes: 1 addition & 1 deletion ingest/scripts/group_segments.py
@@ -94,7 +94,7 @@ def main(
    number_of_segmented_records = len(segment_metadata.keys())
    logger.info(f"Found {number_of_segmented_records} individual segments in metadata file")

    # Group sequences according to isolate, collection date and isolate specific values
    # Group segments according to isolate, collection date and isolate specific values
    # These are the fields that are expected to be identical across all segments for a given isolate
    shared_fields = config.shared_fields
    logger.info(f"Fields required to be identical for grouping: {shared_fields}")
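The comment change above concerns how group_segments.py builds isolate groups. A minimal sketch of grouping segments by shared fields, assuming illustrative field names and metadata; the real script's grouping logic may differ in detail:

from collections import defaultdict

# Hypothetical per-segment metadata: segments sharing all shared-field values form one isolate group.
shared_fields = ["isolate", "sample_collection_date"]
segment_metadata = {
    "OZ000001": {"isolate": "A/1", "sample_collection_date": "2024-01-02", "segment": "L"},
    "OZ000003": {"isolate": "A/1", "sample_collection_date": "2024-01-02", "segment": "M"},
    "OZ000005": {"isolate": "B/7", "sample_collection_date": "2024-02-10", "segment": "L"},
}

groups: dict[tuple, list[str]] = defaultdict(list)
for accession, row in segment_metadata.items():
    key = tuple(row.get(field) for field in shared_fields)
    groups[key].append(accession)

# -> {("A/1", "2024-01-02"): ["OZ000001", "OZ000003"], ("B/7", "2024-02-10"): ["OZ000005"]}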
