From f26a4f97c6f67dc655b7849275f6b395396b0513 Mon Sep 17 00:00:00 2001 From: Florent Yvon Date: Tue, 23 Apr 2024 16:23:33 +0100 Subject: [PATCH 1/2] Comparing study name to others in the import to avoid duplicates --- curation_tracker/litsuggest.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/curation_tracker/litsuggest.py b/curation_tracker/litsuggest.py index a7e1ddf7..f70abd42 100644 --- a/curation_tracker/litsuggest.py +++ b/curation_tracker/litsuggest.py @@ -170,11 +170,11 @@ def dict_to_annotation_import(d: dict) -> CurationPublicationAnnotationImport: model_import.annotation = model return model_import -def check_study_name(study_name: str) -> str: +def check_study_name(study_name: str, imported_study_names: list[str]) -> str: ''' Check that the study_name is unique. Otherwise it will add incremental number as suffix ''' queryset = CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(study_name=study_name).count() if queryset: - sn_list = CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name',flat=True) + sn_list = imported_study_names + list(CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name', flat=True)) num = 2 new_study_name = f'{study_name}_{num}' while new_study_name in sn_list: @@ -186,6 +186,7 @@ def check_study_name(study_name: str) -> str: def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublicationAnnotationImport]: models = [] reader = csv.DictReader(litsuggest_file, delimiter='\t') + imported_study_names = [] for row in reader: if not row['pmid']: break # litsuggest files might contain a lot of empty rows after the relevant ones @@ -198,7 +199,9 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic assert_study_doesnt_exist(pmid) study_epmc_info = get_publication_info_from_epmc(pmid) annotationModel = create_new_annotation(study_epmc_info) - annotationModel.study_name = check_study_name(annotationModel.study_name) + study_name = check_study_name(annotationModel.study_name, imported_study_names) + annotationModel.study_name = study_name + imported_study_names.append(study_name) triage_note = row['triage.note'] annotationModel.eligibility_description = triage_note From 8b10f94c13060fc86a1358728b75159ec7411a20 Mon Sep 17 00:00:00 2001 From: Florent Yvon Date: Tue, 23 Apr 2024 16:30:27 +0100 Subject: [PATCH 2/2] PEP8 reformatting --- curation_tracker/litsuggest.py | 75 ++++++++++++------- curation_tracker/scripts/import_litsuggest.py | 1 + 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/curation_tracker/litsuggest.py b/curation_tracker/litsuggest.py index f70abd42..002bd4b5 100644 --- a/curation_tracker/litsuggest.py +++ b/curation_tracker/litsuggest.py @@ -10,6 +10,7 @@ pgs_db = 'default' curation_tracker_db = 'curation_tracker' + class CurationPublicationAnnotationImport(): """Wrapper class for CurationPublicationAnnotation for providing additional transient attributes""" error: str @@ -18,7 +19,7 @@ class CurationPublicationAnnotationImport(): triage_info: dict def __init__(self, model: CurationPublicationAnnotation = CurationPublicationAnnotation()): - self.annotation = model# if model else CurationPublicationAnnotation() + self.annotation = model # if model else CurationPublicationAnnotation() self.error = None self.skip_reason = None self.triage_info = {} @@ -32,16 +33,16 @@ def to_dict(self) -> dict: def is_valid(self) -> bool: """Should be used before saving""" return self.error == None - + def is_importable(self) -> bool: return self.skip_reason == None - + def __next_id_number(self) -> int: assigned = 1 if len(CurationPublicationAnnotation.objects.using(curation_tracker_db).all()) != 0: assigned = CurationPublicationAnnotation.objects.using(curation_tracker_db).latest().pk + 1 return assigned - + def save(self, *args, **kwargs) -> None: """Set the identifiers and save the contained CurationPublicationAnnotation object""" annotation = self.annotation @@ -49,17 +50,20 @@ def save(self, *args, **kwargs) -> None: annotation.set_annotation_ids(self.__next_id_number()) return annotation.save(*args, **kwargs) + class ImportException(Exception): pass + def get_pgs_publication(pmid): publication = None try: - publication = Publication.objects.get(PMID=pmid) + publication = Publication.objects.get(PMID=pmid) except Publication.DoesNotExist: - publication = None + publication = None return publication + def assert_study_doesnt_exist(pmid): """ Checks if the study is already present as a Publication or CurationPublicationAnnotation. If yes, throws an ImportException. @@ -68,7 +72,7 @@ def assert_study_doesnt_exist(pmid): raise ImportException(f'Study {pmid} already exists in the PGS Catalog database') if CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(PMID=pmid).exists(): raise ImportException(f'Study annotation {pmid} already exists in the Curation Tracker database') - + def get_publication_info_from_epmc(pmid) -> dict: payload = {'format': 'json'} @@ -90,6 +94,7 @@ def get_publication_info_from_epmc(pmid) -> dict: raise ImportException("This pubmed ID returned no result") return info + def get_publication_info_from_epmc_doi(doi) -> dict: payload = {'format': 'json'} query = f'doi:{doi}' @@ -110,9 +115,11 @@ def get_publication_info_from_epmc_doi(doi) -> dict: raise ImportException("This DOI returned no result") return info + def get_next_unique_study_name(study_name): unique_name = study_name - for existing_study_name in CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(study_name__startswith=study_name): + for existing_study_name in CurationPublicationAnnotation.objects.using(curation_tracker_db).filter( + study_name__startswith=study_name): name_elements = existing_study_name.split('_') name_stub = name_elements[0] name_index = 0 @@ -121,40 +128,45 @@ def get_next_unique_study_name(study_name): if last_element.isnumeric(): name_index = int(last_element) name_index += 1 - unique_name = name_stub+'_'+name_index + unique_name = name_stub + '_' + name_index return unique_name + def create_new_annotation(publication_info) -> CurationPublicationAnnotation: model = CurationPublicationAnnotation() - for attr in ['PMID','journal','doi','title','year','publication_date']: - value = publication_info.get(attr, None) + for attr in ['PMID', 'journal', 'doi', 'title', 'year', 'publication_date']: + value = publication_info.get(attr, None) setattr(model, attr, value) - authors = publication_info.get('authors',None) + authors = publication_info.get('authors', None) if authors is None: - authors = 'NoAuthor N' # last char so the string has the same format as a regular authors string + authors = 'NoAuthor N' # last char so the string has the same format as a regular authors string + + model.study_name = '-'.join(authors.split(',')[0].split(' ')[:-1]) \ + + publication_info.get('year', 'NoDate') - model.study_name = '-'.join(authors.split(',')[0].split(' ')[:-1])\ - + publication_info.get('year','NoDate') - return model + def annotation_import_to_dict(annotation_import: CurationPublicationAnnotationImport) -> dict: d = dict() - for attr in ['error','skip_reason']: - d[attr] = getattr(annotation_import,attr) + for attr in ['error', 'skip_reason']: + d[attr] = getattr(annotation_import, attr) d['model'] = annotation_to_dict(annotation_import.annotation) return d + def annotation_to_dict(model: CurationPublicationAnnotation) -> dict: model_dict = dict() - for attr in ['PMID','study_name','doi','journal','title','year','eligibility','comment', - 'eligibility_dev_score','eligibility_eval_score','eligibility_description','first_level_curation_status','curation_status', + for attr in ['PMID', 'study_name', 'doi', 'journal', 'title', 'year', 'eligibility', 'comment', + 'eligibility_dev_score', 'eligibility_eval_score', 'eligibility_description', + 'first_level_curation_status', 'curation_status', 'publication_date']: - model_dict[attr] = getattr(model,attr) + model_dict[attr] = getattr(model, attr) return model_dict + def dict_to_annotation_import(d: dict) -> CurationPublicationAnnotationImport: model = CurationPublicationAnnotation() model_import = CurationPublicationAnnotationImport() @@ -166,15 +178,17 @@ def dict_to_annotation_import(d: dict) -> CurationPublicationAnnotationImport: del d['skip_reason'] model_dict = d['model'] for k in model_dict.keys(): - setattr(model,k,model_dict[k]) + setattr(model, k, model_dict[k]) model_import.annotation = model return model_import + def check_study_name(study_name: str, imported_study_names: list[str]) -> str: - ''' Check that the study_name is unique. Otherwise it will add incremental number as suffix ''' + """ Check that the study_name is unique. Otherwise, it will add incremental number as suffix """ queryset = CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(study_name=study_name).count() if queryset: - sn_list = imported_study_names + list(CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name', flat=True)) + sn_list = imported_study_names + list( + CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name', flat=True)) num = 2 new_study_name = f'{study_name}_{num}' while new_study_name in sn_list: @@ -183,13 +197,14 @@ def check_study_name(study_name: str, imported_study_names: list[str]) -> str: study_name = new_study_name return study_name + def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublicationAnnotationImport]: models = [] reader = csv.DictReader(litsuggest_file, delimiter='\t') imported_study_names = [] for row in reader: if not row['pmid']: - break # litsuggest files might contain a lot of empty rows after the relevant ones + break # litsuggest files might contain a lot of empty rows after the relevant ones pmid = str(int(row['pmid'])) try: triage_decision = row['triage.decision'] @@ -222,7 +237,7 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic annotationModel.first_level_curation_status = 'Contact author' annotationModel.curation_status = 'Awaiting L1' case 'TBD': - #annotationModel.eligibility = True + # annotationModel.eligibility = True annotationModel.first_level_curation_status = 'Awaiting access' case 'PGS Relevant': annotationModel.eligibility = False @@ -239,7 +254,7 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic annotationModel.eligibility_eval_score = 'y' case _: raise ImportException(f'Unexpected triage decision: {triage_decision}') - + models.append(annotation_import) except ImportException as e: annotation_import = CurationPublicationAnnotationImport(CurationPublicationAnnotation()) @@ -249,10 +264,12 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic return models + def litsuggest_filename_to_annotation_imports(litsuggest_file_name: str) -> List[CurationPublicationAnnotationImport]: with open(litsuggest_file_name, 'r') as litsuggest_file: return _litsuggest_IO_to_annotation_imports(litsuggest_file) - + + def litsuggest_fileupload_to_annotation_imports(litsuggest_file_upload: InMemoryUploadedFile) -> List[CurationPublicationAnnotationImport]: file_wrapper = TextIOWrapper(litsuggest_file_upload.file) - return _litsuggest_IO_to_annotation_imports(file_wrapper) \ No newline at end of file + return _litsuggest_IO_to_annotation_imports(file_wrapper) diff --git a/curation_tracker/scripts/import_litsuggest.py b/curation_tracker/scripts/import_litsuggest.py index 6f8985f7..13aae84c 100644 --- a/curation_tracker/scripts/import_litsuggest.py +++ b/curation_tracker/scripts/import_litsuggest.py @@ -3,6 +3,7 @@ litsuggest_dir = '/home/florent/PGS_Catalog/Curation/Litsuggest/' + def run(): files = [f for f in os.listdir(litsuggest_dir) if os.path.isfile(f'{litsuggest_dir}/{f}')]