diff --git a/dags/aps/aps_process_file.py b/dags/aps/aps_process_file.py index 5689e3e8..834d1853 100644 --- a/dags/aps/aps_process_file.py +++ b/dags/aps/aps_process_file.py @@ -51,12 +51,6 @@ def enrich(enhanced_file): raise EmptyOutputFromPreviousTask("enhance") return enrich_aps(enhanced_file) - @task() - def validate_record(enriched_file): - if not enriched_file: - raise EmptyOutputFromPreviousTask("enrich") - return aps_validate_record(enriched_file) - @task() def create_or_update(enriched_file): create_or_update_article(enriched_file) @@ -64,8 +58,7 @@ def create_or_update(enriched_file): parsed_file = parse() enhanced_file = enhance(parsed_file) enriched_file = enrich(enhanced_file) - validated_record = validate_record(enriched_file) - create_or_update(validated_record) + create_or_update(enriched_file) dag_for_aps_files_processing = aps_process_file() diff --git a/dags/aps/parser.py b/dags/aps/parser.py index 43b263a4..3a8f3b9d 100644 --- a/dags/aps/parser.py +++ b/dags/aps/parser.py @@ -6,6 +6,8 @@ from inspire_utils.record import get_value from structlog import get_logger +logger = get_logger() + class APSParser(IParser): def __init__(self) -> None: @@ -74,7 +76,10 @@ def __init__(self) -> None: extraction_function=lambda x: ["HEP", "Citeable", "Published"], ), CustomExtractor("field_categories", self._get_field_categories), - CustomExtractor("files", self._build_files_data), + CustomExtractor( + destination="files", + extraction_function=self._build_files_data, + ), ] super().__init__(extractors) @@ -122,20 +127,12 @@ def _get_field_categories(self, article): def _build_files_data(self, article): doi = get_value(article, "identifiers.doi") - return [ - { - "url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi), - "headers": {"Accept": "application/pdf"}, - "name": "{0}.pdf".format(doi), - "filetype": "pdf", - }, - { - "url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi), - "headers": {"Accept": "text/xml"}, - "name": "{0}.xml".format(doi), - "filetype": "xml", - }, - ] + files = { + "pdf": f"http://harvest.aps.org/v2/journals/articles/{doi}", + "xml": f"http://harvest.aps.org/v2/journals/articles/{doi}", + } + logger.info("Files data", files=files) + return files def _get_licenses(self, x): try: diff --git a/dags/common/cleanup.py b/dags/common/cleanup.py index 446bce46..ce63f35a 100644 --- a/dags/common/cleanup.py +++ b/dags/common/cleanup.py @@ -59,7 +59,6 @@ def remove_unnecessary_fields(obj): fieldnames = [ "curated", "citeable", - "files", "date_published", "source_file_path", "local_files", diff --git a/dags/common/enricher.py b/dags/common/enricher.py index 77093266..04c30f8d 100644 --- a/dags/common/enricher.py +++ b/dags/common/enricher.py @@ -111,7 +111,6 @@ def __call__(self, article): enriched_article = article.copy() enriched_article.update( { - "$schema": self._get_schema(), "arxiv_eprints": [ self._set_categories(eprint) for eprint in enriched_article.get("arxiv_eprints", []) @@ -120,4 +119,6 @@ def __call__(self, article): ) enriched_article = remove_empty_values(enriched_article) enriched_article = remove_unnecessary_fields(enriched_article) + + self.logger.info("Enriched article.", article=enriched_article) return enriched_article diff --git a/dags/common/pull_ftp.py b/dags/common/pull_ftp.py index 590ab23d..7a315ce7 100644 --- a/dags/common/pull_ftp.py +++ b/dags/common/pull_ftp.py @@ -15,7 +15,7 @@ def migrate_files( s_ftp, repo, logger: PrintLogger, - process_archives = True, + process_archives=True, ): logger.msg("Processing files.", filenames=archives_names) extracted_or_downloaded_filenames = [] @@ -58,7 +58,6 @@ def migrate_from_ftp( s_ftp, repo, logger, - publisher=None, **kwargs, ): params = kwargs["params"] @@ -74,10 +73,10 @@ def migrate_from_ftp( ) if force_pull_all_files: - return _force_pull(s_ftp, repo, logger, publisher, **kwargs) + return _force_pull(s_ftp, repo, logger, **kwargs) elif force_pull_specific_files: - return _filenames_pull(s_ftp, repo, logger, publisher, **kwargs) - return _differential_pull(s_ftp, repo, logger, publisher, **kwargs) + return _filenames_pull(s_ftp, repo, logger, **kwargs) + return _differential_pull(s_ftp, repo, logger, **kwargs) def reprocess_files(repo, logger, **kwargs): @@ -97,26 +96,19 @@ def _force_pull( logger.msg("Force Pulling from SFTP.") excluded_directories = kwargs["params"]["excluded_directories"] filenames = s_ftp.list_files(excluded_directories=excluded_directories) - process_archives = publisher != "elsevier" - return migrate_files( - filenames, s_ftp, repo, logger, process_archives=process_archives - ) + return migrate_files(filenames, s_ftp, repo, logger) def _filenames_pull( s_ftp, repo, logger, - publisher, **kwargs, ): filenames_pull_params = kwargs["params"]["filenames_pull"] filenames = filenames_pull_params["filenames"] logger.msg("Pulling specified filenames from SFTP") - process_archives = publisher != "elsevier" - return migrate_files( - filenames, s_ftp, repo, logger, process_archives=process_archives - ) + return migrate_files(filenames, s_ftp, repo, logger) def _find_files_in_zip(filenames, repo): @@ -137,7 +129,6 @@ def _differential_pull( s_ftp, repo, logger, - publisher, **kwargs, ): logger.msg("Pulling missing files only.") @@ -145,10 +136,7 @@ def _differential_pull( sftp_files = s_ftp.list_files(excluded_directories=excluded_directories) s3_files = repo.get_all_raw_filenames() diff_files = list(filter(lambda x: x not in s3_files, sftp_files)) - process_archives = publisher != "elsevier" - return migrate_files( - diff_files, s_ftp, repo, logger, process_archives=process_archives - ) + return migrate_files(diff_files, s_ftp, repo, logger) def trigger_file_processing( diff --git a/dags/common/scoap3_s3.py b/dags/common/scoap3_s3.py new file mode 100644 index 00000000..c04baa55 --- /dev/null +++ b/dags/common/scoap3_s3.py @@ -0,0 +1,99 @@ +import os +from uuid import uuid4 + +import requests +from common.repository import IRepository +from common.s3_service import S3Service +from structlog import get_logger + +logger = get_logger() + + +class Scoap3Repository(IRepository): + def __init__(self): + super().__init__() + self.bucket = os.getenv("SCOAP3_BUCKET_NAME", "scoap3") + self.s3 = S3Service(self.bucket) + self.client = self.s3.meta.client + + def copy_file(self, source_bucket, source_key, prefix=None): + if not prefix: + prefix = str(uuid4()) + + copy_source = {"Bucket": source_bucket, "Key": source_key} + filename = os.path.basename(source_key) + destination_key = f"{prefix}/{filename}" + + logger.info("Copying file from", copy_source=copy_source) + self.client.copy( + copy_source, + self.bucket, + destination_key, + ExtraArgs={ + "Metadata": { + "source_bucket": source_bucket, + "source_key": source_key, + }, + "MetadataDirective": "REPLACE", + }, + ) + logger.info( + f"Copied file from {source_bucket}/{source_key} to {self.bucket}/{destination_key}" + ) + return f"{self.bucket}/{destination_key}" + + def copy_files(self, bucket, files, prefix=None): + copied_files = {} + for type, path in files.items(): + try: + copied_files[type] = self.copy_file(bucket, path, prefix=prefix) + except Exception as e: + logger.error("Failed to copy file.", error=str(e), type=type, path=path) + return copied_files + + def download_files(self, files, prefix=None): + downloaded_files = {} + for type, url in files.items(): + try: + downloaded_files[type] = self.download_and_upload_to_s3( + url, prefix=prefix + ) + logger.info("Downloaded file", type=type, url=url) + except Exception as e: + logger.error( + "Failed to download file.", error=str(e), type=type, url=url + ) + return downloaded_files + + def download_and_upload_to_s3(self, url, prefix=None): + if not prefix: + prefix = str(uuid4()) + + filename = os.path.basename(url) + destination_key = f"{prefix}/{filename}" + + response = requests.get(url) + try: + response.raise_for_status() + except requests.exceptions.HTTPError as e: + logger.error("Failed to download file", error=str(e), url=url) + return + + try: + # Upload the file to S3 + self.client.put_object( + Body=response.content, + Bucket=self.bucket, + Key=destination_key, + Metadata={ + "source_url": url, + }, + ) + return f"{self.bucket}/{destination_key}" + except Exception as e: + logger.error( + "Failed to upload file", + error=str(e), + bucket=self.bucket, + key=destination_key, + ) diff --git a/dags/common/utils.py b/dags/common/utils.py index 3a060067..3d8376cc 100644 --- a/dags/common/utils.py +++ b/dags/common/utils.py @@ -169,7 +169,10 @@ def parse_to_ET_element(article): def parse_without_names_spaces(xml): - it = ET.iterparse(StringIO(xml)) + if type(xml) == str: + it = ET.iterparse(StringIO(xml)) + else: + it = ET.iterparse(StringIO(xml.getvalue().decode("utf-8"))) for _, el in it: el.tag = el.tag.rpartition("}")[-1] root = it.root @@ -243,6 +246,7 @@ def process_archive(file_bytes, file_name, **kwargs): max_tries=5, ) def create_or_update_article(data): + logger.info("Sending data to the backend", data=data) backend_url = os.getenv( "BACKEND_URL", "http://localhost:8000/api/article-workflow-import/" ) diff --git a/dags/elsevier/elsevier_file_processing.py b/dags/elsevier/elsevier_file_processing.py index 4dddefad..e64bff8b 100644 --- a/dags/elsevier/elsevier_file_processing.py +++ b/dags/elsevier/elsevier_file_processing.py @@ -1,23 +1,22 @@ -import base64 - import pendulum -import requests from airflow.decorators import dag, task from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article, parse_without_names_spaces from elsevier.parser import ElsevierParser -from jsonschema import validate +from elsevier.repository import ElsevierRepository +from inspire_utils.record import get_value +from structlog import get_logger + +logger = get_logger() def parse_elsevier(**kwargs): - try: - encoded_xml = kwargs["params"]["file_content"] - except KeyError: - raise Exception("There was no 'file_content' parameter. Exiting run.") - xml_bytes = base64.b64decode(encoded_xml) - xml = parse_without_names_spaces(xml_bytes.decode("utf-8")) + xml_content_bytes = kwargs["params"]["file_content"] + + xml = parse_without_names_spaces(xml_content_bytes) parser = ElsevierParser() parsed = parser.parse(xml) try: @@ -35,15 +34,16 @@ def enrich_elsevier(enhanced_file): return Enricher()(enhanced_file) -def elsevier_validate_record(file_with_metadata): - schema = requests.get(file_with_metadata["$schema"]).json() - validate(file_with_metadata, schema) - - @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def elsevier_process_file(): + + s3_client = ElsevierRepository() + @task() def parse(**kwargs): + xml_path = kwargs["params"]["file_name"] + xml_content_bytes = s3_client.get_by_id(xml_path) + kwargs["params"]["file_content"] = xml_content_bytes return parse_elsevier(**kwargs) @task() @@ -52,26 +52,38 @@ def enhance(parsed_file): return parsed_file and enhance_elsevier(parsed_file) raise EmptyOutputFromPreviousTask("parse_metadata") + @task() + def populate_files(parsed_file): + if "files" not in parsed_file: + logger.info("No files to populate") + return parsed_file + + logger.info("Populating files", files=parsed_file["files"]) + + s3_client_bucket = s3_client.bucket + s3_scoap3_client = Scoap3Repository() + doi = get_value(parsed_file, "dois.value[0]") + files = s3_scoap3_client.copy_files( + s3_client_bucket, parsed_file["files"], prefix=doi + ) + parsed_file["files"] = files + logger.info("Files populated", files=parsed_file["files"]) + return parsed_file + @task() def enrich(enhanced_file): if enhanced_file: return enrich_elsevier(enhanced_file) raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata") - @task() - def validate_record(enriched_file): - if enriched_file: - return elsevier_validate_record(enriched_file) - raise EmptyOutputFromPreviousTask("enriched_file_with_metadata") - @task() def create_or_update(enriched_file): create_or_update_article(enriched_file) parsed_file = parse() enhanced_file = enhance(parsed_file) - enriched_file = enrich(enhanced_file) - validate_record(enriched_file) + enhanced_file_with_files = populate_files(enhanced_file) + enriched_file = enrich(enhanced_file_with_files) create_or_update(enriched_file) diff --git a/dags/elsevier/metadata_parser.py b/dags/elsevier/metadata_parser.py index 4c20abe8..080888e6 100644 --- a/dags/elsevier/metadata_parser.py +++ b/dags/elsevier/metadata_parser.py @@ -64,7 +64,9 @@ def parse(self, article): for extractor in self.extractors if (value := extractor.extract(journal_issue)) is not None } - extracted_value["journal_volume"] = self._get_journal_volume(article.find("dataset-content")) + extracted_value["journal_volume"] = self._get_journal_volume( + article.find("dataset-content") + ) parsed_articles.append(self._generic_parsing(extracted_value)) return parsed_articles @@ -125,13 +127,13 @@ def _get_journal_volume(self, article): article=article, path="journal-issue/journal-issue-properties/volume-issue-number/vol-first", field_name="volume_vol_first", - dois=None + dois=None, ) suppl = extract_text( article=article, path="journal-issue/journal-issue-properties/volume-issue-number/suppl", field_name="volume_suppl", - dois=None + dois=None, ) return f"{vol_first} {suppl}" @@ -151,14 +153,16 @@ def _get_local_files(self, article): if self.file_path.startswith("raw"): self.file_path = self.file_path.replace("raw/", "") + files_dir_base = os.path.dirname(self.file_path) + pdf_file_path = os.path.join( - self.file_path, + files_dir_base, article.find("files-info/web-pdf/pathname").text, ) return { "pdf": pdf_file_path, "pdfa": os.path.join(os.path.split(pdf_file_path)[0], "main_a-2b.pdf"), "xml": os.path.join( - self.file_path, article.find("files-info/ml/pathname").text + files_dir_base, article.find("files-info/ml/pathname").text ), } diff --git a/dags/elsevier/repository.py b/dags/elsevier/repository.py index f5b41fd8..a6c95e73 100644 --- a/dags/elsevier/repository.py +++ b/dags/elsevier/repository.py @@ -5,6 +5,9 @@ from common.repository import IRepository from common.s3_service import S3Service from common.utils import find_extension +from structlog import get_logger + +logger = get_logger() class ElsevierRepository(IRepository): @@ -13,7 +16,8 @@ class ElsevierRepository(IRepository): def __init__(self): super().__init__() - self.s3 = S3Service(os.getenv("ELSEVIER_BUCKET_NAME", "elsevier")) + self.bucket = os.getenv("ELSEVIER_BUCKET_NAME", "elsevier") + self.s3 = S3Service(self.bucket) def get_all_raw_filenames(self): return [ @@ -45,13 +49,15 @@ def get_by_id(self, id): self.s3.download_fileobj(id, retfile) return retfile - def save(self, filename, obj): + def save(self, filename, obj, prefix=None): + logger.info("Saving file.", filename=filename) obj.seek(0) - prefix = ( - self.ZIPED_DIR - if ".tar" in filename or ".zip" in filename - else self.EXTRACTED_DIR - ) + if not prefix: + prefix = ( + self.ZIPED_DIR + if ".tar" in filename or ".zip" in filename + else self.EXTRACTED_DIR + ) self.s3.upload_fileobj(obj, prefix + filename) def delete_all(self): diff --git a/dags/elsevier/trigger_file_processing.py b/dags/elsevier/trigger_file_processing.py index 15e3634f..5b590016 100644 --- a/dags/elsevier/trigger_file_processing.py +++ b/dags/elsevier/trigger_file_processing.py @@ -1,13 +1,12 @@ -import base64 -import io import os from airflow.api.common import trigger_dag from common.pull_ftp import _generate_id -from common.repository import IRepository -from common.utils import parse_without_names_spaces, process_archive +from common.utils import parse_without_names_spaces from elsevier.metadata_parser import ElsevierMetadataParser -from structlog import PrintLogger +from structlog import get_logger + +logger = get_logger() def trigger_file_processing_elsevier( @@ -18,41 +17,23 @@ def trigger_file_processing_elsevier( ): files = [] for filename in filenames: + + if "dataset.xml" != os.path.basename(filename): + continue logger.msg("Running processing.", filename=filename) file_bytes = repo.get_by_id(filename) - (archive_file_content, s3_filename) = next( - process_archive( - file_bytes=file_bytes, - file_name=filename, - only_specific_file="dataset.xml", - ) - ) - dataset_file = parse_without_names_spaces(archive_file_content.decode("utf-8")) + + dataset_file = parse_without_names_spaces(file_bytes) parser = ElsevierMetadataParser(file_path=filename) parsed_articles = parser.parse(dataset_file) for parsed_article in parsed_articles: full_file_path = parsed_article["files"]["xml"] - (file_content, _) = next( - process_archive( - file_bytes=file_bytes, - file_name=s3_filename, - only_specific_file=full_file_path, - ) - ) - - repo.save(full_file_path, io.BytesIO(file_content)) - logger.msg("Running processing.", filename=full_file_path) - - file_bytes_extracted = repo.get_by_id( - os.path.join(*["extracted", full_file_path]) - ) + logger.msg("Processing file", file=full_file_path) _id = _generate_id(publisher) - encoded_article = base64.b64encode(file_bytes_extracted.getvalue()).decode() trigger_dag.trigger_dag( dag_id=f"{publisher}_process_file", run_id=_id, conf={ - "file_content": encoded_article, "file_name": full_file_path, "metadata": parsed_article, }, diff --git a/dags/hindawi/hindawi_file_processing.py b/dags/hindawi/hindawi_file_processing.py index a32b45f0..b1bfcdb7 100644 --- a/dags/hindawi/hindawi_file_processing.py +++ b/dags/hindawi/hindawi_file_processing.py @@ -1,14 +1,17 @@ import xml.etree.ElementTree as ET import pendulum -import requests from airflow.decorators import dag, task from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article from hindawi.parser import HindawiParser -from jsonschema import validate +from inspire_utils.record import get_value +from structlog import get_logger + +logger = get_logger() def parse_hindawi(xml): @@ -24,12 +27,6 @@ def enrich_hindawi(enhanced_file): return Enricher()(enhanced_file) -def hindawi_validate_record(enriched_file): - schema = requests.get(enriched_file["$schema"]).json() - validate(enriched_file, schema) - return enriched_file - - @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def hindawi_file_processing(): @task() @@ -53,10 +50,23 @@ def enrich(enhanced_file): return enrich_hindawi(enhanced_file) @task() - def validate_record(enriched_file): - if not enriched_file: - raise EmptyOutputFromPreviousTask("enrich") - return hindawi_validate_record(enriched_file) + def populate_files(parsed_file): + if "dois" not in parsed_file: + return parsed_file + + doi = get_value(parsed_file, "dois.value[0]") + logger.info("Populating files", doi=doi) + doi_part = doi.split("10.1155/")[1] + files = { + "pdf": f"http://downloads.hindawi.com/journals/ahep/{doi_part}.pdf", + "pdfa": f"http://downloads.hindawi.com/journals/ahep/{doi_part}.a.pdf", + "xml": f"http://downloads.hindawi.com/journals/ahep/{doi_part}.xml", + } + s3_scoap3_client = Scoap3Repository() + downloaded_files = s3_scoap3_client.download_files(files, prefix=doi) + parsed_file["files"] = downloaded_files + logger.info("Files populated", files=parsed_file["files"]) + return parsed_file @task() def create_or_update(enriched_file): @@ -64,9 +74,9 @@ def create_or_update(enriched_file): parsed_file = parse() enhanced_file = enhance(parsed_file) - enriched_file = enrich(enhanced_file) - validated_record = validate_record(enriched_file) - create_or_update(validated_record) + enhanced_file_with_files = populate_files(enhanced_file) + enriched_file = enrich(enhanced_file_with_files) + create_or_update(enriched_file) Hindawi_file_processing = hindawi_file_processing() diff --git a/dags/iop/iop_process_file.py b/dags/iop/iop_process_file.py index 2c7d389b..fd8550e3 100644 --- a/dags/iop/iop_process_file.py +++ b/dags/iop/iop_process_file.py @@ -2,24 +2,29 @@ import xml.etree.ElementTree as ET import pendulum -import requests from airflow.decorators import dag, task from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article +from inspire_utils.record import get_value from iop.parser import IOPParser -from jsonschema import validate +from iop.repository import IOPRepository +from structlog import get_logger + +logger = get_logger() def iop_parse_file(**kwargs): if "params" not in kwargs or "file" not in kwargs["params"]: raise Exception("There was no 'file' parameter. Exiting run.") encoded_xml = kwargs["params"]["file"] + file_name = kwargs["params"]["file_name"] xml_bytes = base64.b64decode(encoded_xml) xml = ET.fromstring(xml_bytes.decode("utf-8")) - parser = IOPParser() + parser = IOPParser(file_path=file_name) parsed = parser.parse(xml) return parsed @@ -33,12 +38,6 @@ def iop_enrich_file(enhanced_file): return Enricher()(enhanced_file) -def iop_validate_record(enriched_file): - schema = requests.get(enriched_file["$schema"]).json() - validate(enriched_file, schema) - return enriched_file - - @dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1)) def iop_process_file(): @task() @@ -58,10 +57,22 @@ def enrich_file(enhanced_file): return iop_enrich_file(enhanced_file) @task() - def validate_record(enriched_file): - if not enriched_file: - raise EmptyOutputFromPreviousTask("enrich_file") - return iop_validate_record(enriched_file) + def populate_files(parsed_file): + if "files" not in parsed_file: + logger.info("No files to populate") + return parsed_file + + logger.info("Populating files", files=parsed_file["files"]) + + s3_client_bucket = IOPRepository().bucket + s3_scoap3_client = Scoap3Repository() + doi = get_value(parsed_file, "dois.value[0]") + files = s3_scoap3_client.copy_files( + s3_client_bucket, parsed_file["files"], prefix=doi + ) + parsed_file["files"] = files + logger.info("Files populated", files=parsed_file["files"]) + return parsed_file @task() def create_or_update(enriched_file): @@ -69,9 +80,9 @@ def create_or_update(enriched_file): parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) - enriched_file = enrich_file(enhanced_file) - validated_record = validate_record(enriched_file) - create_or_update(validated_record) + enhanced_file_with_files = populate_files(enhanced_file) + enriched_file = enrich_file(enhanced_file_with_files) + create_or_update(enriched_file) dag_taskflow = iop_process_file() diff --git a/dags/iop/parser.py b/dags/iop/parser.py index 373de22a..e1e462fc 100644 --- a/dags/iop/parser.py +++ b/dags/iop/parser.py @@ -1,4 +1,4 @@ -import xml.etree.ElementTree as ET +import os from common.constants import ( ARXIV_EXTRACTION_PATTERN, @@ -34,7 +34,8 @@ class IOPParser(IParser): "editorial": "editorial", } - def __init__(self): + def __init__(self, file_path): + self.file_path = file_path self.dois = None self.year = None self.journal_doctype = None @@ -129,6 +130,11 @@ def __init__(self): all_content_between_tags=True, extra_function=lambda x: x, ), + CustomExtractor( + destination="files", + extraction_function=self._get_local_files, + required=True, + ), ] super().__init__(extractors) @@ -391,6 +397,20 @@ def _get_license(self, article): type_and_version = get_license_type_and_version_from_url(url=url) if type_and_version: licenses.append(type_and_version) - except (KeyError): + except KeyError: self.logger.error("License is not found in XML.") return licenses + + def _get_local_files(self, article): + self.logger.msg("Parsing local files", file=self.file_path) + + dir_path = os.path.dirname(self.file_path) + file_name = os.path.basename(self.file_path).split(".")[0] + pdf_path = os.path.join(dir_path, f"{file_name}.pdf") + + files = { + "xml": self.file_path, + "pdf": pdf_path, + } + self.logger.msg("Local files parsed", files=files) + return files diff --git a/dags/iop/repository.py b/dags/iop/repository.py index 8defd1b2..726ae281 100644 --- a/dags/iop/repository.py +++ b/dags/iop/repository.py @@ -4,6 +4,9 @@ from common.repository import IRepository from common.s3_service import S3Service from common.utils import find_extension +from structlog import get_logger + +logger = get_logger() class IOPRepository(IRepository): @@ -12,7 +15,8 @@ class IOPRepository(IRepository): def __init__(self) -> None: super().__init__() - self.s3 = S3Service(os.getenv("IOP_BUCKET_NAME", "iop")) + self.bucket = os.getenv("IOP_BUCKET_NAME", "iop") + self.s3 = S3Service(self.bucket) def get_all_raw_filenames(self): return [ diff --git a/dags/oup/oup_process_file.py b/dags/oup/oup_process_file.py index 3fd5c945..94c91dc3 100644 --- a/dags/oup/oup_process_file.py +++ b/dags/oup/oup_process_file.py @@ -6,19 +6,26 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article, parse_without_names_spaces +from inspire_utils.record import get_value from jsonschema import validate from oup.parser import OUPParser +from oup.repository import OUPRepository +from structlog import get_logger + +logger = get_logger() def oup_parse_file(**kwargs): if "params" not in kwargs or "file" not in kwargs["params"]: raise KeyError("There was no 'file' parameter. Exiting run.") encoded_xml = kwargs["params"]["file"] + file_name = kwargs["params"]["file_name"] xml_bytes = base64.b64decode(encoded_xml) xml = parse_without_names_spaces(xml_bytes.decode("utf-8")) - parser = OUPParser() + parser = OUPParser(file_name) parsed = parser.parse(xml) return parsed @@ -57,10 +64,22 @@ def enrich_file(enhanced_file): return oup_enrich_file(enhanced_file) @task() - def validate_record(enriched_file): - if not enriched_file: - raise EmptyOutputFromPreviousTask("enrich_file") - return oup_validate_record(enriched_file) + def populate_files(parsed_file): + if "files" not in parsed_file: + logger.info("No files to populate") + return parsed_file + + logger.info("Populating files", files=parsed_file["files"]) + + s3_client_bucket = OUPRepository().bucket + s3_scoap3_client = Scoap3Repository() + doi = get_value(parsed_file, "dois.value[0]") + files = s3_scoap3_client.copy_files( + s3_client_bucket, parsed_file["files"], prefix=doi + ) + parsed_file["files"] = files + logger.info("Files populated", files=parsed_file["files"]) + return parsed_file @task() def create_or_update(enriched_file): @@ -68,9 +87,9 @@ def create_or_update(enriched_file): parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) - enriched_file = enrich_file(enhanced_file) - validated_record = validate_record(enriched_file) - create_or_update(validated_record) + enhanced_file_with_files = populate_files(enhanced_file) + enriched_file = enrich_file(enhanced_file_with_files) + create_or_update(enriched_file) dag_taskflow = oup_process_file() diff --git a/dags/oup/parser.py b/dags/oup/parser.py index 49c49410..817557be 100644 --- a/dags/oup/parser.py +++ b/dags/oup/parser.py @@ -1,4 +1,5 @@ import datetime +import os from common.parsing.parser import IParser from common.parsing.xml_extractors import ( @@ -16,7 +17,8 @@ class OUPParser(IParser): - def __init__(self) -> None: + def __init__(self, file_path=None): + self.file_path = file_path self.logger = get_logger().bind(class_name=type(self).__name__) self.article_type_mapping = { "research-article": "article", @@ -110,6 +112,11 @@ def __init__(self) -> None: destination="collections", value=["Progress of Theoretical and Experimental Physics"], ), + CustomExtractor( + destination="files", + extraction_function=self._get_local_files, + required=True, + ), ] super().__init__(extractors) @@ -256,3 +263,21 @@ def _get_license(self, article): except (KeyError, TypeError): self.logger.error("License is not found in XML.", dois=self.dois) return licenses + + def _get_local_files(self, article): + self.logger.msg("Parsing local files", file=self.file_path) + + dir_path = os.path.dirname(self.file_path) + file_name = os.path.basename(self.file_path).split(".")[0] + pdf_dir_path = dir_path.replace("xml", "pdf") + pdfa_dir_path = dir_path.replace(".xml", "_archival") + pdf_path = os.path.join(pdf_dir_path, f"{file_name}.pdf") + pdfa_path = os.path.join(pdfa_dir_path, f"{file_name}.pdf") + + files = { + "xml": self.file_path, + "pdf": pdf_path, + "pdfa": pdfa_path, + } + self.logger.msg("Local files parsed", files=files) + return files diff --git a/dags/oup/repository.py b/dags/oup/repository.py index 1f0687ce..d1455343 100644 --- a/dags/oup/repository.py +++ b/dags/oup/repository.py @@ -3,6 +3,9 @@ from common.repository import IRepository from common.s3_service import S3Service +from structlog import get_logger + +logger = get_logger() class OUPRepository(IRepository): @@ -11,7 +14,8 @@ class OUPRepository(IRepository): def __init__(self): super().__init__() - self.s3 = S3Service(os.getenv("OUP_BUCKET_NAME", "oup")) + self.bucket = os.getenv("OUP_BUCKET_NAME", "oup") + self.s3 = S3Service(self.bucket) def get_all_raw_filenames(self): return [ diff --git a/dags/springer/parser.py b/dags/springer/parser.py index 3697918c..e50393c7 100644 --- a/dags/springer/parser.py +++ b/dags/springer/parser.py @@ -1,4 +1,5 @@ import datetime +import os import re from common.exceptions import UnknownLicense @@ -13,7 +14,8 @@ class SpringerParser(IParser): - def __init__(self): + def __init__(self, file_path=None): + self.file_path = file_path self.logger = get_logger().bind(class_name=type(self).__name__) self.dois = None article_type_mapping = { @@ -111,6 +113,11 @@ def __init__(self): source="./Journal/JournalInfo/JournalTitle", extra_function=lambda x: [x.lstrip("The ")], ), + CustomExtractor( + destination="files", + extraction_function=self._get_local_files, + required=True, + ), ] super().__init__(extractors) @@ -286,3 +293,15 @@ def _get_license(self, article): "url": "https://creativecommons.org/licenses/by/3.0", } ] + + def _get_local_files(self, article): + self.logger.msg("Parsing local files", pdf=self.file_path) + + pdfa_name = f"{os.path.basename(self.file_path).split('.')[0]}.pdf" + pdfa_path = os.path.join( + os.path.dirname(self.file_path), "BodyRef", "PDF", pdfa_name + ) + return { + "pdfa": pdfa_path, + "xml": self.file_path, + } diff --git a/dags/springer/repository.py b/dags/springer/repository.py index c8b3a5e4..caf52a8e 100644 --- a/dags/springer/repository.py +++ b/dags/springer/repository.py @@ -3,6 +3,9 @@ from common.repository import IRepository from common.s3_service import S3Service +from structlog import get_logger + +logger = get_logger() class SpringerRepository(IRepository): @@ -11,7 +14,8 @@ class SpringerRepository(IRepository): def __init__(self) -> None: super().__init__() - self.s3 = S3Service(os.getenv("SPRINGER_BUCKET_NAME", "springer")) + self.bucket = os.getenv("SPRINGER_BUCKET_NAME", "springer") + self.s3 = S3Service(self.bucket) def get_all_raw_filenames(self): return [ diff --git a/dags/springer/springer_process_file.py b/dags/springer/springer_process_file.py index ca02d3b1..0b557527 100644 --- a/dags/springer/springer_process_file.py +++ b/dags/springer/springer_process_file.py @@ -7,18 +7,25 @@ from common.enhancer import Enhancer from common.enricher import Enricher from common.exceptions import EmptyOutputFromPreviousTask +from common.scoap3_s3 import Scoap3Repository from common.utils import create_or_update_article +from inspire_utils.record import get_value from jsonschema import validate from springer.parser import SpringerParser +from springer.repository import SpringerRepository +from structlog import get_logger + +logger = get_logger() def springer_parse_file(**kwargs): if "params" in kwargs and "file" in kwargs["params"]: encoded_xml = kwargs["params"]["file"] + file_name = kwargs["params"]["file_name"] xml_bytes = base64.b64decode(encoded_xml) xml = ET.fromstring(xml_bytes.decode("utf-8")) - parser = SpringerParser() + parser = SpringerParser(file_name) parsed = parser.parse(xml) return parsed @@ -58,10 +65,22 @@ def enrich_file(enhanced_file): return springer_enrich_file(enhanced_file) @task() - def validate_record(enriched_file): - if not enriched_file: - raise EmptyOutputFromPreviousTask("enrich_file") - return springer_validate_record(enriched_file) + def populate_files(parsed_file): + if "files" not in parsed_file: + logger.info("No files to populate") + return parsed_file + + logger.info("Populating files", files=parsed_file["files"]) + + s3_client_bucket = SpringerRepository().bucket + s3_scoap3_client = Scoap3Repository() + doi = get_value(parsed_file, "dois.value[0]") + files = s3_scoap3_client.copy_files( + s3_client_bucket, parsed_file["files"], prefix=doi + ) + parsed_file["files"] = files + logger.info("Files populated", files=parsed_file["files"]) + return parsed_file @task() def create_or_update(enriched_file): @@ -69,9 +88,9 @@ def create_or_update(enriched_file): parsed_file = parse_file() enhanced_file = enhance_file(parsed_file) - enriched_file = enrich_file(enhanced_file) - validated_record = validate_record(enriched_file) - create_or_update(validated_record) + enhanced_file_with_files = populate_files(enhanced_file) + enriched_file = enrich_file(enhanced_file_with_files) + create_or_update(enriched_file) dag_taskflow = springer_process_file() diff --git a/tests/integration/iop/test_iop_dag_process_file.py b/tests/integration/iop/test_iop_dag_process_file.py index bb087cc7..ec1415be 100644 --- a/tests/integration/iop/test_iop_dag_process_file.py +++ b/tests/integration/iop/test_iop_dag_process_file.py @@ -4,7 +4,7 @@ from airflow.models import DagBag from common.utils import parse_to_ET_element from freezegun import freeze_time -from iop.iop_process_file import iop_enhance_file, iop_enrich_file, iop_validate_record +from iop.iop_process_file import iop_enhance_file, iop_enrich_file from iop.parser import IOPParser from pytest import fixture @@ -50,11 +50,6 @@ def test_dag_loaded(dag): assert len(dag.tasks) == 5 -@pytest.mark.vcr -def test_dag_validate_file_processing(article): - iop_validate_record(article) - - publisher = "IOP" generic_pseudo_parser_output = { diff --git a/tests/units/elsevier/test_elsevier_validation.py b/tests/units/elsevier/test_elsevier_validation.py deleted file mode 100644 index 8331c9e3..00000000 --- a/tests/units/elsevier/test_elsevier_validation.py +++ /dev/null @@ -1,125 +0,0 @@ -import base64 - -import pytest -import requests -from common.enhancer import Enhancer -from common.enricher import Enricher -from common.utils import parse_without_names_spaces -from elsevier.elsevier_file_processing import parse_elsevier -from elsevier.metadata_parser import ElsevierMetadataParser -from elsevier.parser import ElsevierParser -from jsonschema import validate -from pytest import fixture - - -@fixture(scope="module") -def parser(): - return ElsevierParser() - - -@fixture(scope="module") -def metadata_parser(): - return ElsevierMetadataParser( - file_path="extracted/CERNQ000000010669A/CERNQ000000010669", - ) - - -@fixture -def parsed_article(shared_datadir, parser, metadata_parser): - with open( - shared_datadir / "CERNQ000000010011" / "S0370269323000643" / "main.xml" - ) as file: - parsed_file = parser.parse(parse_without_names_spaces(file.read())) - with open(shared_datadir / "CERNQ000000010011" / "dataset.xml") as file: - parsed_files_with_metadata = metadata_parser.parse( - parse_without_names_spaces(file.read()), - ) - full_parsed_file = {**parsed_file, **parsed_files_with_metadata[0]} - enhanced_file = Enhancer()("Elsevier", full_parsed_file) - return Enricher()(enhanced_file) - - -@pytest.mark.vcr -def test_elsevier_validate_record(parsed_article): - schema = requests.get(parsed_article["$schema"]).json() - validate(parsed_article, schema) - - -@pytest.mark.vcr -def test_parse_elsevier_task_and_validation(shared_datadir): - with open( - shared_datadir / "CERNQ000000010011" / "S0370269323000643" / "main.xml", "rb" - ) as file: - metadata = { - "dois": [{"value": "10.1016/j.physletb.2023.137730"}], - "date_published": "2023-02-04", - "collections": [{"primary": "PLB"}], - "license": [ - { - "url": "http://creativecommons.org/licenses/by/3.0/", - "license": "CC-BY-3.0", - } - ], - "files": { - "pdf": "extracted/CERNQ000000010669A/CERNQ000000010669/S0370269323000643/main.pdf", - "pdfa": "extracted/CERNQ000000010669A/CERNQ000000010669/S0370269323000643/main_a-2b.pdf", - "xml": "extracted/CERNQ000000010669A/CERNQ000000010669/S0370269323000643/main.xml", - }, - "publication_info": [{"journal_title": "PLB", "year": 2023}], - } - kwargs = { - "params": { - "file_content": base64.b64encode(file.read()).decode(), - "metadata": metadata, - } - } - article = parse_elsevier(**kwargs) - - assert len(article["authors"]) == 1023 - assert ( - article["title"] - == "System-size dependence of the charged-particle pseudorapidity density at sNN=5.02TeV for pp, p Pb, and Pb Pb collisions" - ) - enhanced_file = Enhancer()("Elsevier", article) - enriched_file = Enricher()(enhanced_file) - schema = requests.get(enriched_file["$schema"]).json() - validate(enriched_file, schema) - - -@pytest.mark.vcr -def test_parse_elsevier_task_and_validation_address_line(shared_datadir): - with open(shared_datadir / "address-line-valid" / "main_rjjlr.xml", "rb") as file: - metadata = { - "dois": [{"value": "10.1016/j.physletb.2022.137649"}], - "date_published": "2023-02-04", - "collections": [{"primary": "PLB"}], - "license": [ - { - "url": "http://creativecommons.org/licenses/by/3.0/", - "license": "CC-BY-3.0", - } - ], - "files": { - "pdf": "extracted/address-line-valid/main.pdf", - "pdfa": "extracted/address-line-valid/main_a-2b.pdf", - "xml": "extracted/address-line-valid/main_rjjlr.xml", - }, - "publication_info": [{"journal_title": "PLB", "year": 2023}], - } - kwargs = { - "params": { - "file_content": base64.b64encode(file.read()).decode(), - "metadata": metadata, - } - } - article = parse_elsevier(**kwargs) - - assert len(article["authors"]) == 1032 - assert ( - article["title"] - == "Study of charged particle production at high p T using event topology in pp, p–Pb and Pb–Pb collisions at sNN=5.02 TeV" - ) - enhanced_file = Enhancer()("Elsevier", article) - enriched_file = Enricher()(enhanced_file) - schema = requests.get(enriched_file["$schema"]).json() - validate(enriched_file, schema) diff --git a/tests/units/hindawi/test_hindawi_parser.py b/tests/units/hindawi/test_hindawi_parser.py index 5797037d..5037d044 100644 --- a/tests/units/hindawi/test_hindawi_parser.py +++ b/tests/units/hindawi/test_hindawi_parser.py @@ -2,11 +2,6 @@ import pytest from common.parsing.xml_extractors import RequiredFieldNotFoundExtractionError -from hindawi.hindawi_file_processing import ( - enhance_hindawi, - enrich_hindawi, - hindawi_validate_record, -) from hindawi.parser import HindawiParser @@ -283,12 +278,3 @@ def parsed_article_without_page_nr(hindawi_parser, article_without_page_nr): def test_page_nr(parsed_article_without_page_nr): assert parsed_article_without_page_nr["page_nr"] == [0] # Default value - - -@pytest.mark.vcr -def test_hindawi_record_validation(hindawi_parser, parsed_article_without_page_nr): - enhanced = enhance_hindawi( - hindawi_parser._generic_parsing(parsed_article_without_page_nr) - ) - enriched = enrich_hindawi(enhanced) - hindawi_validate_record(enriched) diff --git a/tests/units/hindawi/test_hindawi_validation.py b/tests/units/hindawi/test_hindawi_validation.py deleted file mode 100644 index 7b73cf87..00000000 --- a/tests/units/hindawi/test_hindawi_validation.py +++ /dev/null @@ -1,39 +0,0 @@ -import xml.etree.ElementTree as ET - -import pytest -import requests -from common.enhancer import Enhancer -from common.enricher import Enricher -from common.parsing.xml_extractors import RequiredFieldNotFoundExtractionError -from hindawi.parser import HindawiParser -from jsonschema import validate -from pytest import fixture, raises - - -@fixture(scope="module") -def parser(): - return HindawiParser() - - -@fixture -def parsed_article(shared_datadir, parser): - with open(shared_datadir / "example1.xml") as file: - parsed_file = parser.parse(ET.fromstring(file.read())) - enhanced_file = Enhancer()("Hindawi", parsed_file) - return Enricher()(enhanced_file) - - -@pytest.mark.vcr -def test_hindawi_validate_record(parsed_article): - schema = requests.get(parsed_article["$schema"]).json() - validate(parsed_article, schema) - - -def test_hindawi_validate_record_without_doi(parser, shared_datadir): - with open(shared_datadir / "without_doi.xml") as file: - with raises(RequiredFieldNotFoundExtractionError): - parsed_file = parser.parse(ET.fromstring(file.read())) - enhanced_file = Enhancer()("Hindawi", parsed_file) - parsed_article_without_doi = Enricher()(enhanced_file) - schema = requests.get(parsed_article_without_doi["$schema"]).json() - validate(parsed_article, schema)