feat(APS): Author parsing through xml files #238

Open · wants to merge 1 commit into base: main
44 changes: 32 additions & 12 deletions dags/aps/aps_process_file.py
@@ -3,6 +3,7 @@
 import pendulum
 from airflow.decorators import dag, task
 from aps.parser import APSParser
+from aps.xml_parser import APSParserXML
 from aps.repository import APSRepository
 from common.enhancer import Enhancer
 from common.enricher import Enricher
@@ -29,6 +30,12 @@ def enrich_aps(enhanced_file):
     return Enricher()(enhanced_file)
 
 
+def replace_authors(parsed_file, parsed_xml):
+    parsed_file["authors"] = parsed_xml["authors"]
+
+    return parsed_file
+
+
 @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
 def aps_process_file():
     s3_client = APSRepository()
@@ -45,12 +52,6 @@ def enhance(parsed_file):
             raise EmptyOutputFromPreviousTask("parse")
         return enhance_aps(parsed_file)
 
-    @task()
-    def enrich(enhanced_file):
-        if not enhanced_file:
-            raise EmptyOutputFromPreviousTask("enhance")
-        return enrich_aps(enhanced_file)
-
     @task()
     def populate_files(parsed_file):
         if "dois" not in parsed_file:
@@ -68,21 +69,40 @@ def populate_files(parsed_file):
         parsed_file["files"] = downloaded_files
         logger.info("Files populated", files=parsed_file["files"])
         return parsed_file
 
+    @task()
+    def enrich(enhanced_file):
+        if not enhanced_file:
+            raise EmptyOutputFromPreviousTask("enhance")
+        return enrich_aps(enhanced_file)
+
+    @task()
+    def parse_xml(xml_file):
+        parser = APSParserXML()
+        parsed = parser.parse(xml_file)
+
+        return parsed
+
+    @task()
+    def replace_authors_task(parsed_file, parsed_xml):
+        # Call the module-level helper; naming the task "replace_authors"
+        # would shadow the helper and make this call recurse.
+        return replace_authors(parsed_file, parsed_xml)
 
     @task()
-    def save_to_s3(enriched_file):
-        upload_json_to_s3(json_record=enriched_file, repo=s3_client)
+    def save_to_s3(complete_file):
+        upload_json_to_s3(json_record=complete_file, repo=s3_client)
 
     @task()
-    def create_or_update(enriched_file):
-        create_or_update_article(enriched_file)
+    def create_or_update(complete_file):
+        create_or_update_article(complete_file)
 
     parsed_file = parse()
     enhanced_file = enhance(parsed_file)
     enhanced_file_with_files = populate_files(enhanced_file)
     enriched_file = enrich(enhanced_file_with_files)
-    save_to_s3(enriched_file=enriched_file)
-    create_or_update(enriched_file)
+    parsed_xml = parse_xml(enriched_file["files"]["xml"])
+    complete_file = replace_authors_task(enriched_file, parsed_xml)
+    save_to_s3(complete_file)
+    create_or_update(complete_file)
 
 
 dag_for_aps_files_processing = aps_process_file()
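
For review purposes, here is a minimal dict-level sketch of the new author-replacement step. It runs without Airflow; the helper body mirrors the diff above, and the sample records are illustrative, not real APS data.

```python
# Sketch of the author-replacement step on plain dicts (sample data is
# illustrative; the helper body is copied from the PR diff).

def replace_authors(parsed_file, parsed_xml):
    parsed_file["authors"] = parsed_xml["authors"]
    return parsed_file


enriched_file = {
    "dois": [{"value": "10.1103/example"}],   # hypothetical record
    "authors": [{"full_name": "Doe, J."}],    # authors from the JSON feed
}
parsed_xml = {
    "authors": [                              # richer authors parsed from XML
        {"full_name": "Jane Doe", "orcid": "0000-0000-0000-0000", "affiliations": []}
    ],
}

complete_file = replace_authors(enriched_file, parsed_xml)
assert complete_file["authors"][0]["full_name"] == "Jane Doe"
```

Note that the XML-derived authors simply overwrite the JSON-derived ones; no field-by-field merging is attempted.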
86 changes: 86 additions & 0 deletions dags/aps/xml_parser.py
@@ -0,0 +1,86 @@
import re

from common.parsing.parser import IParser
from common.parsing.xml_extractors import CustomExtractor
from structlog import get_logger


class APSParserXML(IParser):
    def __init__(self, file_path=None):
        self.file_path = file_path
        self.logger = get_logger().bind(class_name=type(self).__name__)
        self.dois = None

        extractors = [
            CustomExtractor("authors", self._get_authors),
        ]
        super().__init__(extractors)

    def _get_affiliations(self, article, contrib):
        affiliations = []

        xref_elements = contrib.findall(".//xref[@ref-type='aff']")

        if not xref_elements:
            self.logger.msg("No affiliations found for this author.")
            return affiliations

        for xref in xref_elements:
            ref_id = xref.get("rid")

            if ref_id:
                affiliation_node = article.find(f".//aff[@id='{ref_id}']")

                if affiliation_node is not None:
                    full_text_parts = []
                    ror = None

                    for child in affiliation_node.iter():
                        # <institution-id institution-id-type="ror"> carries the ROR id;
                        # everything else (minus labels/footnote markers) is display text.
                        if child.tag == "institution-id" and child.get("institution-id-type") == "ror":
                            ror = child.text
                        elif child.tag not in ["label", "sup", "institution-id"]:
                            if child.text:
                                full_text_parts.append(child.text.strip())
                            if child.tail:
                                full_text_parts.append(child.tail.strip())

                    # Collapse duplicate commas and whitespace left over from
                    # joining the element fragments.
                    raw_aff_text = " ".join(filter(None, full_text_parts))
                    aff_text = re.sub(r'\s*,\s*,*', ', ', raw_aff_text)
                    aff_text = re.sub(r'\s+', ' ', aff_text).strip()

                    affiliations.append({"value": aff_text, "ror": ror})
                else:
                    self.logger.msg(f"Affiliation with id '{ref_id}' not found.")

        return affiliations

    def _get_authors(self, article):
        authors = []

        contrib_group = article.find("./front/article-meta/contrib-group")
        if contrib_group is None:
            return authors

        for contrib in contrib_group.findall("./contrib[@contrib-type='author']"):
            author = {}

            name = contrib.find("./name")
            if name is not None:
                given_names = name.find("given-names")
                surname = name.find("surname")
                author["given_names"] = given_names.text if given_names is not None else ""
                author["surname"] = surname.text if surname is not None else ""
                author["full_name"] = f"{author['given_names']} {author['surname']}".strip()

            orcid = contrib.find("./contrib-id[@contrib-id-type='orcid']")
            if orcid is not None:
                author["orcid"] = orcid.text

            author["affiliations"] = self._get_affiliations(article, contrib)

            authors.append(author)

        return authors
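
To make the JATS-style structure the extractor expects concrete, here is a standalone sketch of the same XPath lookups using only the standard library. `APSParserXML` itself is not imported (it depends on the repo's `common.parsing` package), and the sample XML and all values in it are illustrative.

```python
import xml.etree.ElementTree as ET

# Hand-written JATS-style snippet mirroring what _get_authors and
# _get_affiliations traverse; all names and ids below are made up.
SAMPLE = """
<article>
  <front><article-meta>
    <contrib-group>
      <contrib contrib-type="author">
        <name><given-names>Jane</given-names><surname>Doe</surname></name>
        <contrib-id contrib-id-type="orcid">0000-0000-0000-0000</contrib-id>
        <xref ref-type="aff" rid="a1"/>
      </contrib>
    </contrib-group>
    <aff id="a1"><label>1</label>
      <institution-id institution-id-type="ror">https://ror.org/00example0</institution-id>
      <institution>Example University</institution>, Example City</aff>
  </article-meta></front>
</article>
"""

article = ET.fromstring(SAMPLE)

# The same lookups the parser performs:
contrib = article.find("./front/article-meta/contrib-group/contrib[@contrib-type='author']")
rid = contrib.find(".//xref[@ref-type='aff']").get("rid")  # author -> affiliation reference
aff = article.find(f".//aff[@id='{rid}']")                 # resolve the reference
ror = aff.find("institution-id[@institution-id-type='ror']").text

print(contrib.find("./name/surname").text, rid, ror)
# -> Doe a1 https://ror.org/00example0
```

The `rid`-to-`aff` resolution is the crux: authors point at affiliations via `<xref ref-type="aff">`, and a ROR identifier, when present, is carried in an `<institution-id>` child of the `<aff>` node.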

1 change: 1 addition & 0 deletions tests/integration/aps/data/expected_json_output.json

Large diffs are not rendered by default.
