diff --git a/ena-submission/scripts/create_project.py b/ena-submission/scripts/create_project.py index f11fb9f441..121cbea05b 100644 --- a/ena-submission/scripts/create_project.py +++ b/ena-submission/scripts/create_project.py @@ -1,15 +1,17 @@ # This script adds all approved sequences to the submission_table # - this should trigger the submission process. +import json import logging from dataclasses import dataclass from typing import Dict, List import click import psycopg2 +import requests import yaml from call_loculus import get_group_info -from ena_submission_helper import create_project_xml +from ena_submission_helper import create_ena_project from ena_types import ( OrganismType, ProjectLink, @@ -53,6 +55,29 @@ class Config: unique_project_suffix: str +def construct_project_set_object( + group_info: Dict[str, str], config: Config, metadata_dict: Dict[str, str], row: Dict[str, str] +): + project_type = ProjectType( + center_name=XmlAttribute(group_info["institution"]), + alias=XmlAttribute(f"{row["group_id"]}:{row["organism"]}:{config.unique_project_suffix}"), + name=metadata_dict["scientific_name"], + title=f"{metadata_dict["scientific_name"]}: Genome sequencing", + description=( + f"Automated upload of {metadata_dict["scientific_name"]} sequences ", + f"submitted by {group_info["institution"]} from {config.db_name}", + ), + submission_project=SubmissionProject( + organism=OrganismType( + taxon_id=metadata_dict["taxon_id"], + scientific_name=metadata_dict["scientific_name"], + ) + ), + project_links=[ProjectLink(xref_link=XrefType(db=config.db_name, id=row["group_id"]))], + ) + return ProjectSet(project=[project_type]) + + @click.command() @click.option( "--log-level", @@ -83,22 +108,23 @@ def create_project(log_level, config_file): db_config, table_name="submission_table", conditions=conditions ) for row in ready_to_submit: + group_key = {"group_id": row["group_id"], "organism": row["organism"]} + seq_key = {"accession": row["accession"], "version": row["version"]} + # 1. check if there exists an entry in the project table for (group_id, organism) - conditions = {"group_id": row["group_id"], "organism": row["organism"]} corresponding_project = find_conditions_in_db( - db_config, table_name="project_table", conditions=conditions + db_config, table_name="project_table", conditions=group_key ) if len(corresponding_project) == 1: - conditions = {"accession": row["accession"], "version": row["version"]} if corresponding_project[0]["status"] == Status.SUBMITTED.name: update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name} number_rows_updated = update_db_where_conditions( db_config, table_name="submission_table", - conditions=conditions, + conditions=seq_key, update_values=update_values, ) - if number_rows_updated != 1: + if number_rows_updated > 1: raise psycopg2.DatabaseError( "found multiple rows in submission_table with same primary key" ) @@ -107,10 +133,10 @@ def create_project(log_level, config_file): number_rows_updated = update_db_where_conditions( db_config, table_name="submission_table", - conditions=conditions, + conditions=seq_key, update_values=update_values, ) - if number_rows_updated != 1: + if number_rows_updated > 1: raise psycopg2.DatabaseError( "found multiple rows in submission_table with same primary key" ) @@ -130,46 +156,100 @@ def create_project(log_level, config_file): number_rows_updated = update_db_where_conditions( db_config, table_name="submission_table", - conditions=conditions, + conditions=seq_key, update_values=update_values, ) if number_rows_updated > 1: raise psycopg2.DatabaseError( "found multiple rows in submission table with same primary key" ) + conditions = {"status_all": StatusAll.SUBMITTING_PROJECT.name} + submitting_project = find_conditions_in_db( + db_config, table_name="submission_table", conditions=conditions + ) + for row in submitting_project: + group_key = {"group_id": row["group_id"], "organism": row["organism"]} + seq_key = {"accession": row["accession"], "version": row["version"]} + + # 1. check if there exists an entry in the project table for (group_id, organism) + corresponding_project = find_conditions_in_db( + db_config, table_name="project_table", conditions=group_key + ) + if len(corresponding_project) == 1: + if corresponding_project[0]["status"] == Status.SUBMITTED.name: + update_values = {"status_all": StatusAll.SUBMITTED_PROJECT.name} + number_rows_updated = update_db_where_conditions( + db_config, + table_name="submission_table", + conditions=seq_key, + update_values=update_values, + ) + if number_rows_updated > 1: + raise psycopg2.DatabaseError( + "found multiple rows in submission_table with same primary key" + ) + elif len(corresponding_project) > 1: + raise psycopg2.DatabaseError( + "found multiple rows in project_table with same primary key" + ) # Check project_table for newly added sequences conditions = {"status": Status.READY.name} ready_to_submit_project = find_conditions_in_db( db_config, table_name="project_table", conditions=conditions ) for row in ready_to_submit_project: + group_key = {"group_id": row["group_id"], "organism": row["organism"]} + metadata_dict = config.organisms[row["organism"]]["ingest"] group_info = get_group_info(config, row["group_id"])[0]["group"] - submission_project = SubmissionProject( - organism=OrganismType( - taxon_id=metadata_dict["taxon_id"], - scientific_name=metadata_dict["scientific_name"], + + project_set = construct_project_set_object(group_info, config, metadata_dict, row) + try: + update_values = {"status": Status.SUBMITTING.name} + number_rows_updated = update_db_where_conditions( + db_config, + table_name="project_table", + conditions=group_key, + update_values=update_values, ) - ) - project_link = ProjectLink(xref_link=XrefType(db=config.db_name, id=row["group_id"])) - project_config = { - "center_name": XmlAttribute(group_info["institution"]), - "alias": XmlAttribute( - f"{row["group_id"]}:{row["organism"]}:{config.unique_project_suffix}" - ), - "name": metadata_dict["scientific_name"], - "title": f"{metadata_dict["scientific_name"]}: Genome sequencing", - "description": ( - f"Automated upload of {metadata_dict["scientific_name"]} sequences ", - f"submitted by {group_info["institution"]} from {config.db_name}", - ), - "submission_project": submission_project, - "project_links": [project_link], - } - project_type = {"project": [ProjectType(**project_config)]} - project_set = ProjectSet(**project_type) - create_project_xml(config, project_set, root_name="project_set") - return + if number_rows_updated > 1: + raise psycopg2.DatabaseError( + "found multiple rows in project table with same primary key" + ) + results = create_ena_project(config, project_set) + update_values = {"status": Status.SUBMITTED.name, "result": json.dumps(results)} + number_rows_updated = update_db_where_conditions( + db_config, + table_name="project_table", + conditions=group_key, + update_values=update_values, + ) + if number_rows_updated > 1: + raise psycopg2.DatabaseError( + "found multiple rows in project table with same primary key" + ) + except requests.exceptions.RequestException as e: + # set to has_errors, add errors + update_values = {"status": Status.HAS_ERRORS.name, "errors": json.dumps(e.response)} + number_rows_updated = update_db_where_conditions( + db_config, + table_name="project_table", + conditions=group_key, + update_values=update_values, + ) + if number_rows_updated > 1: + raise psycopg2.DatabaseError( + "found multiple rows in project table with same primary key" + ) + # Check project_table for sequences with errors or in submitting status for too long + entries_with_errors = find_conditions_in_db( + db_config, table_name="project_table", conditions={"status": Status.HAS_ERRORS.name} + ) + # entries_taking_too_long = find_conditions_in_db( + # db_config, table_name="project_table", conditions={"status": Status.SUBMITTING.name, "started_at": } + # ) + # Query ENA if data has already gotten there. + # Resubmit for x number of times. if __name__ == "__main__": diff --git a/ena-submission/scripts/ena_submission_helper.py b/ena-submission/scripts/ena_submission_helper.py index c60e46ea2c..8f21cd0ac4 100644 --- a/ena-submission/scripts/ena_submission_helper.py +++ b/ena-submission/scripts/ena_submission_helper.py @@ -3,7 +3,7 @@ import requests import xmltodict -from ena_types import ProjectType, XmlAttribute +from ena_types import ProjectSet, XmlAttribute from requests.auth import HTTPBasicAuth @@ -11,28 +11,29 @@ def recursive_defaultdict(): return defaultdict(recursive_defaultdict) -def dataclass_to_xml(dataclass_instance, root_name="root"): - def dataclass_to_dict(dataclass_instance): - """ - Converts a dataclass instance to a dictionary, handling nested dataclasses. - """ - if not hasattr(dataclass_instance, "__dataclass_fields__"): - return dataclass_instance - result = {} - for field in dataclass_instance.__dataclass_fields__: - value = getattr(dataclass_instance, field) - is_xml_attribute = isinstance(value, XmlAttribute) - if value is None: - continue - if isinstance(value, list): - result[field] = [dataclass_to_dict(item) for item in value] - elif is_xml_attribute: - attribute_field = "@" + field - result[attribute_field] = value - else: - result[field] = dataclass_to_dict(value) - return result +def dataclass_to_dict(dataclass_instance): + """ + Converts a dataclass instance to a dictionary, handling nested dataclasses. + """ + if not hasattr(dataclass_instance, "__dataclass_fields__"): + return dataclass_instance + result = {} + for field in dataclass_instance.__dataclass_fields__: + value = getattr(dataclass_instance, field) + is_xml_attribute = isinstance(value, XmlAttribute) + if value is None: + continue + if isinstance(value, list): + result[field] = [dataclass_to_dict(item) for item in value] + elif is_xml_attribute: + attribute_field = "@" + field + result[attribute_field] = value + else: + result[field] = dataclass_to_dict(value) + return result + +def dataclass_to_xml(dataclass_instance, root_name="root"): dataclass_dict = dataclass_to_dict(dataclass_instance) return xmltodict.unparse({root_name: dataclass_dict}, pretty=True) @@ -43,71 +44,51 @@ def get_submission_dict(): return submission -def create_project_xml(config, project_type: ProjectType, root_name): - def get_project_xml(project_type): +def create_ena_project(config, project_set: ProjectSet): + def get_project_xml(project_set): submission_set = get_submission_dict() - project_set = dataclass_to_xml(project_type, root_name=root_name) - print(project_set) + project_set = {"project_set": dataclass_to_dict(project_set)} webin = {"WEBIN": {**submission_set, **project_set}} return xmltodict.unparse(webin, pretty=True) - xml = get_project_xml(project_type) - print(xml) - return xml - # response = post_webin(xml, config) - # return response - - -def create_sample(config): - def get_sample_xml(alias, taxon_id, scientific_name, attributes): - submission_set = get_submission_dict() - sample_set = recursive_defaultdict() - sample = { - "@alias": f"{alias}{random.randint(1000, 9999)}", - "TITLE": "titleTBD", - "SAMPLE_NAME": { - "TAXON_ID": taxon_id, - "SCIENTIFIC_NAME": scientific_name, - "COMMON_NAME": None, - }, - "SAMPLE_ATTRIBUTES": { - "SAMPLE_ATTRIBUTE": [ - {"TAG": key, "VALUE": value} for key, value in attributes.items() - ] - }, - } - sample_set["SAMPLE_SET"]["SAMPLE"] = sample - webin = {"WEBIN": {**submission_set, **sample_set}} - return xmltodict.unparse(webin, pretty=True) - - xml = get_sample_xml( - "aliasTBD", - 1284369, - "nameTBD", - { - "collection date": "not collected", - "geographic location (country and/or sea)": "not collected", - "ENA-CHECKLIST": "ERC000011", - }, - ) + xml = get_project_xml(project_set) response = post_webin(xml, config) - return response - - -def create_assembly(config): - # Your code for create_assembly would go here - pass + valid = ( + response["RECEIPT"]["@success"] == "true" + and response["RECEIPT"]["PROJECT"]["@accession"] + and response["RECEIPT"]["SUBMISSION"]["@accession"] + ) + if not valid: + raise requests.exceptions.RequestException + project_results = { + "ena_project_accession": response["RECEIPT"]["PROJECT"]["@accession"], + "ena_submission_accession": response["RECEIPT"]["SUBMISSION"]["@accession"], + } + return project_results def post_webin(xml, config): headers = {"Accept": "application/xml", "Content-Type": "application/xml"} - response = requests.post( - config.url, - auth=HTTPBasicAuth(config.username, config.password), - data=xml, - headers=headers, - ) - if response.status_code == 200: - return xmltodict.parse(response.text) - else: - raise Exception("Error:", response.status_code, response.text) + try: + # response = requests.post( + # config.url, + # auth=HTTPBasicAuth(config.username, config.password), + # data=xml, + # headers=headers, + # timeout=10, # wait a full 10 seconds for a response incase slow + # ) + # response.raise_for_status() + response = """ + + + + + + This submission is a TEST submission and will be discarded within 24 hours + + ADD + """ + except requests.exceptions.RequestException as e: + raise (e) + # return xmltodict.parse(response.text) + return xmltodict.parse(response) diff --git a/ena-submission/scripts/submission_db_helper.py b/ena-submission/scripts/submission_db_helper.py index e1e54e0f5a..205f6c0d9b 100644 --- a/ena-submission/scripts/submission_db_helper.py +++ b/ena-submission/scripts/submission_db_helper.py @@ -128,7 +128,7 @@ def update_db_where_conditions(db_config, table_name, conditions, update_values) updated_row_count = 0 try: query = f"UPDATE {table_name} SET " - query += " AND ".join([f"{key}='{value}'" for key, value in update_values.items()]) + query += ", ".join([f"{key}='{value}'" for key, value in update_values.items()]) query += " WHERE " query += " AND ".join([f"{key}=%s" for key in conditions])