Skip to content

Commit

Permalink
Merge pull request PGScatalog#356 from fyvon/bug/litsuggest_duplicate_study_names
Browse files Browse the repository at this point in the history

Bug/litsuggest duplicate study names
  • Loading branch information
fyvon authored Apr 23, 2024
2 parents 53051bc + 8b10f94 commit d8515ff
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 31 deletions.
82 changes: 51 additions & 31 deletions curation_tracker/litsuggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Database aliases. `curation_tracker_db` is the alias handed to `.using(...)`
# for the CurationPublicationAnnotation queries in this module.
pgs_db = 'default'
curation_tracker_db = 'curation_tracker'


class CurationPublicationAnnotationImport():
"""Wrapper class for CurationPublicationAnnotation for providing additional transient attributes"""
error: str
Expand All @@ -18,7 +19,7 @@ class CurationPublicationAnnotationImport():
triage_info: dict

def __init__(self, model: CurationPublicationAnnotation | None = None):
    """Wrap ``model`` and reset the transient import attributes.

    :param model: annotation to wrap; a new CurationPublicationAnnotation
        instance is created when it is omitted (or None).
    """
    # A model instance used directly as the default value would be built once,
    # when the function is defined, and then shared by every wrapper created
    # without an explicit model. Build a new one per call instead.
    self.annotation = model if model is not None else CurationPublicationAnnotation()
    self.error = None
    self.skip_reason = None
    self.triage_info = {}
Expand All @@ -32,34 +33,37 @@ def to_dict(self) -> dict:
def is_valid(self) -> bool:
    """Return True when no error has been recorded. Should be used before saving."""
    # Identity test: None is a singleton, and `==` can be overridden.
    return self.error is None

def is_importable(self) -> bool:
    """Return True when no skip reason has been recorded for this import."""
    # Identity test: None is a singleton, and `==` can be overridden.
    return self.skip_reason is None

def __next_id_number(self) -> int:
    """Return the number to use for the next annotation identifiers.

    :return: 1 when no annotation is stored yet, otherwise the primary key of
        the record returned by ``latest()`` plus one.
    """
    annotations = CurationPublicationAnnotation.objects.using(curation_tracker_db)
    # exists() asks the database for at most one row, instead of loading every
    # annotation only to find out whether the table is empty.
    if not annotations.exists():
        return 1
    return annotations.latest().pk + 1

def save(self, *args, **kwargs) -> None:
    """Set the identifiers and save the contained CurationPublicationAnnotation object.

    Positional and keyword arguments are forwarded unchanged to the model's
    ``save()``, whose return value is returned.
    """
    annotation = self.annotation
    # Identifiers are assigned only while the annotation has no number yet.
    if annotation.num is None:
        annotation.set_annotation_ids(self.__next_id_number())
    return annotation.save(*args, **kwargs)


class ImportException(Exception):
    """Error raised while building an annotation import.

    Raised in this module for studies that already exist, publication lookups
    that return no result and unexpected triage decisions.
    """


def get_pgs_publication(pmid):
    """Return the Publication whose PMID is ``pmid``, or None when there is none."""
    try:
        return Publication.objects.get(PMID=pmid)
    except Publication.DoesNotExist:
        return None


def assert_study_doesnt_exist(pmid):
"""
Checks if the study is already present as a Publication or CurationPublicationAnnotation. If yes, throws an ImportException.
Expand All @@ -68,7 +72,7 @@ def assert_study_doesnt_exist(pmid):
raise ImportException(f'Study {pmid} already exists in the PGS Catalog database')
if CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(PMID=pmid).exists():
raise ImportException(f'Study annotation {pmid} already exists in the Curation Tracker database')


def get_publication_info_from_epmc(pmid) -> dict:
payload = {'format': 'json'}
Expand All @@ -90,6 +94,7 @@ def get_publication_info_from_epmc(pmid) -> dict:
raise ImportException("This pubmed ID returned no result")
return info


def get_publication_info_from_epmc_doi(doi) -> dict:
payload = {'format': 'json'}
query = f'doi:{doi}'
Expand All @@ -110,9 +115,11 @@ def get_publication_info_from_epmc_doi(doi) -> dict:
raise ImportException("This DOI returned no result")
return info


def get_next_unique_study_name(study_name):
unique_name = study_name
for existing_study_name in CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(study_name__startswith=study_name):
for existing_study_name in CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(
study_name__startswith=study_name):
name_elements = existing_study_name.split('_')
name_stub = name_elements[0]
name_index = 0
Expand All @@ -121,40 +128,45 @@ def get_next_unique_study_name(study_name):
if last_element.isnumeric():
name_index = int(last_element)
name_index += 1
unique_name = name_stub+'_'+name_index
unique_name = name_stub + '_' + name_index

return unique_name


def create_new_annotation(publication_info) -> CurationPublicationAnnotation:
    """Build a new CurationPublicationAnnotation from a publication info dict.

    :param publication_info: dict that may hold the keys 'PMID', 'journal',
        'doi', 'title', 'year', 'publication_date' and 'authors'; a missing key
        is treated as None.
    :return: the new model (not saved here), with ``study_name`` built from the
        first author and the year.
    """
    model = CurationPublicationAnnotation()
    for attr in ('PMID', 'journal', 'doi', 'title', 'year', 'publication_date'):
        setattr(model, attr, publication_info.get(attr))

    authors = publication_info.get('authors')
    if authors is None:
        # The trailing "N" gives the placeholder the same format as a regular
        # authors string.
        authors = 'NoAuthor N'

    # A 'year' key holding None, or a non-string year, would raise a TypeError
    # in the concatenation below; normalise it to text first.
    year = publication_info.get('year')
    year_label = 'NoDate' if year is None else str(year)

    # Keep the first author only and drop its last space-separated token
    # (presumably the initials - confirm against the authors format).
    first_author = authors.split(',')[0]
    model.study_name = '-'.join(first_author.split(' ')[:-1]) + year_label

    return model


def annotation_import_to_dict(annotation_import: CurationPublicationAnnotationImport) -> dict:
    """Convert an annotation import to a dict.

    The result holds 'error' and 'skip_reason' from the wrapper, and the
    wrapped annotation converted by ``annotation_to_dict`` under 'model'.
    """
    return {
        'error': annotation_import.error,
        'skip_reason': annotation_import.skip_reason,
        'model': annotation_to_dict(annotation_import.annotation),
    }


def annotation_to_dict(model: CurationPublicationAnnotation) -> dict:
    """Copy the exported attributes of ``model`` into a dict keyed by attribute name."""
    exported_fields = (
        'PMID', 'study_name', 'doi', 'journal', 'title', 'year',
        'eligibility', 'comment',
        'eligibility_dev_score', 'eligibility_eval_score',
        'eligibility_description',
        'first_level_curation_status', 'curation_status',
        'publication_date',
    )
    return {field: getattr(model, field) for field in exported_fields}


def dict_to_annotation_import(d: dict) -> CurationPublicationAnnotationImport:
model = CurationPublicationAnnotation()
model_import = CurationPublicationAnnotationImport()
Expand All @@ -166,15 +178,17 @@ def dict_to_annotation_import(d: dict) -> CurationPublicationAnnotationImport:
del d['skip_reason']
model_dict = d['model']
for k in model_dict.keys():
setattr(model,k,model_dict[k])
setattr(model, k, model_dict[k])
model_import.annotation = model
return model_import

def check_study_name(study_name: str) -> str:
''' Check that the study_name is unique. Otherwise it will add incremental number as suffix '''

def check_study_name(study_name: str, imported_study_names: list[str]) -> str:
""" Check that the study_name is unique. Otherwise, it will add incremental number as suffix """
queryset = CurationPublicationAnnotation.objects.using(curation_tracker_db).filter(study_name=study_name).count()
if queryset:
sn_list = CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name',flat=True)
sn_list = imported_study_names + list(
CurationPublicationAnnotation.objects.using(curation_tracker_db).values_list('study_name', flat=True))
num = 2
new_study_name = f'{study_name}_{num}'
while new_study_name in sn_list:
Expand All @@ -183,12 +197,14 @@ def check_study_name(study_name: str) -> str:
study_name = new_study_name
return study_name


def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublicationAnnotationImport]:
models = []
reader = csv.DictReader(litsuggest_file, delimiter='\t')
imported_study_names = []
for row in reader:
if not row['pmid']:
break # litsuggest files might contain a lot of empty rows after the relevant ones
break # litsuggest files might contain a lot of empty rows after the relevant ones
pmid = str(int(row['pmid']))
try:
triage_decision = row['triage.decision']
Expand All @@ -198,7 +214,9 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic
assert_study_doesnt_exist(pmid)
study_epmc_info = get_publication_info_from_epmc(pmid)
annotationModel = create_new_annotation(study_epmc_info)
annotationModel.study_name = check_study_name(annotationModel.study_name)
study_name = check_study_name(annotationModel.study_name, imported_study_names)
annotationModel.study_name = study_name
imported_study_names.append(study_name)

triage_note = row['triage.note']
annotationModel.eligibility_description = triage_note
Expand All @@ -219,7 +237,7 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic
annotationModel.first_level_curation_status = 'Contact author'
annotationModel.curation_status = 'Awaiting L1'
case 'TBD':
#annotationModel.eligibility = True
# annotationModel.eligibility = True
annotationModel.first_level_curation_status = 'Awaiting access'
case 'PGS Relevant':
annotationModel.eligibility = False
Expand All @@ -236,7 +254,7 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic
annotationModel.eligibility_eval_score = 'y'
case _:
raise ImportException(f'Unexpected triage decision: {triage_decision}')

models.append(annotation_import)
except ImportException as e:
annotation_import = CurationPublicationAnnotationImport(CurationPublicationAnnotation())
Expand All @@ -246,10 +264,12 @@ def _litsuggest_IO_to_annotation_imports(litsuggest_file) -> List[CurationPublic

return models


def litsuggest_filename_to_annotation_imports(litsuggest_file_name: str) -> List[CurationPublicationAnnotationImport]:
    """Read the litsuggest file at ``litsuggest_file_name`` and return its annotation imports.

    :param litsuggest_file_name: path of the litsuggest file to read.
    :return: the list produced by ``_litsuggest_IO_to_annotation_imports``.
    """
    # The file is parsed with the csv module, which asks for newline='' so that
    # it can handle line endings (including newlines inside quoted fields) itself.
    with open(litsuggest_file_name, 'r', newline='') as litsuggest_file:
        return _litsuggest_IO_to_annotation_imports(litsuggest_file)



def litsuggest_fileupload_to_annotation_imports(litsuggest_file_upload: InMemoryUploadedFile) -> List[CurationPublicationAnnotationImport]:
    """Return the annotation imports found in an uploaded litsuggest file.

    :param litsuggest_file_upload: uploaded file whose ``.file`` attribute is
        wrapped as a text stream.
    :return: the list produced by ``_litsuggest_IO_to_annotation_imports``.
    """
    # The stream is parsed with the csv module, which asks for newline='' so
    # that it can handle line endings itself.
    file_wrapper = TextIOWrapper(litsuggest_file_upload.file, newline='')
    return _litsuggest_IO_to_annotation_imports(file_wrapper)
1 change: 1 addition & 0 deletions curation_tracker/scripts/import_litsuggest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

litsuggest_dir = '/home/florent/PGS_Catalog/Curation/Litsuggest/'


def run():

files = [f for f in os.listdir(litsuggest_dir) if os.path.isfile(f'{litsuggest_dir}/{f}')]
Expand Down

0 comments on commit d8515ff

Please sign in to comment.