Skip to content

Commit

Permalink
Add some basic error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
anna-parker committed Jul 18, 2024
1 parent 622078f commit 2de3d4d
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 115 deletions.
146 changes: 113 additions & 33 deletions ena-submission/scripts/create_project.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
# This script adds all approved sequences to the submission_table
# - this should trigger the submission process.

import json
import logging
from dataclasses import dataclass
from typing import Dict, List

import click
import psycopg2
import requests
import yaml
from call_loculus import get_group_info
from ena_submission_helper import create_project_xml
from ena_submission_helper import create_ena_project
from ena_types import (
OrganismType,
ProjectLink,
Expand Down Expand Up @@ -53,6 +55,29 @@ class Config:
unique_project_suffix: str


def construct_project_set_object(
group_info: Dict[str, str], config: Config, metadata_dict: Dict[str, str], row: Dict[str, str]
):
project_type = ProjectType(
center_name=XmlAttribute(group_info["institution"]),
alias=XmlAttribute(f"{row["group_id"]}:{row["organism"]}:{config.unique_project_suffix}"),
name=metadata_dict["scientific_name"],
title=f"{metadata_dict["scientific_name"]}: Genome sequencing",
description=(
f"Automated upload of {metadata_dict["scientific_name"]} sequences ",
f"submitted by {group_info["institution"]} from {config.db_name}",
),
submission_project=SubmissionProject(
organism=OrganismType(
taxon_id=metadata_dict["taxon_id"],
scientific_name=metadata_dict["scientific_name"],
)
),
project_links=[ProjectLink(xref_link=XrefType(db=config.db_name, id=row["group_id"]))],
)
return ProjectSet(project=[project_type])


@click.command()
@click.option(
"--log-level",
Expand Down Expand Up @@ -83,22 +108,23 @@ def create_project(log_level, config_file):
db_config, table_name="submission_table", conditions=conditions
)
for row in ready_to_submit:
group_key = {"group_id": row["group_id"], "organism": row["organism"]}
seq_key = {"accession": row["accession"], "version": row["version"]}

# 1. check if there exists an entry in the project table for (group_id, organism)
conditions = {"group_id": row["group_id"], "organism": row["organism"]}
corresponding_project = find_conditions_in_db(
db_config, table_name="project_table", conditions=conditions
db_config, table_name="project_table", conditions=group_key
)
if len(corresponding_project) == 1:
conditions = {"accession": row["accession"], "version": row["version"]}
if corresponding_project[0]["status"] == Status.SUBMITTED.name:
update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
conditions=conditions,
conditions=seq_key,
update_values=update_values,
)
if number_rows_updated != 1:
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in submission_table with same primary key"
)
Expand All @@ -107,10 +133,10 @@ def create_project(log_level, config_file):
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
conditions=conditions,
conditions=seq_key,
update_values=update_values,
)
if number_rows_updated != 1:
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in submission_table with same primary key"
)
Expand All @@ -130,46 +156,100 @@ def create_project(log_level, config_file):
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
conditions=conditions,
conditions=seq_key,
update_values=update_values,
)
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in submission table with same primary key"
)
conditions = {"status_all": StatusAll.SUBMITTING_PROJECT.name}
submitting_project = find_conditions_in_db(
db_config, table_name="submission_table", conditions=conditions
)
for row in submitting_project:
group_key = {"group_id": row["group_id"], "organism": row["organism"]}
seq_key = {"accession": row["accession"], "version": row["version"]}

# 1. check if there exists an entry in the project table for (group_id, organism)
corresponding_project = find_conditions_in_db(
db_config, table_name="project_table", conditions=group_key
)
if len(corresponding_project) == 1:
if corresponding_project[0]["status"] == Status.SUBMITTED.name:
update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="submission_table",
conditions=seq_key,
update_values=update_values,
)
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in submission_table with same primary key"
)
elif len(corresponding_project) > 1:
raise psycopg2.DatabaseError(
"found multiple rows in project_table with same primary key"
)
# Check project_table for newly added sequences
conditions = {"status": Status.READY.name}
ready_to_submit_project = find_conditions_in_db(
db_config, table_name="project_table", conditions=conditions
)
for row in ready_to_submit_project:
group_key = {"group_id": row["group_id"], "organism": row["organism"]}

metadata_dict = config.organisms[row["organism"]]["ingest"]
group_info = get_group_info(config, row["group_id"])[0]["group"]
submission_project = SubmissionProject(
organism=OrganismType(
taxon_id=metadata_dict["taxon_id"],
scientific_name=metadata_dict["scientific_name"],

project_set = construct_project_set_object(group_info, config, metadata_dict, row)
try:
update_values = {"status": Status.SUBMITTING.name}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="project_table",
conditions=group_key,
update_values=update_values,
)
)
project_link = ProjectLink(xref_link=XrefType(db=config.db_name, id=row["group_id"]))
project_config = {
"center_name": XmlAttribute(group_info["institution"]),
"alias": XmlAttribute(
f"{row["group_id"]}:{row["organism"]}:{config.unique_project_suffix}"
),
"name": metadata_dict["scientific_name"],
"title": f"{metadata_dict["scientific_name"]}: Genome sequencing",
"description": (
f"Automated upload of {metadata_dict["scientific_name"]} sequences ",
f"submitted by {group_info["institution"]} from {config.db_name}",
),
"submission_project": submission_project,
"project_links": [project_link],
}
project_type = {"project": [ProjectType(**project_config)]}
project_set = ProjectSet(**project_type)
create_project_xml(config, project_set, root_name="project_set")
return
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in project table with same primary key"
)
results = create_ena_project(config, project_set)
update_values = {"status": Status.SUBMITTED.name, "result": json.dumps(results)}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="project_table",
conditions=group_key,
update_values=update_values,
)
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in project table with same primary key"
)
except requests.exceptions.RequestException as e:
# set to has_errors, add errors
update_values = {"status": Status.HAS_ERRORS.name, "errors": json.dumps(e.response)}
number_rows_updated = update_db_where_conditions(
db_config,
table_name="project_table",
conditions=group_key,
update_values=update_values,
)
if number_rows_updated > 1:
raise psycopg2.DatabaseError(
"found multiple rows in project table with same primary key"
)
# Check project_table for sequences with errors or in submitting status for too long
entries_with_errors = find_conditions_in_db(
db_config, table_name="project_table", conditions={"status": Status.HAS_ERRORS.name}
)
# entries_taking_too_long = find_conditions_in_db(
# db_config, table_name="project_table", conditions={"status": Status.SUBMITTING.name, "started_at": }
# )
# Query ENA if data has already gotten there.
# Resubmit for x number of times.


if __name__ == "__main__":
Expand Down
143 changes: 62 additions & 81 deletions ena-submission/scripts/ena_submission_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,37 @@

import requests
import xmltodict
from ena_types import ProjectType, XmlAttribute
from ena_types import ProjectSet, XmlAttribute
from requests.auth import HTTPBasicAuth


def recursive_defaultdict():
return defaultdict(recursive_defaultdict)


def dataclass_to_xml(dataclass_instance, root_name="root"):
def dataclass_to_dict(dataclass_instance):
"""
Converts a dataclass instance to a dictionary, handling nested dataclasses.
"""
if not hasattr(dataclass_instance, "__dataclass_fields__"):
return dataclass_instance
result = {}
for field in dataclass_instance.__dataclass_fields__:
value = getattr(dataclass_instance, field)
is_xml_attribute = isinstance(value, XmlAttribute)
if value is None:
continue
if isinstance(value, list):
result[field] = [dataclass_to_dict(item) for item in value]
elif is_xml_attribute:
attribute_field = "@" + field
result[attribute_field] = value
else:
result[field] = dataclass_to_dict(value)
return result
def dataclass_to_dict(dataclass_instance):
"""
Converts a dataclass instance to a dictionary, handling nested dataclasses.
"""
if not hasattr(dataclass_instance, "__dataclass_fields__"):
return dataclass_instance
result = {}
for field in dataclass_instance.__dataclass_fields__:
value = getattr(dataclass_instance, field)
is_xml_attribute = isinstance(value, XmlAttribute)
if value is None:
continue
if isinstance(value, list):
result[field] = [dataclass_to_dict(item) for item in value]
elif is_xml_attribute:
attribute_field = "@" + field
result[attribute_field] = value
else:
result[field] = dataclass_to_dict(value)
return result


def dataclass_to_xml(dataclass_instance, root_name="root"):
dataclass_dict = dataclass_to_dict(dataclass_instance)
return xmltodict.unparse({root_name: dataclass_dict}, pretty=True)

Expand All @@ -43,71 +44,51 @@ def get_submission_dict():
return submission


def create_project_xml(config, project_type: ProjectType, root_name):
def get_project_xml(project_type):
def create_ena_project(config, project_set: ProjectSet):
def get_project_xml(project_set):
submission_set = get_submission_dict()
project_set = dataclass_to_xml(project_type, root_name=root_name)
print(project_set)
project_set = {"project_set": dataclass_to_dict(project_set)}
webin = {"WEBIN": {**submission_set, **project_set}}
return xmltodict.unparse(webin, pretty=True)

xml = get_project_xml(project_type)
print(xml)
return xml
# response = post_webin(xml, config)
# return response


def create_sample(config):
def get_sample_xml(alias, taxon_id, scientific_name, attributes):
submission_set = get_submission_dict()
sample_set = recursive_defaultdict()
sample = {
"@alias": f"{alias}{random.randint(1000, 9999)}",
"TITLE": "titleTBD",
"SAMPLE_NAME": {
"TAXON_ID": taxon_id,
"SCIENTIFIC_NAME": scientific_name,
"COMMON_NAME": None,
},
"SAMPLE_ATTRIBUTES": {
"SAMPLE_ATTRIBUTE": [
{"TAG": key, "VALUE": value} for key, value in attributes.items()
]
},
}
sample_set["SAMPLE_SET"]["SAMPLE"] = sample
webin = {"WEBIN": {**submission_set, **sample_set}}
return xmltodict.unparse(webin, pretty=True)

xml = get_sample_xml(
"aliasTBD",
1284369,
"nameTBD",
{
"collection date": "not collected",
"geographic location (country and/or sea)": "not collected",
"ENA-CHECKLIST": "ERC000011",
},
)
xml = get_project_xml(project_set)
response = post_webin(xml, config)
return response


def create_assembly(config):
# Your code for create_assembly would go here
pass
valid = (
response["RECEIPT"]["@success"] == "true"
and response["RECEIPT"]["PROJECT"]["@accession"]
and response["RECEIPT"]["SUBMISSION"]["@accession"]
)
if not valid:
raise requests.exceptions.RequestException
project_results = {
"ena_project_accession": response["RECEIPT"]["PROJECT"]["@accession"],
"ena_submission_accession": response["RECEIPT"]["SUBMISSION"]["@accession"],
}
return project_results


def post_webin(xml, config):
headers = {"Accept": "application/xml", "Content-Type": "application/xml"}
response = requests.post(
config.url,
auth=HTTPBasicAuth(config.username, config.password),
data=xml,
headers=headers,
)
if response.status_code == 200:
return xmltodict.parse(response.text)
else:
raise Exception("Error:", response.status_code, response.text)
try:
# response = requests.post(
# config.url,
# auth=HTTPBasicAuth(config.username, config.password),
# data=xml,
# headers=headers,
# timeout=10, # wait a full 10 seconds for a response incase slow
# )
# response.raise_for_status()
response = """<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="receipt.xsl"?>
<RECEIPT receiptDate="2017-05-09T16:58:08.634+01:00" submissionFile="submission.xml" success="true">
<PROJECT accession="PRJEB20767" alias="cheddar_cheese" status="PRIVATE" />
<SUBMISSION accession="ERA912529" alias="cheese" />
<MESSAGES>
<INFO>This submission is a TEST submission and will be discarded within 24 hours</INFO>
</MESSAGES>
<ACTIONS>ADD</ACTIONS>
</RECEIPT>"""
except requests.exceptions.RequestException as e:
raise (e)
# return xmltodict.parse(response.text)
return xmltodict.parse(response)
2 changes: 1 addition & 1 deletion ena-submission/scripts/submission_db_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def update_db_where_conditions(db_config, table_name, conditions, update_values)
updated_row_count = 0
try:
query = f"UPDATE {table_name} SET "
query += " AND ".join([f"{key}='{value}'" for key, value in update_values.items()])
query += ", ".join([f"{key}='{value}'" for key, value in update_values.items()])
query += " WHERE "
query += " AND ".join([f"{key}=%s" for key in conditions])

Expand Down

0 comments on commit 2de3d4d

Please sign in to comment.