Skip to content

Commit

Permalink
Reuse general submission structure from create_project.
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-parker committed Jul 19, 2024
1 parent ce694a4 commit 70015ec
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 8 deletions.
8 changes: 6 additions & 2 deletions ena-submission/flyway/sql/V1__Initial_Schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ CREATE TABLE submission_table (
finished_at timestamp,
metadata jsonb,
aligned_nucleotide_sequences jsonb,
center_name text,
external_metadata jsonb,
primary key (accession, version)
);
Expand All @@ -22,6 +23,7 @@ CREATE TABLE project_table (
status text not null,
started_at timestamp not null,
finished_at timestamp,
center_name text,
result jsonb,
primary key (group_id, organism)
);
Expand All @@ -34,7 +36,8 @@ CREATE TABLE sample_table (
status text not null,
started_at timestamp not null,
finished_at timestamp,
sample_metadata jsonb,
center_name text,
result jsonb,
primary key (accession, version)
);

Expand All @@ -46,6 +49,7 @@ CREATE TABLE assembly_table (
status text not null,
started_at timestamp not null,
finished_at timestamp,
assembly_metadata jsonb,
center_name text,
result jsonb,
primary key (accession, version)
);
18 changes: 12 additions & 6 deletions ena-submission/scripts/create_project.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# This script adds all approved sequences to the submission_table
# - this should trigger the submission process.

import json
import logging
from dataclasses import dataclass
Expand Down Expand Up @@ -124,7 +121,10 @@ def create_project(log_level, config_file):
)
if len(corresponding_project) == 1:
if corresponding_project[0]["status"] == Status.SUBMITTED.name:
update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name}
update_values = {
"status_all": StatusAll.SUBMITTED_PROJECT.name,
"center_name": corresponding_project[0]["center_name"],
}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
Expand Down Expand Up @@ -176,7 +176,10 @@ def create_project(log_level, config_file):
len(corresponding_project) == 1
and corresponding_project[0]["status"] == Status.SUBMITTED.name
):
update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name}
update_values = {
"status_all": StatusAll.SUBMITTED_PROJECT.name,
"center_name": corresponding_project[0]["center_name"],
}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
Expand Down Expand Up @@ -204,7 +207,10 @@ def create_project(log_level, config_file):
group_info = get_group_info(config, row["group_id"])[0]["group"]

project_set = construct_project_set_object(group_info, config, metadata_dict, row)
update_values = {"status": Status.SUBMITTING.name}
update_values = {
"status": Status.SUBMITTING.name,
"center_name": group_info["institution"],
}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="project_table",
Expand Down
249 changes: 249 additions & 0 deletions ena-submission/scripts/create_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
import json
import logging
from dataclasses import dataclass
from typing import Dict, List

import click
import yaml
from ena_submission_helper import CreationResults, create_ena_sample
from ena_types import (
OrganismType,
ProjectLink,
ProjectLinks,
ProjectSet,
ProjectType,
SubmissionProject,
XmlAttribute,
XrefType,
)
from submission_db_helper import (
SampleTableEntry,
Status,
StatusAll,
add_to_sample_table,
find_conditions_in_db,
find_errors_in_db,
get_db_config,
update_db_where_conditions,
)

# Module-level logger; create_sample() adjusts its level from --log-level.
logger = logging.getLogger(__name__)
# Configure the root logger at INFO with a timestamped format that also
# shows the originating file name and line number.
logging.basicConfig(
    encoding="utf-8",
    level=logging.INFO,
    format="%(asctime)s %(levelname)8s (%(filename)20s:%(lineno)4d) - %(message)s ",
    datefmt="%H:%M:%S",
)


@dataclass
class Config:
    """Settings consumed by the sample-creation script.

    create_sample() fills every field from the YAML config file by looking
    up the key with the same name; keys missing from the file default to
    an empty list.
    """

    # NOTE(review): annotated as a list of dicts, but create_sample() indexes
    # this by organism name and then reads an "ingest" entry - looks like a
    # mapping keyed by organism; confirm and adjust the annotation.
    organisms: List[Dict[str, str]]
    backend_url: str
    keycloak_token_url: str
    keycloak_client_id: str
    username: str
    password: str
    # db_username, db_password and db_host are passed to get_db_config().
    db_username: str
    db_password: str
    db_host: str
    db_name: str
    unique_project_suffix: str
    ena_submission_url: str
    ena_submission_password: str
    ena_submission_username: str


def construct_sample_set_object(config, ingest_metadata, row):
    """Placeholder for building the sample set object sent to ENA.

    Not implemented yet: all three arguments are ignored and ``None`` is
    returned.
    """
    # TODO: add function using https://www.ebi.ac.uk/ena/browser/view/ERC000033 as a template
    return None


def _set_submission_status(db_config, seq_key, status_all):
    """Set ``status_all`` on the submission_table row identified by ``seq_key``.

    Returns whatever update_db_where_conditions() returns (the number of
    rows updated).
    """
    return update_db_where_conditions(
        db_config,
        table_name="submission_table",
        conditions=seq_key,
        update_values={"status_all": status_all.name},
    )


def _update_sample_with_retry(db_config, seq_key, update_values, failure_msg, max_tries=3):
    """Write ``update_values`` to the sample_table row for ``seq_key``.

    The update is attempted up to ``max_tries`` times, stopping as soon as
    exactly one row is reported as updated. ``failure_msg`` is logged (at
    DEBUG) after each attempt that did not update exactly one row.

    Returns True if an attempt updated exactly one row, False otherwise.
    """
    for attempt in range(1, max_tries + 1):
        number_rows_updated = update_db_where_conditions(
            db_config,
            table_name="sample_table",
            conditions=seq_key,
            update_values=update_values,
        )
        if number_rows_updated == 1:
            return True
        logger.debug("%s - reentry DB update #%d.", failure_msg, attempt)
    return False


@click.command()
@click.option(
    "--log-level",
    default="INFO",
    type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]),
)
@click.option(
    "--config-file",
    required=True,
    type=click.Path(exists=True),
)
def create_sample(log_level, config_file):
    """Poll the submission database and create ENA samples.

    Each pass of the endless loop:

    1. For submission_table rows in READY_TO_SUBMIT: make sure a
       sample_table entry exists and move the row to SUBMITTING_SAMPLE
       (or SUBMITTED_SAMPLE if the sample is already SUBMITTED).
    2. For submission_table rows in SUBMITTING_SAMPLE: move them to
       SUBMITTED_SAMPLE once the matching sample is SUBMITTED.
    3. For sample_table rows in READY: mark them SUBMITTING, call
       create_ena_sample() and store the result or the errors.
    4. Look up sample_table entries reported by find_errors_in_db() and
       abort if there are any.

    Args:
        log_level: Level name applied to this module's logger.
        config_file: Path to the YAML file the Config fields are read from.

    Raises:
        RuntimeError: If a SUBMITTING_SAMPLE submission has no sample_table
            entry, or if find_errors_in_db() returns any entries.
    """
    logger.setLevel(log_level)
    logging.getLogger("requests").setLevel(logging.INFO)

    with open(config_file, encoding="utf-8") as file:
        full_config = yaml.safe_load(file)
    relevant_config = {key: full_config.get(key, []) for key in Config.__annotations__}
    config = Config(**relevant_config)
    # NOTE(review): this writes the password fields of Config to the log.
    logger.info(f"Config: {config}")

    db_config = get_db_config(config.db_password, config.db_username, config.db_host)

    # NOTE(review): the loop polls the database back-to-back with no delay
    # between passes.
    while True:
        # Check submission_table for newly added sequences
        ready_to_submit = find_conditions_in_db(
            db_config,
            table_name="submission_table",
            conditions={"status_all": StatusAll.READY_TO_SUBMIT.name},
        )
        # Use the module logger (not the root logger) so --log-level applies.
        logger.debug(
            "Found %d entries in submission_table in status READY_TO_SUBMIT",
            len(ready_to_submit),
        )
        for row in ready_to_submit:
            seq_key = {"accession": row["accession"], "version": row["version"]}

            # 1. check if there exists an entry in the sample table for seq_key
            corresponding_sample = find_conditions_in_db(
                db_config, table_name="sample_table", conditions=seq_key
            )
            if len(corresponding_sample) == 1:
                if corresponding_sample[0]["status"] == Status.SUBMITTED.name:
                    _set_submission_status(db_config, seq_key, StatusAll.SUBMITTED_SAMPLE)
                else:
                    _set_submission_status(db_config, seq_key, StatusAll.SUBMITTING_SAMPLE)
            else:
                # If not: create sample_entry, change status to SUBMITTING_SAMPLE
                sample_table_entry = SampleTableEntry(**seq_key, center_name=row["center_name"])
                add_to_sample_table(db_config, sample_table_entry)
                _set_submission_status(db_config, seq_key, StatusAll.SUBMITTING_SAMPLE)

        submitting_sample = find_conditions_in_db(
            db_config,
            table_name="submission_table",
            conditions={"status_all": StatusAll.SUBMITTING_SAMPLE.name},
        )
        logger.debug(
            "Found %d entries in submission_table in status SUBMITTING_SAMPLE",
            len(submitting_sample),
        )
        for row in submitting_sample:
            seq_key = {"accession": row["accession"], "version": row["version"]}

            # 1. check if there exists an entry in the sample table for seq_key
            corresponding_sample = find_conditions_in_db(
                db_config, table_name="sample_table", conditions=seq_key
            )
            if len(corresponding_sample) == 0:
                # Message is a single string (it used to be built as a tuple).
                error_msg = (
                    "Entry in submission_table in status SUBMITTING_SAMPLE"
                    " with no corresponding sample"
                )
                raise RuntimeError(error_msg)
            if (
                len(corresponding_sample) == 1
                and corresponding_sample[0]["status"] == Status.SUBMITTED.name
            ):
                _set_submission_status(db_config, seq_key, StatusAll.SUBMITTED_SAMPLE)

        # Check sample_table for newly added sequences
        ready_to_submit_sample = find_conditions_in_db(
            db_config,
            table_name="sample_table",
            conditions={"status": Status.READY.name},
        )
        logger.debug(
            "Found %d entries in sample_table in status READY", len(ready_to_submit_sample)
        )
        for row in ready_to_submit_sample:
            # Key the updates on THIS sample row; reusing seq_key from the
            # loops above would update an unrelated row or fail when those
            # loops did not run.
            seq_key = {"accession": row["accession"], "version": row["version"]}
            # NOTE(review): assumes sample_table rows carry an "organism"
            # column and that config.organisms is keyed by it - confirm.
            ingest_metadata = config.organisms[row["organism"]]["ingest"]

            sample_set = construct_sample_set_object(config, ingest_metadata, row)
            number_rows_updated = update_db_where_conditions(
                db_config,
                table_name="sample_table",
                conditions=seq_key,
                update_values={"status": Status.SUBMITTING.name},
            )
            if number_rows_updated != 1:
                # state not correctly updated - do not start submission
                logger.debug(
                    "sample_table: Status update from READY to SUBMITTING failed "
                    "- not starting submission."
                )
                continue
            logger.debug("Starting sample creation for accession %s", row["accession"])
            sample_creation_results: CreationResults = create_ena_sample(config, sample_set)
            if sample_creation_results.results:
                _update_sample_with_retry(
                    db_config,
                    seq_key,
                    {
                        "status": Status.SUBMITTED.name,
                        "result": json.dumps(sample_creation_results.results),
                    },
                    "Sample created but DB update failed",
                )
            else:
                _update_sample_with_retry(
                    db_config,
                    seq_key,
                    {
                        "status": Status.HAS_ERRORS.name,
                        "errors": json.dumps(sample_creation_results.errors),
                    },
                    "sample creation failed and DB update failed",
                )

        # Check sample_table for sequences with errors or in submitting status for too long
        time_threshold = 15
        entries_with_errors = find_errors_in_db(
            db_config, "sample_table", time_threshold=time_threshold
        )
        logger.debug(
            "Found %d entries in sample_table in status HAS_ERRORS for %dm",
            len(entries_with_errors),
            time_threshold,
        )
        # Abort only when there really are failed entries (the check used to
        # look at the READY samples instead of the error query result).
        if len(entries_with_errors) > 0:
            # TODO: Query ENA to check if sample has in fact been created
            # If created update sample_table
            # If not retry 3 times, then raise for manual intervention
            error_msg = "ENA Sample Creation failed - exiting as unable to handle errors"
            raise RuntimeError(error_msg)


# Run the click command only when this file is executed as a script.
if __name__ == "__main__":
    create_sample()
16 changes: 16 additions & 0 deletions ena-submission/scripts/submission_db_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,18 +70,34 @@ class SubmissionTableEntry:
finished_at: datetime | None = None
metadata: str | None = None
aligned_nucleotide_sequences: str | None = None
center_name: str | None = None
external_metadata: str | None = None


@dataclass
class ProjectTableEntry:
    """Field container for a project entry; presumably one row of project_table."""

    # NOTE(review): project_table in V1__Initial_Schema.sql is keyed by
    # (group_id, organism), yet this class declares accession/version while
    # SampleTableEntry declares group_id/organism - the key fields of the two
    # classes look swapped; confirm against the callers before relying on it.
    accession: str
    version: str
    organism: str
    errors: str | None = None
    warnings: str | None = None
    # New entries start in READY unless a status is passed explicitly.
    status: Status = Status.READY
    started_at: datetime | None = None
    finished_at: datetime | None = None
    center_name: str | None = None
    result: str | None = None


@dataclass
class SampleTableEntry:
    """Field container for a sample entry; presumably one row of sample_table."""

    # NOTE(review): sample_table in V1__Initial_Schema.sql is keyed by
    # (accession, version), and create_sample.py builds this class with
    # accession=..., version=..., center_name=... - keywords these fields do
    # not accept (group_id and organism are required instead). Confirm whether
    # the key fields here should be accession/version.
    group_id: int
    organism: str
    errors: str | None = None
    warnings: str | None = None
    # New entries start in READY unless a status is passed explicitly.
    status: Status = Status.READY
    started_at: datetime | None = None
    finished_at: datetime | None = None
    center_name: str | None = None
    result: str | None = None


Expand Down

0 comments on commit 70015ec

Please sign in to comment.