workflows: add hepdata harvesting
Reworked the HTTP hooks to make them easier to use.

* ref: cern-sis/issues-inspire/issues/594
DonHaul committed Dec 10, 2024
1 parent e21b157 commit c714c31
Showing 19 changed files with 6,209 additions and 99 deletions.
5 changes: 4 additions & 1 deletion backend/poetry.lock

(Generated file; diff not rendered.)

13 changes: 6 additions & 7 deletions ui/src/common/assets/bluesky.svg
(SVG file; diff not displayed.)
12 changes: 4 additions & 8 deletions ui/src/common/assets/mastodon.svg
(SVG file; diff not displayed.)
@@ -38,6 +38,8 @@ exports[`NewsAndUpdates renders 1`] = `
  <Col
    className="mt5"
  >
    <Space
      size="middle"
    >
@@ -55,6 +57,7 @@ exports[`NewsAndUpdates renders 1`] = `
    }
    style={
      Object {
        "color": "#1285FC",
        "fontSize": "25px",
      }
    }
Empty file added workflows/dags/data/__init__.py
201 changes: 201 additions & 0 deletions workflows/dags/data/data_harvest.py
@@ -0,0 +1,201 @@
import datetime
import logging
from datetime import timedelta

from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
    InspireHTTPRecordManagementHook,
)

logger = logging.getLogger(__name__)


@dag(
    start_date=datetime.datetime(2024, 11, 28),
    schedule="@daily",
    catchup=False,
    tags=["data"],
)
def data_harvest_dag():
    """
    Initialize a DAG for the data harvest workflow.
    """
    generic_http_hook = GenericHttpHook(http_conn_id="hepdata_connection")
    inspire_http_record_management_hook = InspireHTTPRecordManagementHook()

    data_schema = Variable.get("data_schema")

    @task(task_id="collect_ids")
    def collect_ids():
        """Collect the ids of the records updated in the last two days.

        Returns: list of INSPIRE ids
        """
        from_date = (datetime.datetime.now().date() - timedelta(days=2)).strftime(
            "%Y-%m-%d"
        )
        # ask HEPData for the INSPIRE ids of records updated since from_date
        payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
        hepdata_response = generic_http_hook.call_api(
            endpoint="/search/ids", method="GET", params=payload
        )

        return hepdata_response.json()

    @task_group
    def process_record(record_id):
        """
        Process one record: download its versions, build the INSPIRE
        record, and load it into inspirehep.
        """

        @task
        def download_record_versions(id):
            """
            Download all versions of the record.

            Args: id (int): The id of the record.
            Returns: dict: The latest payload under "base", plus one
                entry per earlier version number.
            """
            hepdata_response = generic_http_hook.call_api(
                endpoint=f"/record/ins{id}?format=json"
            )
            hepdata_response.raise_for_status()
            payload = hepdata_response.json()

            record = {"base": payload}
            for version in range(1, payload["record"]["version"]):
                response = generic_http_hook.call_api(
                    endpoint=f"/record/ins{id}?format=json&version={version}"
                )
                record[version] = response.json()

            return record

        @task.virtualenv(
            requirements=["inspire-schemas"],
            system_site_packages=False,
        )
        def build_record(data_schema, payload):
            """Build the INSPIRE data record from the HEPData payload.

            Args: data_schema (str): The schema of the data.
                payload (dict): The payload of the record.
            Returns: dict: The built record.
            """
            import datetime
            import re

            from inspire_schemas.builders import DataBuilder

            def add_version_specific_dois(record, builder):
                """
                Add the per-table, per-resource, and per-version DOIs.
                """
                for data_table in record["data_tables"]:
                    builder.add_doi(data_table["doi"], material="part")
                for resource_with_doi in record["resources_with_doi"]:
                    builder.add_doi(resource_with_doi["doi"], material="part")

                print("adding doi", record["record"]["hepdata_doi"])
                builder.add_doi(record["record"]["hepdata_doi"], material="version")

            def add_keywords(record, builder):
                """
                Add keywords to the record.
                """
                for keyword, item in record.get("data_keywords", {}).items():
                    if keyword == "cmenergies":
                        builder.add_keyword(
                            f"{keyword}: {item[0]['lte']}-{item[0]['gte']}"
                        )
                    elif keyword == "observables":
                        builder.add_keyword(f"{keyword}: {','.join(item)}")
                    else:
                        for value in item:
                            builder.add_keyword(value)

            builder = DataBuilder(source="hepdata")

            builder.add_creation_date(datetime.datetime.now(datetime.UTC).isoformat())

            base_record = payload["base"]

            for collaboration in base_record["record"]["collaborations"]:
                builder.add_collaboration(collaboration)

            builder.add_abstract(base_record["record"]["data_abstract"])

            add_keywords(base_record["record"], builder)

            builder.add_literature(
                doi=base_record["record"]["doi"],
                record={
                    "$ref": f"https://inspirehep.net/literature/{base_record['record']['inspire_id']}"
                },
            )

            # skip HEPData-internal resources, link everything else
            for resource in base_record["record"]["resources"]:
                if resource["url"].startswith(
                    "https://www.hepdata.net/record/resource/"
                ):
                    continue
                builder.add_url(resource["url"], description=resource["description"])

            builder.add_title(base_record["record"]["title"])

            builder.add_acquisition_source(
                method="hepdata_harvest",
                submission_number=base_record["record"]["inspire_id"],
                datetime=datetime.datetime.now(datetime.UTC).isoformat(),
            )

            # strip a trailing ".vN" so the version-independent DOI is used
            mtc = re.match(r"(.*?)\.v\d+", base_record["record"]["hepdata_doi"])
            if mtc:
                builder.add_doi(doi=mtc.group(1), material="data")
            else:
                builder.add_doi(
                    doi=base_record["record"]["hepdata_doi"], material="data"
                )

            for _, record_version in payload.items():
                add_version_specific_dois(record_version, builder)

            data = builder.record
            data["$schema"] = data_schema
            return data

        @task
        def load_record(record):
            """
            Load the record into inspirehep: update it if it already
            exists, otherwise create it.
            """
            response = inspire_http_record_management_hook.get_record(
                pid_type="data", pid_value=record["inspire_id"]
            )
            if response.ok:
                response = inspire_http_record_management_hook.put_record(
                    data=record, pid_type="data", pid_value=record["inspire_id"]
                )
                response.raise_for_status()
                return

            response = inspire_http_record_management_hook.post_record(
                data=record, pid_type="data"
            )
            response.raise_for_status()

        hepdata_record_versions = download_record_versions(record_id)
        record = build_record(data_schema=data_schema, payload=hepdata_record_versions)
        load_record(record)

    process_record.expand(record_id=collect_ids())


data_harvest_dag()
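
The fan-out at the end of the DAG uses Airflow dynamic task mapping: collect_ids returns a list, and the process_record task group is expanded into one instance per id. A minimal, self-contained sketch of the same pattern (all names here are illustrative, not part of this commit):

# Sketch of dynamic task mapping over a list of ids; "demo_dag" and
# "square" are hypothetical names used only for illustration.
from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2024, 11, 28), schedule=None, catchup=False)
def demo_dag():
    @task
    def collect_ids():
        return [1, 2, 3]  # one mapped task instance per element

    @task
    def square(record_id):
        return record_id**2

    # expand() maps square() over the list returned by collect_ids()
    square.expand(record_id=collect_ids())


demo_dag()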
37 changes: 2 additions & 35 deletions workflows/plugins/hooks/backoffice/base.py
@@ -1,11 +1,8 @@
-import requests
 from airflow.models import Variable
-from airflow.providers.http.hooks.http import HttpHook
-from hooks.tenacity_config import tenacity_retry_kwargs
-from requests import Response
+from hooks.generic_http_hook import GenericHttpHook


-class BackofficeHook(HttpHook):
+class BackofficeHook(GenericHttpHook):
"""
A hook to update the status of a workflow in the backoffice system.
Expand All @@ -28,33 +25,3 @@ def __init__(
"Accept": "application/json",
"Content-Type": "application/json",
}

@property
def tenacity_retry_kwargs(self) -> dict:
return tenacity_retry_kwargs()

def run(
self,
endpoint: str,
method: str = None,
data: dict = None,
headers: dict = None,
params: dict = None,
extra_options: dict = None,
) -> Response:
extra_options = extra_options or {}
headers = headers or self.headers
method = method or self.method

session = self.get_conn(headers)

if not self.base_url.endswith("/") and not endpoint.startswith("/"):
url = self.base_url + "/" + endpoint
else:
url = self.base_url + endpoint

req = requests.Request(method, url, json=data, headers=headers, params=params)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)
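
With its custom run() and retry plumbing removed, BackofficeHook now inherits that behavior from GenericHttpHook. A hypothetical call through the shared call_api helper (the endpoint and payload below are illustrative, not taken from this commit):

# Hypothetical usage sketch; assumes the hook's default backoffice
# connection, and an illustrative endpoint and payload.
hook = BackofficeHook()
response = hook.call_api(
    endpoint="/api/workflows/123",
    method="PATCH",
    data={"status": "completed"},
)
response.raise_for_status()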
81 changes: 81 additions & 0 deletions workflows/plugins/hooks/generic_http_hook.py
@@ -0,0 +1,81 @@
import logging

import requests
from airflow.providers.http.hooks.http import HttpHook
from hooks.tenacity_config import tenacity_retry_kwargs
from requests import Response

logger = logging.getLogger()


class GenericHttpHook(HttpHook):
    """
    Generic hook to interact with HTTP APIs.

    It overrides the original `run` method of HttpHook so that the
    request body can be passed as `json`/`data` instead of being
    coerced into query parameters.
    """

    def __init__(self, http_conn_id, method="GET", headers=None):
        self._headers = headers
        super().__init__(method=method, http_conn_id=http_conn_id)

    @property
    def tenacity_retry_kwargs(self) -> dict:
        return tenacity_retry_kwargs()

    @property
    def headers(self) -> dict:
        return self._headers

    @headers.setter
    def headers(self, headers):
        self._headers = headers

    def run(
        self,
        endpoint: str,
        method: str = None,
        json: dict = None,
        data: dict = None,
        params: dict = None,
        headers: dict = None,
        extra_options: dict = None,
    ):
        extra_options = extra_options or {}
        method = method or self.method
        headers = headers or self.headers
        session = self.get_conn(headers)

        # join base_url and endpoint, avoiding a double or missing slash
        if not self.base_url.endswith("/") and not endpoint.startswith("/"):
            url = self.base_url + "/" + endpoint
        else:
            url = self.base_url + endpoint

        req = requests.Request(
            method, url, json=json, data=data, params=params, headers=headers
        )

        prepped_request = session.prepare_request(req)
        self.log.info("Sending '%s' to url: %s", method, url)
        return self.run_and_check(session, prepped_request, extra_options)

    def call_api(
        self,
        endpoint: str,
        method: str = None,
        data: dict = None,
        params: dict = None,
        headers: dict = None,
    ) -> Response:
        # run with tenacity retries; `data` is sent as the JSON body
        return self.run_with_advanced_retry(
            _retry_args=self.tenacity_retry_kwargs,
            endpoint=endpoint,
            headers=headers,
            json=data,
            params=params,
            method=method,
        )

    def get_url(self) -> str:
        self.get_conn()
        return self.base_url
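
A short usage sketch of the new hook (the connection id and endpoint are illustrative; the named Airflow HTTP connection must exist):

# Hypothetical usage; "example_connection" and "/search/ids" are
# placeholders. call_api() retries with the tenacity settings above
# and sends `data` as the JSON body.
hook = GenericHttpHook(http_conn_id="example_connection")
response = hook.call_api(endpoint="/search/ids", params={"q": "higgs"})
response.raise_for_status()
print(response.json())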
