Skip to content

Commit

Permalink
global: addition of files
Browse files Browse the repository at this point in the history
* Adds `files` key to all publishers.
  • Loading branch information
drjova committed Apr 17, 2024
1 parent 861ae5d commit 7dd4145
Show file tree
Hide file tree
Showing 35 changed files with 448 additions and 1,156 deletions.
39 changes: 24 additions & 15 deletions dags/aps/aps_process_file.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
import json

import pendulum
import requests
from airflow.decorators import dag, task
from aps.parser import APSParser
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.scoap3_s3 import Scoap3Repository
from common.utils import create_or_update_article
from jsonschema import validate
from inspire_utils.record import get_value
from structlog import get_logger

logger = get_logger()


def parse_aps(data):
Expand All @@ -25,12 +28,6 @@ def enrich_aps(enhanced_file):
return Enricher()(enhanced_file)


def aps_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def aps_process_file():
@task()
Expand All @@ -52,20 +49,32 @@ def enrich(enhanced_file):
return enrich_aps(enhanced_file)

@task()
def validate_record(enriched_file):
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich")
return aps_validate_record(enriched_file)
def populate_files(parsed_file):
if "dois" not in parsed_file:
return parsed_file

doi = get_value(parsed_file, "dois.value[0]")
logger.info("Populating files", doi=doi)

files = {
"pdf": f"http://harvest.aps.org/v2/journals/articles/{doi}",
"xml": f"http://harvest.aps.org/v2/journals/articles/{doi}",
}
s3_scoap3_client = Scoap3Repository()
downloaded_files = s3_scoap3_client.download_files_for_aps(files, prefix=doi)
parsed_file["files"] = downloaded_files
logger.info("Files populated", files=parsed_file["files"])
return parsed_file

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)
enhanced_file_with_files = populate_files(enhanced_file)
enriched_file = enrich(enhanced_file_with_files)
create_or_update(enriched_file)


dag_for_aps_files_processing = aps_process_file()
20 changes: 2 additions & 18 deletions dags/aps/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from inspire_utils.record import get_value
from structlog import get_logger

logger = get_logger()


class APSParser(IParser):
def __init__(self) -> None:
Expand Down Expand Up @@ -74,7 +76,6 @@ def __init__(self) -> None:
extraction_function=lambda x: ["HEP", "Citeable", "Published"],
),
CustomExtractor("field_categories", self._get_field_categories),
CustomExtractor("files", self._build_files_data),
]

super().__init__(extractors)
Expand Down Expand Up @@ -120,23 +121,6 @@ def _get_field_categories(self, article):
)
]

def _build_files_data(self, article):
doi = get_value(article, "identifiers.doi")
return [
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "application/pdf"},
"name": "{0}.pdf".format(doi),
"filetype": "pdf",
},
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "text/xml"},
"name": "{0}.xml".format(doi),
"filetype": "xml",
},
]

def _get_licenses(self, x):
try:
rights = x["rights"]["licenses"]
Expand Down
1 change: 0 additions & 1 deletion dags/common/cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def remove_unnecessary_fields(obj):
fieldnames = [
"curated",
"citeable",
"files",
"date_published",
"source_file_path",
"local_files",
Expand Down
3 changes: 2 additions & 1 deletion dags/common/enricher.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ def __call__(self, article):
enriched_article = article.copy()
enriched_article.update(
{
"$schema": self._get_schema(),
"arxiv_eprints": [
self._set_categories(eprint)
for eprint in enriched_article.get("arxiv_eprints", [])
Expand All @@ -120,4 +119,6 @@ def __call__(self, article):
)
enriched_article = remove_empty_values(enriched_article)
enriched_article = remove_unnecessary_fields(enriched_article)

self.logger.info("Enriched article.", article=enriched_article)
return enriched_article
27 changes: 7 additions & 20 deletions dags/common/pull_ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def migrate_files(
s_ftp,
repo,
logger: PrintLogger,
process_archives = True,
process_archives=True,
):
logger.msg("Processing files.", filenames=archives_names)
extracted_or_downloaded_filenames = []
Expand Down Expand Up @@ -58,7 +58,6 @@ def migrate_from_ftp(
s_ftp,
repo,
logger,
publisher=None,
**kwargs,
):
params = kwargs["params"]
Expand All @@ -74,10 +73,10 @@ def migrate_from_ftp(
)

if force_pull_all_files:
return _force_pull(s_ftp, repo, logger, publisher, **kwargs)
return _force_pull(s_ftp, repo, logger, **kwargs)
elif force_pull_specific_files:
return _filenames_pull(s_ftp, repo, logger, publisher, **kwargs)
return _differential_pull(s_ftp, repo, logger, publisher, **kwargs)
return _filenames_pull(s_ftp, repo, logger, **kwargs)
return _differential_pull(s_ftp, repo, logger, **kwargs)


def reprocess_files(repo, logger, **kwargs):
Expand All @@ -91,32 +90,24 @@ def _force_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
logger.msg("Force Pulling from SFTP.")
excluded_directories = kwargs["params"]["excluded_directories"]
filenames = s_ftp.list_files(excluded_directories=excluded_directories)
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _filenames_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
filenames_pull_params = kwargs["params"]["filenames_pull"]
filenames = filenames_pull_params["filenames"]
logger.msg("Pulling specified filenames from SFTP")
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _find_files_in_zip(filenames, repo):
Expand All @@ -137,18 +128,14 @@ def _differential_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
logger.msg("Pulling missing files only.")
excluded_directories = kwargs["params"]["excluded_directories"]
sftp_files = s_ftp.list_files(excluded_directories=excluded_directories)
s3_files = repo.get_all_raw_filenames()
diff_files = list(filter(lambda x: x not in s3_files, sftp_files))
process_archives = publisher != "elsevier"
return migrate_files(
diff_files, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(diff_files, s_ftp, repo, logger)


def trigger_file_processing(
Expand Down
124 changes: 124 additions & 0 deletions dags/common/scoap3_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
from uuid import uuid4

import requests
from common.repository import IRepository
from common.s3_service import S3Service
from structlog import get_logger

logger = get_logger()


class Scoap3Repository(IRepository):
def __init__(self):
super().__init__()
self.bucket = os.getenv("SCOAP3_BUCKET_NAME", "scoap3")
self.s3 = S3Service(self.bucket)
self.client = self.s3.meta.client

def copy_file(self, source_bucket, source_key, prefix=None):
if not prefix:
prefix = str(uuid4())

copy_source = {"Bucket": source_bucket, "Key": source_key}
filename = os.path.basename(source_key)
destination_key = f"{prefix}/{filename}"

logger.info("Copying file from", copy_source=copy_source)
self.client.copy(
copy_source,
self.bucket,
destination_key,
ExtraArgs={
"Metadata": {
"source_bucket": source_bucket,
"source_key": source_key,
},
"MetadataDirective": "REPLACE",
},
)
logger.info(
f"Copied file from {source_bucket}/{source_key} to {self.bucket}/{destination_key}"
)
return f"{self.bucket}/{destination_key}"

def copy_files(self, bucket, files, prefix=None):
copied_files = {}
for type, path in files.items():
try:
copied_files[type] = self.copy_file(bucket, path, prefix=prefix)
except Exception as e:
logger.error("Failed to copy file.", error=str(e), type=type, path=path)
return copied_files

def download_files(self, files, prefix=None):
if not prefix:
prefix = str(uuid4())

downloaded_files = {}

for type, url in files.items():
try:
downloaded_files[type] = self.download_and_upload_to_s3(
url, prefix=prefix
)
logger.info("Downloaded file", type=type, url=url)
except Exception as e:
logger.error(
"Failed to download file.", error=str(e), type=type, url=url
)
return downloaded_files

def download_files_for_aps(self, files, prefix=None):
if not prefix:
prefix = str(uuid4())

downloaded_files = {}

for type, url in files.items():
headers = {
"Accept": f"application/{type}",
}
try:
downloaded_files[type] = self.download_and_upload_to_s3(
url, prefix=prefix, headers=headers
)
logger.info("Downloaded file", type=type, url=url)
except Exception as e:
logger.error(
"Failed to download file.", error=str(e), type=type, url=url
)
return downloaded_files

def download_and_upload_to_s3(self, url, prefix=None, headers=None):
if not prefix:
prefix = str(uuid4())

filename = os.path.basename(url)
destination_key = f"{prefix}/{filename}"

response = requests.get(url, headers=headers)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error("Failed to download file", error=str(e), url=url)
return

try:
# Upload the file to S3
self.client.put_object(
Body=response.content,
Bucket=self.bucket,
Key=destination_key,
Metadata={
"source_url": url,
},
)
return f"{self.bucket}/{destination_key}"
except Exception as e:
logger.error(
"Failed to upload file",
error=str(e),
bucket=self.bucket,
key=destination_key,
)
6 changes: 5 additions & 1 deletion dags/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ def parse_to_ET_element(article):


def parse_without_names_spaces(xml):
it = ET.iterparse(StringIO(xml))
if type(xml) == str:
it = ET.iterparse(StringIO(xml))
else:
it = ET.iterparse(StringIO(xml.getvalue().decode("utf-8")))
for _, el in it:
el.tag = el.tag.rpartition("}")[-1]
root = it.root
Expand Down Expand Up @@ -243,6 +246,7 @@ def process_archive(file_bytes, file_name, **kwargs):
max_tries=5,
)
def create_or_update_article(data):
logger.info("Sending data to the backend", data=data)
backend_url = os.getenv(
"BACKEND_URL", "http://localhost:8000/api/article-workflow-import/"
)
Expand Down
Loading

0 comments on commit 7dd4145

Please sign in to comment.