global: addition of files
* Adds `files` key to all publishers.
drjova committed Apr 12, 2024
1 parent 861ae5d commit f236521
Showing 25 changed files with 384 additions and 348 deletions.
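
The thrust of the change: every publisher's parser now emits a `files` mapping of file type to source location, and that mapping can then be mirrored into the SCOAP3 bucket by the new Scoap3Repository before the record is sent to the backend (the Elsevier DAG below wires this in as a populate_files task). A minimal sketch of the two shapes involved (the DOI and object keys are illustrative placeholders, not values taken from this commit):

# Shape produced by a parser (see APSParser._build_files_data below): file type -> source URL or S3 key.
files = {
    "pdf": "http://harvest.aps.org/v2/journals/articles/<doi>",
    "xml": "http://harvest.aps.org/v2/journals/articles/<doi>",
}

# Shape after Scoap3Repository (dags/common/scoap3_s3.py below) has copied or downloaded
# the files into the SCOAP3 bucket: file type -> "<bucket>/<doi>/<filename>".
files = {
    "pdf": "scoap3/<doi>/<filename>.pdf",
    "xml": "scoap3/<doi>/<filename>.xml",
}
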
9 changes: 1 addition & 8 deletions dags/aps/aps_process_file.py
@@ -51,21 +51,14 @@ def enrich(enhanced_file):
raise EmptyOutputFromPreviousTask("enhance")
return enrich_aps(enhanced_file)

@task()
def validate_record(enriched_file):
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich")
return aps_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)
create_or_update(enriched_file)


dag_for_aps_files_processing = aps_process_file()
27 changes: 12 additions & 15 deletions dags/aps/parser.py
@@ -6,6 +6,8 @@
from inspire_utils.record import get_value
from structlog import get_logger

logger = get_logger()


class APSParser(IParser):
def __init__(self) -> None:
@@ -74,7 +76,10 @@ def __init__(self) -> None:
extraction_function=lambda x: ["HEP", "Citeable", "Published"],
),
CustomExtractor("field_categories", self._get_field_categories),
CustomExtractor("files", self._build_files_data),
CustomExtractor(
destination="files",
extraction_function=self._build_files_data,
),
]

super().__init__(extractors)
@@ -122,20 +127,12 @@ def _get_field_categories(self, article):

def _build_files_data(self, article):
doi = get_value(article, "identifiers.doi")
return [
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "application/pdf"},
"name": "{0}.pdf".format(doi),
"filetype": "pdf",
},
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "text/xml"},
"name": "{0}.xml".format(doi),
"filetype": "xml",
},
]
files = {
"pdf": f"http://harvest.aps.org/v2/journals/articles/{doi}",
"xml": f"http://harvest.aps.org/v2/journals/articles/{doi}",
}
logger.info("Files data", files=files)
return files

def _get_licenses(self, x):
try:
1 change: 0 additions & 1 deletion dags/common/cleanup.py
@@ -59,7 +59,6 @@ def remove_unnecessary_fields(obj):
fieldnames = [
"curated",
"citeable",
"files",
"date_published",
"source_file_path",
"local_files",
3 changes: 2 additions & 1 deletion dags/common/enricher.py
@@ -111,7 +111,6 @@ def __call__(self, article):
enriched_article = article.copy()
enriched_article.update(
{
"$schema": self._get_schema(),
"arxiv_eprints": [
self._set_categories(eprint)
for eprint in enriched_article.get("arxiv_eprints", [])
@@ -120,4 +119,6 @@
)
enriched_article = remove_empty_values(enriched_article)
enriched_article = remove_unnecessary_fields(enriched_article)

self.logger.info("Enriched article.", article=enriched_article)
return enriched_article
26 changes: 7 additions & 19 deletions dags/common/pull_ftp.py
@@ -15,7 +15,7 @@ def migrate_files(
s_ftp,
repo,
logger: PrintLogger,
process_archives = True,
process_archives=True,
):
logger.msg("Processing files.", filenames=archives_names)
extracted_or_downloaded_filenames = []
@@ -58,7 +58,6 @@ def migrate_from_ftp(
s_ftp,
repo,
logger,
publisher=None,
**kwargs,
):
params = kwargs["params"]
@@ -74,10 +73,10 @@
)

if force_pull_all_files:
return _force_pull(s_ftp, repo, logger, publisher, **kwargs)
return _force_pull(s_ftp, repo, logger, **kwargs)
elif force_pull_specific_files:
return _filenames_pull(s_ftp, repo, logger, publisher, **kwargs)
return _differential_pull(s_ftp, repo, logger, publisher, **kwargs)
return _filenames_pull(s_ftp, repo, logger, **kwargs)
return _differential_pull(s_ftp, repo, logger, **kwargs)


def reprocess_files(repo, logger, **kwargs):
@@ -97,26 +96,19 @@ def _force_pull(
logger.msg("Force Pulling from SFTP.")
excluded_directories = kwargs["params"]["excluded_directories"]
filenames = s_ftp.list_files(excluded_directories=excluded_directories)
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _filenames_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
filenames_pull_params = kwargs["params"]["filenames_pull"]
filenames = filenames_pull_params["filenames"]
logger.msg("Pulling specified filenames from SFTP")
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _find_files_in_zip(filenames, repo):
@@ -137,18 +129,14 @@ def _differential_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
logger.msg("Pulling missing files only.")
excluded_directories = kwargs["params"]["excluded_directories"]
sftp_files = s_ftp.list_files(excluded_directories=excluded_directories)
s3_files = repo.get_all_raw_filenames()
diff_files = list(filter(lambda x: x not in s3_files, sftp_files))
process_archives = publisher != "elsevier"
return migrate_files(
diff_files, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(diff_files, s_ftp, repo, logger)


def trigger_file_processing(
99 changes: 99 additions & 0 deletions dags/common/scoap3_s3.py
@@ -0,0 +1,99 @@
import os
from uuid import uuid4

import requests
from common.repository import IRepository
from common.s3_service import S3Service
from structlog import get_logger

logger = get_logger()


class Scoap3Repository(IRepository):
def __init__(self):
super().__init__()
self.bucket = os.getenv("SCOAP3_BUCKET_NAME", "scoap3")
self.s3 = S3Service(self.bucket)
self.client = self.s3.meta.client

def copy_file(self, source_bucket, source_key, prefix=None):
if not prefix:
prefix = str(uuid4())

copy_source = {"Bucket": source_bucket, "Key": source_key}
filename = os.path.basename(source_key)
destination_key = f"{prefix}/{filename}"

logger.info("Copying file from", copy_source=copy_source)
self.client.copy(
copy_source,
self.bucket,
destination_key,
ExtraArgs={
"Metadata": {
"source_bucket": source_bucket,
"source_key": source_key,
},
"MetadataDirective": "REPLACE",
},
)
logger.info(
f"Copied file from {source_bucket}/{source_key} to {self.bucket}/{destination_key}"
)
return f"{self.bucket}/{destination_key}"

def copy_files(self, bucket, files, prefix=None):
copied_files = {}
for type, path in files.items():
try:
copied_files[type] = self.copy_file(bucket, path, prefix=prefix)
except Exception as e:
logger.error("Failed to copy file.", error=str(e), type=type, path=path)
return copied_files

def download_files(self, files, prefix=None):
downloaded_files = {}
for type, url in files.items():
try:
downloaded_files[type] = self.download_and_upload_to_s3(
url, prefix=prefix
)
logger.info("Downloaded file", type=type, url=url)
except Exception as e:
logger.error(
"Failed to download file.", error=str(e), type=type, url=url
)
return downloaded_files

def download_and_upload_to_s3(self, url, prefix=None):
if not prefix:
prefix = str(uuid4())

filename = os.path.basename(url)
destination_key = f"{prefix}/{filename}"

response = requests.get(url)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error("Failed to download file", error=str(e), url=url)
return

try:
# Upload the file to S3
self.client.put_object(
Body=response.content,
Bucket=self.bucket,
Key=destination_key,
Metadata={
"source_url": url,
},
)
return f"{self.bucket}/{destination_key}"
except Exception as e:
logger.error(
"Failed to upload file",
error=str(e),
bucket=self.bucket,
key=destination_key,
)
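
A sketch of how the new repository is meant to be driven from a publisher DAG (the source bucket, keys and DOI are illustrative, and the resulting paths assume the default SCOAP3_BUCKET_NAME of "scoap3"; the Elsevier DAG below uses copy_files, while URL-based sources such as the APS harvest endpoint would go through download_files):

from common.scoap3_s3 import Scoap3Repository

repo = Scoap3Repository()

# Files already sitting in a publisher bucket are copied object by object,
# keyed by file type and prefixed with the article DOI.
copied = repo.copy_files(
    "publisher-bucket",                        # illustrative source bucket
    {"pdf": "raw/main.pdf", "xml": "raw/main.xml"},
    prefix="10.1016/j.example.2024.0001",      # illustrative DOI
)
# copied == {"pdf": "scoap3/10.1016/j.example.2024.0001/main.pdf", ...}

# Files only reachable over HTTP are downloaded and re-uploaded to the SCOAP3 bucket instead.
downloaded = repo.download_files(
    {"pdf": "http://harvest.aps.org/v2/journals/articles/<doi>"},
    prefix="<doi>",
)
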
6 changes: 5 additions & 1 deletion dags/common/utils.py
@@ -169,7 +169,10 @@ def parse_to_ET_element(article):


def parse_without_names_spaces(xml):
it = ET.iterparse(StringIO(xml))
if type(xml) == str:
it = ET.iterparse(StringIO(xml))
else:
it = ET.iterparse(StringIO(xml.getvalue().decode("utf-8")))
for _, el in it:
el.tag = el.tag.rpartition("}")[-1]
root = it.root
@@ -243,6 +246,7 @@ def process_archive(file_bytes, file_name, **kwargs):
max_tries=5,
)
def create_or_update_article(data):
logger.info("Sending data to the backend", data=data)
backend_url = os.getenv(
"BACKEND_URL", "http://localhost:8000/api/article-workflow-import/"
)
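
With this change parse_without_names_spaces accepts either a raw XML string or a bytes buffer exposing getvalue() (such as io.BytesIO, which is what the Elsevier DAG below feeds it from S3). A minimal illustration, assuming the function returns the namespace-stripped root element as its callers expect:

import io

from common.utils import parse_without_names_spaces

xml = '<article xmlns:ce="http://www.elsevier.com/xml/common/dtd"><ce:title>Example</ce:title></article>'

root_from_str = parse_without_names_spaces(xml)                                  # str input
root_from_buffer = parse_without_names_spaces(io.BytesIO(xml.encode("utf-8")))   # buffer input

# Namespace prefixes are stripped from the tags, so plain tag names match:
title = root_from_str.find("title")
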
58 changes: 35 additions & 23 deletions dags/elsevier/elsevier_file_processing.py
@@ -1,23 +1,22 @@
import base64

import pendulum
import requests
from airflow.decorators import dag, task
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article, parse_without_names_spaces
from elsevier.parser import ElsevierParser
from jsonschema import validate
from elsevier.repository import ElsevierRepository
from inspire_utils.record import get_value
from structlog import get_logger

logger = get_logger()


def parse_elsevier(**kwargs):
try:
encoded_xml = kwargs["params"]["file_content"]
except KeyError:
raise Exception("There was no 'file_content' parameter. Exiting run.")
xml_bytes = base64.b64decode(encoded_xml)
xml = parse_without_names_spaces(xml_bytes.decode("utf-8"))
xml_content_bytes = kwargs["params"]["file_content"]

xml = parse_without_names_spaces(xml_content_bytes)
parser = ElsevierParser()
parsed = parser.parse(xml)
try:
@@ -35,15 +34,16 @@ def enrich_elsevier(enhanced_file):
return Enricher()(enhanced_file)


def elsevier_validate_record(file_with_metadata):
schema = requests.get(file_with_metadata["$schema"]).json()
validate(file_with_metadata, schema)


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def elsevier_process_file():

s3_client = ElsevierRepository()

@task()
def parse(**kwargs):
xml_path = kwargs["params"]["file_name"]
xml_content_bytes = s3_client.get_by_id(xml_path)
kwargs["params"]["file_content"] = xml_content_bytes
return parse_elsevier(**kwargs)

@task()
@@ -52,26 +52,38 @@ def enhance(parsed_file):
return parsed_file and enhance_elsevier(parsed_file)
raise EmptyOutputFromPreviousTask("parse_metadata")

@task()
def populate_files(parsed_file):
if "files" not in parsed_file:
logger.info("No files to populate")
return parsed_file

logger.info("Populating files", files=parsed_file["files"])

s3_client_bucket = s3_client.bucket
s3_scoap3_client = Scoap3Repository()
doi = get_value(parsed_file, "dois.value[0]")
files = s3_scoap3_client.copy_files(
s3_client_bucket, parsed_file["files"], prefix=doi
)
parsed_file["files"] = files
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def enrich(enhanced_file):
if enhanced_file:
return enrich_elsevier(enhanced_file)
raise EmptyOutputFromPreviousTask("enhanced_file_with_metadata")

@task()
def validate_record(enriched_file):
if enriched_file:
return elsevier_validate_record(enriched_file)
raise EmptyOutputFromPreviousTask("enriched_file_with_metadata")

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validate_record(enriched_file)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
create_or_update(enriched_file)


Expand Down