Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for targetResourceId in SQS Message #160

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,17 @@
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

from ils_middleware.tasks.amazon.s3 import get_from_s3, send_to_s3
from ils_middleware.tasks.amazon.sqs import SubscribeOperator, parse_messages

from ils_middleware.tasks.sinopia.local_metadata import new_local_admin_metadata
from ils_middleware.tasks.sinopia.email import (
notify_and_log,
send_notification_emails,
)

from ils_middleware.tasks.sinopia.login import sinopia_login
from ils_middleware.tasks.sinopia.metadata_check import existing_metadata_check
from ils_middleware.tasks.sinopia.rdf2marc import Rdf2Marc
from ils_middleware.tasks.symphony.login import SymphonyLogin
from ils_middleware.tasks.symphony.new import NewMARCtoSymphony
from ils_middleware.tasks.symphony.mod_json import to_symphony_json
from ils_middleware.tasks.symphony.overlay import overlay_marc_in_symphony

# from ils_middleware.tasks.sinopia.metadata_check import existing_metadata_check
from ils_middleware.tasks.folio.build import build_records
from ils_middleware.tasks.folio.login import FolioLogin
from ils_middleware.tasks.folio.graph import construct_graph
Expand Down Expand Up @@ -50,12 +46,12 @@ def dag_failure_callback(ctx_dict) -> None:
}

with DAG(
"stanford",
"stanford_folio",
default_args=default_args,
description="Stanford Symphony and FOLIO DAG",
description="Stanford FOLIO DAG",
schedule_interval=timedelta(minutes=5),
start_date=datetime(2021, 8, 24),
tags=["symphony", "folio"],
tags=["folio", "stanford"],
catchup=False,
on_failure_callback=dag_failure_callback,
) as dag:
Expand All @@ -64,100 +60,13 @@ def dag_failure_callback(ctx_dict) -> None:
# deletes the message from the SQS queue. If deletion of messages fails an AirflowException is thrown otherwise, the
# message is pushed through XCom with the key 'messages'."
# https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/sqs/index.html
listen_sns = SubscribeOperator(queue="stanford-ils")
listen_sns = SubscribeOperator(queue="stanford-FOLIO")

process_message = PythonOperator(
task_id="sqs-message-parse",
python_callable=parse_messages,
)

with TaskGroup(group_id="process_symphony") as symphony_task_group:
run_rdf2marc = PythonOperator(
task_id="rdf2marc",
python_callable=Rdf2Marc,
op_kwargs={
"rdf2marc_lambda": Variable.get("rdf2marc_lambda"),
"s3_bucket": Variable.get("marc_s3_bucket"),
},
)

download_marc = PythonOperator(
task_id="download_marc",
python_callable=get_from_s3,
)

export_marc_json = PythonOperator(
task_id="marc_json_to_s3",
python_callable=send_to_s3,
)

convert_to_symphony_json = PythonOperator(
task_id="convert_to_symphony_json",
python_callable=to_symphony_json,
)

# Symphony Dev Server Settings
library_key = "GREEN"
home_location = "STACKS"
symphony_app_id = Variable.get("symphony_app_id")
symphony_client_id = "SymWSStaffClient"
symphony_conn_id = "stanford_symphony_connection"
# This could be mapped from the Instance RDF template
symphony_item_type = "STKS-MONO"

symphony_login = PythonOperator(
task_id="symphony-login",
python_callable=SymphonyLogin,
op_kwargs={
"app_id": symphony_app_id,
"client_id": symphony_client_id,
"conn_id": symphony_conn_id,
"url": Variable.get("stanford_symphony_auth_url"),
"login": Variable.get("stanford_symphony_login"),
"password": Variable.get("stanford_symphony_password"),
},
)

new_or_overlay = PythonOperator(
task_id="new-or-overlay",
python_callable=existing_metadata_check,
)

symphony_add_record = PythonOperator(
task_id="post_new_symphony",
python_callable=NewMARCtoSymphony,
op_kwargs={
"app_id": symphony_app_id,
"client_id": symphony_client_id,
"conn_id": symphony_conn_id,
"home_location": home_location,
"item_type": symphony_item_type,
"library_key": library_key,
"token": "{{ task_instance.xcom_pull(key='return_value', task_ids='process_symphony.symphony-login')}}",
},
)

symphony_overlay_record = PythonOperator(
task_id="post_overlay_symphony",
python_callable=overlay_marc_in_symphony,
op_kwargs={
"app_id": symphony_app_id,
"client_id": symphony_client_id,
"conn_id": symphony_conn_id,
"token": "{{ task_instance.xcom_pull(key='return_value', task_ids='process_symphony.symphony-login')}}",
},
)

(
run_rdf2marc
>> download_marc
>> export_marc_json
>> convert_to_symphony_json
>> symphony_login
>> new_or_overlay
>> [symphony_add_record, symphony_overlay_record]
)

with TaskGroup(group_id="process_folio") as folio_task_group:
folio_login = PythonOperator(
task_id="folio-login",
Expand Down Expand Up @@ -265,7 +174,7 @@ def dag_failure_callback(ctx_dict) -> None:

listen_sns >> [messages_received, messages_timeout]
messages_received >> process_message
process_message >> [symphony_task_group, folio_task_group] >> processed_sinopia
process_message >> folio_task_group >> processed_sinopia
processed_sinopia >> sinopia_update_group >> notify_sinopia_updated
notify_sinopia_updated >> processing_complete
messages_timeout >> processing_complete
2 changes: 1 addition & 1 deletion ils_middleware/tasks/amazon/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ def parse_messages(**kwargs) -> str:
"""Parses SQS Message Body into constituent part."""
task_instance = kwargs["task_instance"]
raw_sqs_messages = task_instance.xcom_pull(key="messages", task_ids="sqs-sensor")

resources = []
resources_with_errors = []
for message in raw_sqs_messages:
Expand All @@ -54,6 +53,7 @@ def parse_messages(**kwargs) -> str:
"target": message_body["target"],
"resource_uri": resource_uri,
"resource": get_resource(resource_uri),
"target_resource_id": message_body.get("targetResourceId"),
},
)
except KeyError:
Expand Down
27 changes: 25 additions & 2 deletions ils_middleware/tasks/folio/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import datetime
import logging

import requests

from folio_uuid import FOLIONamespaces, FolioUUID
from folioclient.FolioClient import FolioClient

Expand All @@ -16,7 +18,6 @@
def _default_transform(**kwargs) -> tuple:
    """Pass-through mapping: return the FOLIO field name with its values unchanged.

    Keyword Args:
        folio_field: name of the target FOLIO record field (required).
        values: values gathered for that field; defaults to an empty list.

    Returns:
        Tuple of ``(folio_field, values)``.
    """
    folio_field = kwargs["folio_field"]
    values = kwargs.get("values", [])
    # Lazy %-style logging args avoid building the message string
    # when DEBUG is disabled (this runs once per mapped field).
    logger.debug("field: %s values: %s type: %s", folio_field, values, type(values))
    return folio_field, values


Expand Down Expand Up @@ -118,6 +119,28 @@ def _instance_type_id(**kwargs) -> tuple:
return "instanceTypeId", ident


def _folio_hrid(folio_client: FolioClient) -> str:
    """Generate the next FOLIO instance HRID and persist the incremented counter.

    Reads the hrid-settings from FOLIO, builds the HRID from the configured
    prefix and the next sequence number, then PUTs the updated settings back.

    Args:
        folio_client: authenticated FolioClient (supplies okapi_url/headers).

    Returns:
        The new instance HRID, e.g. ``i00000000002``.

    Raises:
        requests.HTTPError: if the PUT of the updated settings fails.

    NOTE(review): this read-increment-write sequence is not atomic; concurrent
    DAG runs could mint duplicate HRIDs — confirm FOLIO-side guarantees.
    """
    endpoint = "/hrid-settings-storage/hrid-settings"
    hrid_settings = folio_client.folio_get_single_object(endpoint)
    instance_count = hrid_settings["instances"]["startNumber"]
    new_instance_count = instance_count + 1
    if hrid_settings["commonRetainLeadingZeroes"]:
        number = str(new_instance_count).zfill(11)
    else:
        # Stringify here too so the f-string below is type-consistent
        # with the zero-padded branch.
        number = str(new_instance_count)
    instance_hrid = f"{hrid_settings['instances']['prefix']}{number}"
    hrid_settings["instances"]["startNumber"] = new_instance_count

    # Persist the new startNumber back into FOLIO. The request must carry the
    # updated settings as its JSON body — the original call sent no body, so
    # the counter was never actually incremented server-side.
    hrid_put_result = requests.put(
        f"{folio_client.okapi_url}{endpoint}",
        headers=folio_client.okapi_headers,
        json=hrid_settings,
    )
    hrid_put_result.raise_for_status()

    return instance_hrid


def _language(**kwargs) -> tuple:
values = kwargs["values"]

Expand Down Expand Up @@ -266,7 +289,7 @@ def _inventory_record(**kwargs) -> dict:

record = {
"id": _folio_id(instance_uri, okapi_url),
"hrid": instance_uri,
"hrid": _folio_hrid(folio_client),
"metadata": _create_update_metadata(**kwargs),
"source": "SINOPIA",
}
Expand Down
38 changes: 29 additions & 9 deletions ils_middleware/tasks/sinopia/metadata_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import rdflib
import requests # type: ignore

from typing import Optional
from typing import Optional, Union

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,6 +33,16 @@ def _query_for_ils_info(graph_jsonld: str, uri: str) -> dict:
return output


def _get_relationships(resource_uri: str) -> Union[list, None]:
    """Fetch the inferred AdminMetadata references for a Sinopia resource.

    Args:
        resource_uri: URI of the Sinopia resource.

    Returns:
        The value of ``sinopiaHasLocalAdminMetadataInferredRefs`` from the
        relationships payload (None when the key is absent), or None when
        the HTTP request fails.
    """
    result = requests.get(f"{resource_uri}/relationships")
    if result.status_code > 399:
        # Use the module-level logger (not the root logger via logging.error)
        # so messages carry this module's name and configuration; lazy %-args
        # defer string formatting until the record is actually emitted.
        logger.error(
            "Failed to retrieve %s: %s\n%s",
            resource_uri,
            result.status_code,
            result.text,
        )
        return None

    return result.json().get("sinopiaHasLocalAdminMetadataInferredRefs")


def _get_retrieve_metadata_resource(uri: str) -> Optional[dict]:
"""Retrieves AdminMetadata resource and extracts any ILS identifiers"""
metadata_result = requests.get(uri)
Expand Down Expand Up @@ -63,16 +73,24 @@ def _retrieve_all_metadata(bf_admin_metadata_all: list) -> Optional[list]:
return None


def _retrieve_all_resource_refs(resources: list) -> dict:
def _retrieve_all_resource_refs(
resources: list, task_instance, default_ils: str
) -> dict:
retrieved_resources = {}
for resource_uri in resources:
result = requests.get(f"{resource_uri}/relationships")
if result.status_code > 399:
msg = f"Failed to retrieve {resource_uri}: {result.status_code}\n{result.text}"
logging.error(msg)
# Check to see if target_resource_id is present from the SQS message
resource_info = task_instance.xcom_pull(
key=resource_uri, task_ids="sqs-message-parse"
)
target_resource_id = resource_info["target_resource_id"]
if target_resource_id:
retrieved_resources[resource_uri] = [{default_ils: target_resource_id}]
continue

metadata_uris = _get_relationships(resource_uri)
if metadata_uris is None:
continue

metadata_uris = result.json().get("sinopiaHasLocalAdminMetadataInferredRefs")
ils_info = _retrieve_all_metadata(metadata_uris)
if ils_info:
retrieved_resources[resource_uri] = ils_info
Expand All @@ -83,11 +101,13 @@ def _retrieve_all_resource_refs(resources: list) -> dict:
def existing_metadata_check(*args, **kwargs):
"""Queries Sinopia API for related resources of an instance."""
task_instance = kwargs["task_instance"]
default_ils = kwargs.get("default_ils")
resource_uris = task_instance.xcom_pull(
key="resources", task_ids="sqs-message-parse"
)

resource_refs = _retrieve_all_resource_refs(resource_uris)
resource_refs = _retrieve_all_resource_refs(
resource_uris, task_instance, default_ils
)
new_resources = []
overlay_resources = []
for resource_uri in resource_uris:
Expand Down
21 changes: 17 additions & 4 deletions tests/tasks/folio/test_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ def __init__(self, *args):
{"id": "6a2533a7-4de2-4e64-8466-074c2fa9308c", "name": "General note"},
]

self.okapi_headers = {"tenant": "sul"}
self.okapi_url = "https://okapi.edu"

def folio_get_single_object(self, endpoint):
    """Mock FolioClient lookup: return canned hrid-settings for the one known endpoint."""
    if endpoint == "/hrid-settings-storage/hrid-settings":
        return {
            "instances": {"startNumber": 1, "prefix": "i"},
            "commonRetainLeadingZeroes": True,
        }
    # Any other endpoint falls through and yields None, matching the
    # single-case match statement this replaces.


@pytest.fixture
def mock_folio_client(monkeypatch):
Expand All @@ -94,7 +106,7 @@ def test_happypath_build_records(
)
record = test_task_instance().xcom_pull(key=instance_uri)

assert record["hrid"].startswith(instance_uri)
assert record["hrid"] == "i00000000002"
assert record["metadata"]["createdByUserId"].startswith(
"21eaac74-1b29-5546-a13b-bc2e7e4717c6"
)
Expand Down Expand Up @@ -164,7 +176,7 @@ def test_instance_format_ids(mock_task_instance): # noqa: F811
assert (format_ids[1][0]).startswith("8d511d33-5e85-4c5d-9bce-6e3c9cd0c324")


def test_inventory_record(mock_task_instance): # noqa: F811
def test_inventory_record(mock_task_instance, mock_requests_okapi): # noqa: F811
record = _inventory_record(
instance_uri=instance_uri,
task_instance=test_task_instance(),
Expand All @@ -174,11 +186,12 @@ def test_inventory_record(mock_task_instance): # noqa: F811
username="test_user",
tenant="sul",
)
assert record["hrid"].startswith(instance_uri)
assert record["hrid"].startswith("i00000000002")


def test_inventory_record_existing_metadata(
mock_task_instance, # noqa: F811
mock_requests_okapi, # noqa
): # noqa: F811
metadata = {
"createdDate": "2021-12-06T15:45:28.140795",
Expand All @@ -193,7 +206,7 @@ def test_inventory_record_existing_metadata(
folio_client=MockFolioClient(),
metadata=metadata,
)
assert record["hrid"].startswith(instance_uri)
assert record["hrid"].startswith("i00000000002")
assert record["metadata"]["createdDate"].startswith("2021-12-06T15:45:28.140795")


Expand Down
Loading