workflows: add hepdata harvesting
reworked HTTP hooks to make them easier to use

* ref: cern-sis/issues-inspire/issues/594
DonHaul committed Dec 11, 2024
1 parent 207a527 commit 6299860
Showing 18 changed files with 6,632 additions and 85 deletions.
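The reworked hook centralizes request plumbing and retry handling, so call sites reduce to a single call_api call. A minimal usage sketch based on the call sites in this commit (the "hepdata_connection" id must exist as an Airflow connection, as it does for the new DAG below):

from hooks.generic_http_hook import GenericHttpHook

# "hepdata_connection" is the Airflow connection used by the new DAG
hook = GenericHttpHook(http_conn_id="hepdata_connection")
# retries are handled internally via run_with_advanced_retry with tenacity kwargs
response = hook.call_api(endpoint="/search/ids", method="GET", params={"inspire_ids": True})
ids = response.json()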
3 changes: 2 additions & 1 deletion backend/inspirehep/pidstore/api/data.py
@@ -7,7 +7,8 @@

 from inspirehep.pidstore.api.base import PidStoreBase
 from inspirehep.pidstore.minters.control_number import DataMinter
+from inspirehep.pidstore.minters.doi import DoiMinter


 class PidStoreData(PidStoreBase):
-    minters = [DataMinter]
+    minters = [DataMinter, DoiMinter]
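With DoiMinter added, creating a data record now mints a DOI persistent identifier alongside the control number. Conceptually (a sketch; the actual minting loop lives in PidStoreBase, which is not part of this diff, and the mint signature shown here is hypothetical):

for minter in PidStoreData.minters:  # [DataMinter, DoiMinter]
    minter.mint(record_uuid, data=record_json)  # hypothetical signature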
Empty file added workflows/dags/data/__init__.py
227 changes: 227 additions & 0 deletions workflows/dags/data/data_harvest.py
@@ -0,0 +1,227 @@
import datetime
import logging
from datetime import timedelta

from airflow.decorators import dag, task, task_group
from airflow.models import Variable
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
    InspireHTTPRecordManagementHook,
)
from tenacity import RetryError

logger = logging.getLogger(__name__)


@dag(
    start_date=datetime.datetime(2024, 11, 28),
    schedule="@daily",
    catchup=False,
    tags=["data"],
)
def data_harvest_dag():
    """
    Initialize the DAG for the data harvest workflow.
    """
    generic_http_hook = GenericHttpHook(http_conn_id="hepdata_connection")
    inspire_http_record_management_hook = InspireHTTPRecordManagementHook()

    data_schema = Variable.get("data_schema")

    @task(task_id="collect_ids")
    def collect_ids():
        """Collect the ids of the HEPData records updated since the previous day.

        Returns: list of ids
        """
        from_date = (datetime.datetime.now().date() - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )
        # fetch only the inspire ids of records updated on or after from_date
        payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
        hepdata_response = generic_http_hook.call_api(
            endpoint="/search/ids", method="GET", params=payload
        )

        return hepdata_response.json()

    @task_group
    def process_record(record_id):
        """
        Process one record: download its versions, build the record
        and load it into inspirehep.
        """

        @task(task_concurrency=5)
        def download_record_versions(id):
            """
            Download all versions of a HEPData record.
            Args: id (int): The inspire id of the record.
            Returns: dict: The record versions, keyed by version number
                (the latest version is stored under "base").
            """
            hepdata_response = generic_http_hook.call_api(
                endpoint=f"/record/ins{id}?format=json"
            )
            hepdata_response.raise_for_status()
            payload = hepdata_response.json()

            record = {"base": payload}
            for version in range(1, payload["record"]["version"]):
                response = generic_http_hook.call_api(
                    endpoint=f"/record/ins{id}?format=json&version={version}"
                )
                record[version] = response.json()

            return record

        @task.virtualenv(
            requirements=["inspire-schemas"],
            system_site_packages=False,
            map_index_template="{{params}}",
        )
        def build_record(data_schema, payload, **context):
            """Build the inspirehep data record from the HEPData payload.
            Args: data_schema (str): The schema of the data.
                payload (dict): The record versions downloaded from HEPData.
            Returns: dict: The built record.
            """
            import datetime
            import re

            from inspire_schemas.builders import DataBuilder

            def add_version_specific_dois(record, builder):
                """
                Add the DOIs of a single record version to the record.
                """
                for data_table in record["data_tables"]:
                    builder.add_doi(data_table["doi"], material="part")
                for resource_with_doi in record["resources_with_doi"]:
                    builder.add_doi(resource_with_doi["doi"], material="part")

                builder.add_doi(record["record"]["hepdata_doi"], material="version")

            def add_keywords(record, builder):
                """
                Add keywords to the record.
                """
                for keyword, item in record.get("data_keywords", {}).items():
                    if keyword == "cmenergies":
                        if len(item) > 1 and "lte" in item[0] and "gte" in item[0]:
                            builder.add_keyword(
                                f"{keyword}: {item[0]['lte']}-{item[0]['gte']}"
                            )
                    elif keyword == "observables":
                        builder.add_keyword(f"{keyword}: {','.join(item)}")
                    else:
                        for value in item:
                            builder.add_keyword(value)

            builder = DataBuilder(source="hepdata")

            builder.add_creation_date(datetime.datetime.now(datetime.UTC).isoformat())

            base_record = payload["base"]

            for collaboration in base_record["record"]["collaborations"]:
                builder.add_collaboration(collaboration)

            builder.add_abstract(base_record["record"]["data_abstract"])

            add_keywords(base_record["record"], builder)

            doi = base_record["record"].get("doi")

            if doi:
                builder.add_literature(
                    doi={"value": doi},
                    record={
                        "$ref": f"https://inspirehep.net/api/literature/{base_record['record']['inspire_id']}"
                    },
                )
            else:
                builder.add_literature(
                    record={
                        "$ref": f"https://inspirehep.net/api/literature/{base_record['record']['inspire_id']}"
                    },
                )

            for resource in base_record["record"]["resources"]:
                if resource["url"].startswith(
                    "https://www.hepdata.net/record/resource/"
                ):
                    continue
                builder.add_url(resource["url"], description=resource["description"])

            builder.add_title(base_record["record"]["title"])

            builder.add_acquisition_source(
                method="hepcrawl",
                submission_number=base_record["record"]["inspire_id"],
                datetime=datetime.datetime.now(datetime.UTC).isoformat(),
            )

            # strip the version suffix (".vN") from the hepdata DOI, if present
            mtc = re.match(r"(.*?)\.v\d+", base_record["record"]["hepdata_doi"])
            if mtc:
                builder.add_doi(doi=mtc.group(1), material="data")
            else:
                builder.add_doi(
                    doi=base_record["record"]["hepdata_doi"], material="data"
                )

            for _, record_version in payload.items():
                add_version_specific_dois(record_version, builder)

            data = builder.record
            data["$schema"] = data_schema
            return data

        @task
        def load_record(new_record):
            """
            Load the record into inspirehep: update the existing record if one
            with the same DOI exists, create a new one otherwise.
            """
            try:
                response = inspire_http_record_management_hook.get_record(
                    pid_type="doi", control_number=new_record["dois"][0]["value"]
                )
            except RetryError:
                logger.info("Creating Record")
                post_response = inspire_http_record_management_hook.post_record(
                    data=new_record, pid_type="data"
                )
                post_response.raise_for_status()
                return post_response.json()

            old_record = response["metadata"]
            revision_id = response.get("revision_id", 0)
            old_record.update(new_record)
            logger.info(f"Updating Record: {old_record['control_number']}")
            response = inspire_http_record_management_hook.update_record(
                data=old_record,
                pid_type="data",
                control_number=old_record["control_number"],
                revision_id=revision_id + 1,
            )
            return response.json()

        hepdata_record_versions = download_record_versions(record_id)
        record = build_record(data_schema=data_schema, payload=hepdata_record_versions)
        load_record(record)

    process_record.expand(record_id=collect_ids())


data_harvest_dag()
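A note on the hepdata_doi handling in build_record above: the regex strips a trailing version suffix so that the base DOI is recorded with material="data", while the per-version DOIs are added separately with material="version". A quick illustration (the DOI value is a made-up example):

import re

doi = "10.17182/hepdata.12345.v2"  # hypothetical HEPData DOI with a version suffix
mtc = re.match(r"(.*?)\.v\d+", doi)
print(mtc.group(1) if mtc else doi)  # -> 10.17182/hepdata.12345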
37 changes: 2 additions & 35 deletions workflows/plugins/hooks/backoffice/base.py
@@ -1,11 +1,8 @@
-import requests
 from airflow.models import Variable
-from airflow.providers.http.hooks.http import HttpHook
-from hooks.tenacity_config import tenacity_retry_kwargs
-from requests import Response
+from hooks.generic_http_hook import GenericHttpHook


-class BackofficeHook(HttpHook):
+class BackofficeHook(GenericHttpHook):
     """
     A hook to update the status of a workflow in the backoffice system.
@@ -28,33 +25,3 @@ def __init__(
"Accept": "application/json",
"Content-Type": "application/json",
}

@property
def tenacity_retry_kwargs(self) -> dict:
return tenacity_retry_kwargs()

def run(
self,
endpoint: str,
method: str = None,
data: dict = None,
headers: dict = None,
params: dict = None,
extra_options: dict = None,
) -> Response:
extra_options = extra_options or {}
headers = headers or self.headers
method = method or self.method

session = self.get_conn(headers)

if not self.base_url.endswith("/") and not endpoint.startswith("/"):
url = self.base_url + "/" + endpoint
else:
url = self.base_url + endpoint

req = requests.Request(method, url, json=data, headers=headers, params=params)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)
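After this change BackofficeHook keeps only its connection defaults and headers; the request plumbing and retry configuration are inherited from GenericHttpHook. Roughly (a sketch; the retained __init__ arguments and their defaults are illustrative, not taken from this diff):

class BackofficeHook(GenericHttpHook):
    def __init__(self, method="GET", http_conn_id="backoffice_conn"):  # defaults are illustrative
        super().__init__(method=method, http_conn_id=http_conn_id)
        # uses the headers setter inherited from GenericHttpHook
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json",
        }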
81 changes: 81 additions & 0 deletions workflows/plugins/hooks/generic_http_hook.py
@@ -0,0 +1,81 @@
import logging

import requests
from airflow.providers.http.hooks.http import HttpHook
from hooks.tenacity_config import tenacity_retry_kwargs
from requests import Response

logger = logging.getLogger(__name__)


class GenericHttpHook(HttpHook):
    """
    Generic hook to interact with HTTP APIs.
    It overrides the original `run` method in HttpHook so that
    the payload can be passed as a request body, not as params.
    """

    def __init__(self, http_conn_id, method="GET", headers=None):
        self._headers = headers
        super().__init__(method=method, http_conn_id=http_conn_id)

    @property
    def tenacity_retry_kwargs(self) -> dict:
        return tenacity_retry_kwargs()

    @property
    def headers(self) -> dict:
        return self._headers

    @headers.setter
    def headers(self, headers):
        self._headers = headers

    def run(
        self,
        endpoint: str,
        method: str = None,
        json: dict = None,
        data: dict = None,
        params: dict = None,
        headers: dict = None,
        extra_options: dict = None,
    ):
        extra_options = extra_options or {}
        method = method or self.method
        headers = headers or self.headers
        session = self.get_conn(headers)

        if not self.base_url.endswith("/") and not endpoint.startswith("/"):
            url = self.base_url + "/" + endpoint
        else:
            url = self.base_url + endpoint

        req = requests.Request(
            method, url, json=json, data=data, params=params, headers=headers
        )

        prepped_request = session.prepare_request(req)
        self.log.info("Sending '%s' to url: %s", method, url)
        return self.run_and_check(session, prepped_request, extra_options)

    def call_api(
        self,
        endpoint: str,
        method: str = None,
        data: dict = None,
        params: dict = None,
        headers: dict = None,
    ) -> Response:
        # note: `data` is forwarded as the `json` argument of `run`,
        # so payloads are sent as JSON bodies
        return self.run_with_advanced_retry(
            _retry_args=self.tenacity_retry_kwargs,
            endpoint=endpoint,
            headers=headers,
            json=data,
            params=params,
            method=method,
        )

    def get_url(self) -> str:
        self.get_conn()
        return self.base_url
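The headers setter lets callers attach auth or content-type headers after construction. A usage sketch (the connection id and endpoint are examples):

hook = GenericHttpHook(http_conn_id="some_connection", method="POST")
hook.headers = {"Content-Type": "application/json"}
# the dict below is serialized as a JSON body because call_api passes it through as json=
response = hook.call_api(endpoint="/records", data={"title": "example"})
response.raise_for_status()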