global: addition of files
* Adds a `files` key to all publishers.
drjova committed Apr 15, 2024
1 parent 861ae5d commit 7c40d69
Showing 35 changed files with 410 additions and 1,154 deletions.
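
For context, the new `files` key is a flat mapping from filetype to a download URL, as built by the APS parser in this commit. A hedged sketch of the resulting record fragment (the DOI and the surrounding keys are invented):

# Illustrative only: the shape of the "files" key produced by APSParser below.
record_fragment = {
    "dois": [{"value": "10.1103/PhysRevD.109.000001"}],  # made-up DOI
    "files": {
        "pdf": "http://harvest.aps.org/v2/journals/articles/10.1103/PhysRevD.109.000001",
        "xml": "http://harvest.aps.org/v2/journals/articles/10.1103/PhysRevD.109.000001",
    },
}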
17 changes: 1 addition & 16 deletions dags/aps/aps_process_file.py
@@ -1,14 +1,12 @@
import json

import pendulum
import requests
from airflow.decorators import dag, task
from aps.parser import APSParser
from common.enhancer import Enhancer
from common.enricher import Enricher
from common.exceptions import EmptyOutputFromPreviousTask
from common.utils import create_or_update_article
from jsonschema import validate


def parse_aps(data):
@@ -25,12 +23,6 @@ def enrich_aps(enhanced_file):
return Enricher()(enhanced_file)


def aps_validate_record(enriched_file):
schema = requests.get(enriched_file["$schema"]).json()
validate(enriched_file, schema)
return enriched_file


@dag(schedule=None, start_date=pendulum.today("UTC").add(days=-1))
def aps_process_file():
@task()
@@ -51,21 +43,14 @@ def enrich(enhanced_file):
raise EmptyOutputFromPreviousTask("enhance")
return enrich_aps(enhanced_file)

@task()
def validate_record(enriched_file):
if not enriched_file:
raise EmptyOutputFromPreviousTask("enrich")
return aps_validate_record(enriched_file)

@task()
def create_or_update(enriched_file):
create_or_update_article(enriched_file)

parsed_file = parse()
enhanced_file = enhance(parsed_file)
enriched_file = enrich(enhanced_file)
validated_record = validate_record(enriched_file)
create_or_update(validated_record)
create_or_update(enriched_file)


dag_for_aps_files_processing = aps_process_file()
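
For a quick mental model, the remaining helpers can be chained the same way outside Airflow. A hedged sketch (raw_aps_record stands in for whatever payload the upstream pull DAG provides and is purely hypothetical; the direct Enhancer() call mirrors the Enricher usage in enrich_aps):

# Illustrative only: the same chain as the DAG above, without the Airflow task wrappers.
parsed = parse_aps(raw_aps_record)  # raw_aps_record: hypothetical input payload
enhanced = Enhancer()(parsed)       # assumed direct use of the imported Enhancer
enriched = enrich_aps(enhanced)
create_or_update_article(enriched)  # POSTs the record to the backend import endpoint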
27 changes: 12 additions & 15 deletions dags/aps/parser.py
@@ -6,6 +6,8 @@
from inspire_utils.record import get_value
from structlog import get_logger

logger = get_logger()


class APSParser(IParser):
def __init__(self) -> None:
@@ -74,7 +76,10 @@ def __init__(self) -> None:
extraction_function=lambda x: ["HEP", "Citeable", "Published"],
),
CustomExtractor("field_categories", self._get_field_categories),
CustomExtractor("files", self._build_files_data),
CustomExtractor(
destination="files",
extraction_function=self._build_files_data,
),
]

super().__init__(extractors)
@@ -122,20 +127,12 @@ def _get_field_categories(self, article):

def _build_files_data(self, article):
doi = get_value(article, "identifiers.doi")
return [
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "application/pdf"},
"name": "{0}.pdf".format(doi),
"filetype": "pdf",
},
{
"url": "http://harvest.aps.org/v2/journals/articles/{0}".format(doi),
"headers": {"Accept": "text/xml"},
"name": "{0}.xml".format(doi),
"filetype": "xml",
},
]
files = {
"pdf": f"http://harvest.aps.org/v2/journals/articles/{doi}",
"xml": f"http://harvest.aps.org/v2/journals/articles/{doi}",
}
logger.info("Files data", files=files)
return files

def _get_licenses(self, x):
try:
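The return value thus changes from a list of per-file dicts (url, headers, name, filetype) to a flat filetype-to-URL mapping. A hedged sketch of the new output for an invented DOI:

# Hypothetical input: only the part of the APS record that _build_files_data reads.
article = {"identifiers": {"doi": "10.1103/PhysRevX.14.000001"}}
APSParser()._build_files_data(article)
# -> {"pdf": "http://harvest.aps.org/v2/journals/articles/10.1103/PhysRevX.14.000001",
#     "xml": "http://harvest.aps.org/v2/journals/articles/10.1103/PhysRevX.14.000001"}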
1 change: 0 additions & 1 deletion dags/common/cleanup.py
@@ -59,7 +59,6 @@ def remove_unnecessary_fields(obj):
fieldnames = [
"curated",
"citeable",
"files",
"date_published",
"source_file_path",
"local_files",
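With "files" dropped from this list, the new key now survives record cleanup. A small sketch, assuming remove_unnecessary_fields returns the cleaned record as its use in the enricher below suggests (the record itself is invented):

record = {"files": {"pdf": "scoap3/some-prefix/article.pdf"}, "curated": False}
cleaned = remove_unnecessary_fields(record)
assert "files" in cleaned and "curated" not in cleaned  # "files" kept, listed fields stripped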
3 changes: 2 additions & 1 deletion dags/common/enricher.py
@@ -111,7 +111,6 @@ def __call__(self, article):
enriched_article = article.copy()
enriched_article.update(
{
"$schema": self._get_schema(),
"arxiv_eprints": [
self._set_categories(eprint)
for eprint in enriched_article.get("arxiv_eprints", [])
@@ -120,4 +119,6 @@
)
enriched_article = remove_empty_values(enriched_article)
enriched_article = remove_unnecessary_fields(enriched_article)

self.logger.info("Enriched article.", article=enriched_article)
return enriched_article
27 changes: 7 additions & 20 deletions dags/common/pull_ftp.py
@@ -15,7 +15,7 @@ def migrate_files(
s_ftp,
repo,
logger: PrintLogger,
process_archives = True,
process_archives=True,
):
logger.msg("Processing files.", filenames=archives_names)
extracted_or_downloaded_filenames = []
@@ -58,7 +58,6 @@ def migrate_from_ftp(
s_ftp,
repo,
logger,
publisher=None,
**kwargs,
):
params = kwargs["params"]
@@ -74,10 +73,10 @@
)

if force_pull_all_files:
return _force_pull(s_ftp, repo, logger, publisher, **kwargs)
return _force_pull(s_ftp, repo, logger, **kwargs)
elif force_pull_specific_files:
return _filenames_pull(s_ftp, repo, logger, publisher, **kwargs)
return _differential_pull(s_ftp, repo, logger, publisher, **kwargs)
return _filenames_pull(s_ftp, repo, logger, **kwargs)
return _differential_pull(s_ftp, repo, logger, **kwargs)


def reprocess_files(repo, logger, **kwargs):
@@ -91,32 +90,24 @@ def _force_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
logger.msg("Force Pulling from SFTP.")
excluded_directories = kwargs["params"]["excluded_directories"]
filenames = s_ftp.list_files(excluded_directories=excluded_directories)
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _filenames_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
filenames_pull_params = kwargs["params"]["filenames_pull"]
filenames = filenames_pull_params["filenames"]
logger.msg("Pulling specified filenames from SFTP")
process_archives = publisher != "elsevier"
return migrate_files(
filenames, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(filenames, s_ftp, repo, logger)


def _find_files_in_zip(filenames, repo):
@@ -137,18 +128,14 @@ def _differential_pull(
s_ftp,
repo,
logger,
publisher,
**kwargs,
):
logger.msg("Pulling missing files only.")
excluded_directories = kwargs["params"]["excluded_directories"]
sftp_files = s_ftp.list_files(excluded_directories=excluded_directories)
s3_files = repo.get_all_raw_filenames()
diff_files = list(filter(lambda x: x not in s3_files, sftp_files))
process_archives = publisher != "elsevier"
return migrate_files(
diff_files, s_ftp, repo, logger, process_archives=process_archives
)
return migrate_files(diff_files, s_ftp, repo, logger)


def trigger_file_processing(
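Since the publisher-specific switch is gone, archives are now always unpacked by default. Callers that still need to upload archives untouched can pass the flag explicitly; a hedged sketch:

# Sketch: skip in-place extraction and store the pulled archives as-is.
migrate_files(filenames, s_ftp, repo, logger, process_archives=False)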
99 changes: 99 additions & 0 deletions dags/common/scoap3_s3.py
@@ -0,0 +1,99 @@
import os
from uuid import uuid4

import requests
from common.repository import IRepository
from common.s3_service import S3Service
from structlog import get_logger

logger = get_logger()


class Scoap3Repository(IRepository):
def __init__(self):
super().__init__()
self.bucket = os.getenv("SCOAP3_BUCKET_NAME", "scoap3")
self.s3 = S3Service(self.bucket)
self.client = self.s3.meta.client

def copy_file(self, source_bucket, source_key, prefix=None):
if not prefix:
prefix = str(uuid4())

copy_source = {"Bucket": source_bucket, "Key": source_key}
filename = os.path.basename(source_key)
destination_key = f"{prefix}/{filename}"

logger.info("Copying file from", copy_source=copy_source)
self.client.copy(
copy_source,
self.bucket,
destination_key,
ExtraArgs={
"Metadata": {
"source_bucket": source_bucket,
"source_key": source_key,
},
"MetadataDirective": "REPLACE",
},
)
logger.info(
f"Copied file from {source_bucket}/{source_key} to {self.bucket}/{destination_key}"
)
return f"{self.bucket}/{destination_key}"

def copy_files(self, bucket, files, prefix=None):
copied_files = {}
for type, path in files.items():
try:
copied_files[type] = self.copy_file(bucket, path, prefix=prefix)
except Exception as e:
logger.error("Failed to copy file.", error=str(e), type=type, path=path)
return copied_files

def download_files(self, files, prefix=None):
downloaded_files = {}
for type, url in files.items():
try:
downloaded_files[type] = self.download_and_upload_to_s3(
url, prefix=prefix
)
logger.info("Downloaded file", type=type, url=url)
except Exception as e:
logger.error(
"Failed to download file.", error=str(e), type=type, url=url
)
return downloaded_files

def download_and_upload_to_s3(self, url, prefix=None):
if not prefix:
prefix = str(uuid4())

filename = os.path.basename(url)
destination_key = f"{prefix}/{filename}"

response = requests.get(url)
try:
response.raise_for_status()
except requests.exceptions.HTTPError as e:
logger.error("Failed to download file", error=str(e), url=url)
return

try:
# Upload the file to S3
self.client.put_object(
Body=response.content,
Bucket=self.bucket,
Key=destination_key,
Metadata={
"source_url": url,
},
)
return f"{self.bucket}/{destination_key}"
except Exception as e:
logger.error(
"Failed to upload file",
error=str(e),
bucket=self.bucket,
key=destination_key,
)
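
A short usage sketch of the new repository (bucket names, keys, URL, and prefix are invented; SCOAP3_BUCKET_NAME and the S3 credentials are expected to come from the environment):

from common.scoap3_s3 import Scoap3Repository

repo = Scoap3Repository()

# Copy already-harvested files between buckets, keyed by filetype.
copied = repo.copy_files(
    "raw-harvest-bucket",                # invented source bucket
    {"pdf": "aps/10.1103/example.pdf"},  # invented source key
    prefix="10.1103/PhysRevD.109.000001",
)

# Or fetch publisher-hosted files over HTTP and store them under the same prefix.
downloaded = repo.download_files(
    {"xml": "https://example.org/article.xml"},  # invented URL
    prefix="10.1103/PhysRevD.109.000001",
)
# Both calls return a {filetype: "bucket/key"} mapping pointing at the stored copies.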
6 changes: 5 additions & 1 deletion dags/common/utils.py
@@ -169,7 +169,7 @@ def parse_to_ET_element(article):


def parse_without_names_spaces(xml):
it = ET.iterparse(StringIO(xml))
if type(xml) == str:
it = ET.iterparse(StringIO(xml))
else:
it = ET.iterparse(StringIO(xml.getvalue().decode("utf-8")))
for _, el in it:
el.tag = el.tag.rpartition("}")[-1]
root = it.root
@@ -243,6 +246,7 @@ def process_archive(file_bytes, file_name, **kwargs):
max_tries=5,
)
def create_or_update_article(data):
logger.info("Sending data to the backend", data=data)
backend_url = os.getenv(
"BACKEND_URL", "http://localhost:8000/api/article-workflow-import/"
)
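parse_without_names_spaces now accepts either a plain XML string or a bytes-backed buffer such as io.BytesIO (the form in which files typically arrive from S3/SFTP). A hedged sketch, assuming the function returns the namespace-stripped root element as its callers imply:

from io import BytesIO
from common.utils import parse_without_names_spaces

xml = '<article xmlns:j="http://example.org/ns"><j:title>Example</j:title></article>'

root_from_str = parse_without_names_spaces(xml)                           # str input
root_from_buf = parse_without_names_spaces(BytesIO(xml.encode("utf-8")))  # buffer input
print(root_from_str.find("title").text)  # tags come back stripped of namespaces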