Skip to content

Commit

Permalink
workflows: add hepdata harvesting
Browse files Browse the repository at this point in the history
reworked httphooks to make more easy to use

* ref: cern-sis/issues-inspire/issues/594
  • Loading branch information
DonHaul committed Dec 10, 2024
1 parent e21b157 commit fe673bc
Show file tree
Hide file tree
Showing 16 changed files with 512 additions and 99 deletions.
5 changes: 4 additions & 1 deletion backend/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions ui/src/common/assets/bluesky.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
12 changes: 4 additions & 8 deletions ui/src/common/assets/mastodon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ exports[`NewsAndUpdates renders 1`] = `
<Col
className="mt5"
>
<Space
size="middle"
<Space
size="middle"
>
Expand All @@ -55,6 +57,7 @@ exports[`NewsAndUpdates renders 1`] = `
}
style={
Object {
"color": "#1285FC",
"fontSize": "25px",
}
}
Expand Down
Empty file added workflows/dags/data/__init__.py
Empty file.
74 changes: 74 additions & 0 deletions workflows/dags/data/data_harvest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import datetime
import logging
from datetime import timedelta

from airflow.decorators import dag, task
from airflow.models import Variable
from hooks.generic_http_hook import GenericHttpHook
from hooks.inspirehep.inspire_http_record_management_hook import (
InspireHTTPRecordManagementHook,
)

logger = logging.getLogger(__name__)


@dag(
start_date=datetime.datetime(2024, 11, 28),
schedule="@daily",
catchup=False,
tags=["data"],
)
def data_harvest_dag():
"""
Initialize a DAG for data harvest workflow.
"""
generic_http_hook = GenericHttpHook(http_conn_id="hepdata_connection")
inspire_http_record_management_hook = InspireHTTPRecordManagementHook()

data_schema = Variable.get("data_schema")

@task
def collect_ids():
from_date = (datetime.datetime.now().date() - timedelta(days=1)).strftime(
"%Y-%m-%d"
)
# http sensor
payload = {"inspire_ids": True, "last_updated": from_date, "sort_by": "latest"}
hepdata_response = generic_http_hook.call_api(
endpoint="/search/ids", method="GET", params=payload
)

return hepdata_response.json()

@task(map_index_template="{{id}}")
def download_record(id):
hepdata_response = generic_http_hook.call_api(
endpoint=f"/record/ins{id}?format=json", method="GET"
)
return hepdata_response.json()

@task.virtualenv(requirements=["inspire-schemas"], system_site_packages=False)
def transform_record(data_schema, record):
from inspire_schemas.builders import LiteratureBuilder

builder = LiteratureBuilder()

data = builder.record
data["$schema"] = data_schema
data.update({"_collections": ["Data"]}) # to delete

return data

@task
def load_record(record):
inspire_http_record_management_hook.post_record(data=record, pid_type="data")

ids = collect_ids()
records = download_record.expand(id=ids)
built_records = transform_record.partial(data_schema=data_schema).expand(
record=records
)
load_record.expand(record=built_records)


data_harvest_dag()
37 changes: 2 additions & 35 deletions workflows/plugins/hooks/backoffice/base.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import requests
from airflow.models import Variable
from airflow.providers.http.hooks.http import HttpHook
from hooks.tenacity_config import tenacity_retry_kwargs
from requests import Response
from hooks.generic_http_hook import GenericHttpHook


class BackofficeHook(HttpHook):
class BackofficeHook(GenericHttpHook):
"""
A hook to update the status of a workflow in the backoffice system.
Expand All @@ -28,33 +25,3 @@ def __init__(
"Accept": "application/json",
"Content-Type": "application/json",
}

@property
def tenacity_retry_kwargs(self) -> dict:
return tenacity_retry_kwargs()

def run(
self,
endpoint: str,
method: str = None,
data: dict = None,
headers: dict = None,
params: dict = None,
extra_options: dict = None,
) -> Response:
extra_options = extra_options or {}
headers = headers or self.headers
method = method or self.method

session = self.get_conn(headers)

if not self.base_url.endswith("/") and not endpoint.startswith("/"):
url = self.base_url + "/" + endpoint
else:
url = self.base_url + endpoint

req = requests.Request(method, url, json=data, headers=headers, params=params)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)
81 changes: 81 additions & 0 deletions workflows/plugins/hooks/generic_http_hook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging

import requests
from airflow.providers.http.hooks.http import HttpHook
from hooks.tenacity_config import tenacity_retry_kwargs
from requests import Response

logger = logging.getLogger()


class GenericHttpHook(HttpHook):
"""
Hook to interact with Inspire API
It overrides the original `run` method in HttpHook so that
we can pass data argument as data, not params
"""

def __init__(self, http_conn_id, method="GET", headers=None):
self._headers = headers
super().__init__(method=method, http_conn_id=http_conn_id)

@property
def tenacity_retry_kwargs(self) -> dict:
return tenacity_retry_kwargs()

@property
def headers(self) -> dict:
return self._headers

@headers.setter
def headers(self, headers):
self._headers = headers

def run(
self,
endpoint: str,
method: str = None,
json: dict = None,
data: dict = None,
params: dict = None,
headers: dict = None,
extra_options: dict = None,
):
extra_options = extra_options or {}
method = method or self.method
headers = headers or self.headers
session = self.get_conn(headers)

if not self.base_url.endswith("/") and not endpoint.startswith("/"):
url = self.base_url + "/" + endpoint
else:
url = self.base_url + endpoint

req = requests.Request(
method, url, json=json, data=data, params=params, headers=headers
)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)

def call_api(
self,
method: str,
endpoint: str,
data: dict = None,
params: dict = None,
headers: dict = None,
) -> Response:
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
endpoint=endpoint,
headers=headers,
json=data,
params=params,
method=method,
)

def get_url(self) -> str:
self.get_conn()
return self.base_url
49 changes: 2 additions & 47 deletions workflows/plugins/hooks/inspirehep/inspire_http_hook.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import logging

import requests
from airflow.models import Variable
from airflow.providers.http.hooks.http import HttpHook
from hooks.tenacity_config import tenacity_retry_kwargs
from requests import Response
from hooks.generic_http_hook import GenericHttpHook

logger = logging.getLogger()

Expand All @@ -14,7 +11,7 @@
AUTHOR_UPDATE_FUNCTIONAL_CATEGORY = "Author updates"


class InspireHttpHook(HttpHook):
class InspireHttpHook(GenericHttpHook):
"""
Hook to interact with Inspire API
It overrides the original `run` method in HttpHook so that
Expand All @@ -24,58 +21,17 @@ class InspireHttpHook(HttpHook):
def __init__(self, method="GET", http_conn_id="inspire_connection"):
super().__init__(method=method, http_conn_id=http_conn_id)

@property
def tenacity_retry_kwargs(self) -> dict:
return tenacity_retry_kwargs()

@property
def headers(self) -> dict:
return {
"Authorization": f'Bearer {Variable.get("inspire_token")}',
"Accept": "application/vnd+inspire.record.raw+json",
}

def run(
self,
endpoint: str,
method: str = None,
json: dict = None,
data: dict = None,
headers: dict = None,
extra_options: dict = None,
):
extra_options = extra_options or {}
method = method or self.method
session = self.get_conn(headers)

if not self.base_url.endswith("/") and not endpoint.startswith("/"):
url = self.base_url + "/" + endpoint
else:
url = self.base_url + endpoint

req = requests.Request(method, url, json=json, data=data, headers=headers)

prepped_request = session.prepare_request(req)
self.log.info("Sending '%s' to url: %s", method, url)
return self.run_and_check(session, prepped_request, extra_options)

def call_api(self, method: str, endpoint: str, data: dict) -> Response:
return self.run_with_advanced_retry(
_retry_args=self.tenacity_retry_kwargs,
endpoint=endpoint,
headers=self.headers,
json=data,
method=method,
)

def get_backoffice_url(self, workflow_id: str) -> str:
self.get_conn()
return f"{self.base_url}/backoffice/{workflow_id}"

def get_url(self) -> str:
self.get_conn()
return self.base_url

def create_ticket(
self, functional_category, template_name, subject, email, template_context
):
Expand Down Expand Up @@ -120,6 +76,5 @@ def close_ticket(self, ticket_id, template=None, template_context=None):
)

logging.info(f"Closing ticket {ticket_id}")
print(request_data)

return self.call_api(endpoint=endpoint, data=request_data, method="POST")
1 change: 1 addition & 0 deletions workflows/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
apache-airflow==2.9.3
inspire_utils==3.0.61
10 changes: 10 additions & 0 deletions workflows/scripts/connections/connections.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,16 @@
"schema": "",
"extra": ""
},
"hepdata_connection": {
"conn_type": "http",
"description": "Used for data harvesting",
"login": "",
"password": null,
"host": "https://www.hepdata.net",
"port": null,
"schema": "",
"extra": ""
},
"inspire_db_connection": {
"conn_type": "postgres",
"description": "",
Expand Down
3 changes: 2 additions & 1 deletion workflows/scripts/variables/variables.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"backoffice_token": "2e04111a61e8f5ba6ecec52af21bbb9e81732085",
"inspire_token": "CHANGE_ME",
"author_schema": "https://inspirehep.net/schemas/records/authors.json"
"author_schema": "https://inspirehep.net/schemas/records/authors.json",
"data_schema": "https://inspirehep.net/schemas/records/data.json"
}
Loading

0 comments on commit fe673bc

Please sign in to comment.