diff --git a/ils_middleware/dags/stanford.py b/ils_middleware/dags/stanford_folio.py
similarity index 62%
rename from ils_middleware/dags/stanford.py
rename to ils_middleware/dags/stanford_folio.py
index 05e2390..e6dedd8 100644
--- a/ils_middleware/dags/stanford.py
+++ b/ils_middleware/dags/stanford_folio.py
@@ -6,7 +6,6 @@
 from airflow.operators.python import PythonOperator
 from airflow.utils.task_group import TaskGroup
 
-from ils_middleware.tasks.amazon.s3 import get_from_s3, send_to_s3
 from ils_middleware.tasks.amazon.sqs import SubscribeOperator, parse_messages
 from ils_middleware.tasks.sinopia.local_metadata import new_local_admin_metadata
@@ -14,13 +13,10 @@
     notify_and_log,
     send_notification_emails,
 )
+
 from ils_middleware.tasks.sinopia.login import sinopia_login
-from ils_middleware.tasks.sinopia.metadata_check import existing_metadata_check
-from ils_middleware.tasks.sinopia.rdf2marc import Rdf2Marc
-from ils_middleware.tasks.symphony.login import SymphonyLogin
-from ils_middleware.tasks.symphony.new import NewMARCtoSymphony
-from ils_middleware.tasks.symphony.mod_json import to_symphony_json
-from ils_middleware.tasks.symphony.overlay import overlay_marc_in_symphony
+
+# from ils_middleware.tasks.sinopia.metadata_check import existing_metadata_check
 from ils_middleware.tasks.folio.build import build_records
 from ils_middleware.tasks.folio.login import FolioLogin
 from ils_middleware.tasks.folio.graph import construct_graph
@@ -50,12 +46,12 @@ def dag_failure_callback(ctx_dict) -> None:
 }
 
 with DAG(
-    "stanford",
+    "stanford_folio",
     default_args=default_args,
-    description="Stanford Symphony and FOLIO DAG",
+    description="Stanford FOLIO DAG",
     schedule_interval=timedelta(minutes=5),
     start_date=datetime(2021, 8, 24),
-    tags=["symphony", "folio"],
+    tags=["folio", "stanford"],
     catchup=False,
     on_failure_callback=dag_failure_callback,
 ) as dag:
@@ -64,100 +60,13 @@ def dag_failure_callback(ctx_dict) -> None:
     # deletes the message from the SQS queue. If deletion of messages fails an AirflowException is thrown otherwise, the
     # message is pushed through XCom with the key 'messages'."
     # https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/sensors/sqs/index.html
-    listen_sns = SubscribeOperator(queue="stanford-ils")
+    listen_sns = SubscribeOperator(queue="stanford-FOLIO")
 
     process_message = PythonOperator(
         task_id="sqs-message-parse",
         python_callable=parse_messages,
     )
 
-    with TaskGroup(group_id="process_symphony") as symphony_task_group:
-        run_rdf2marc = PythonOperator(
-            task_id="rdf2marc",
-            python_callable=Rdf2Marc,
-            op_kwargs={
-                "rdf2marc_lambda": Variable.get("rdf2marc_lambda"),
-                "s3_bucket": Variable.get("marc_s3_bucket"),
-            },
-        )
-
-        download_marc = PythonOperator(
-            task_id="download_marc",
-            python_callable=get_from_s3,
-        )
-
-        export_marc_json = PythonOperator(
-            task_id="marc_json_to_s3",
-            python_callable=send_to_s3,
-        )
-
-        convert_to_symphony_json = PythonOperator(
-            task_id="convert_to_symphony_json",
-            python_callable=to_symphony_json,
-        )
-
-        # Symphony Dev Server Settings
-        library_key = "GREEN"
-        home_location = "STACKS"
-        symphony_app_id = Variable.get("symphony_app_id")
-        symphony_client_id = "SymWSStaffClient"
-        symphony_conn_id = "stanford_symphony_connection"
-        # This could be mapped from the Instance RDF template
-        symphony_item_type = "STKS-MONO"
-
-        symphony_login = PythonOperator(
-            task_id="symphony-login",
-            python_callable=SymphonyLogin,
-            op_kwargs={
-                "app_id": symphony_app_id,
-                "client_id": symphony_client_id,
-                "conn_id": symphony_conn_id,
-                "url": Variable.get("stanford_symphony_auth_url"),
-                "login": Variable.get("stanford_symphony_login"),
-                "password": Variable.get("stanford_symphony_password"),
-            },
-        )
-
-        new_or_overlay = PythonOperator(
-            task_id="new-or-overlay",
-            python_callable=existing_metadata_check,
-        )
-
-        symphony_add_record = PythonOperator(
-            task_id="post_new_symphony",
-            python_callable=NewMARCtoSymphony,
-            op_kwargs={
-                "app_id": symphony_app_id,
-                "client_id": symphony_client_id,
-                "conn_id": symphony_conn_id,
-                "home_location": home_location,
-                "item_type": symphony_item_type,
-                "library_key": library_key,
-                "token": "{{ task_instance.xcom_pull(key='return_value', task_ids='process_symphony.symphony-login')}}",
-            },
-        )
-
-        symphony_overlay_record = PythonOperator(
-            task_id="post_overlay_symphony",
-            python_callable=overlay_marc_in_symphony,
-            op_kwargs={
-                "app_id": symphony_app_id,
-                "client_id": symphony_client_id,
-                "conn_id": symphony_conn_id,
-                "token": "{{ task_instance.xcom_pull(key='return_value', task_ids='process_symphony.symphony-login')}}",
-            },
-        )
-
-        (
-            run_rdf2marc
-            >> download_marc
-            >> export_marc_json
-            >> convert_to_symphony_json
-            >> symphony_login
-            >> new_or_overlay
-            >> [symphony_add_record, symphony_overlay_record]
-        )
-
     with TaskGroup(group_id="process_folio") as folio_task_group:
         folio_login = PythonOperator(
             task_id="folio-login",
@@ -265,7 +174,7 @@ def dag_failure_callback(ctx_dict) -> None:
 listen_sns >> [messages_received, messages_timeout]
 messages_received >> process_message
-process_message >> [symphony_task_group, folio_task_group] >> processed_sinopia
+process_message >> folio_task_group >> processed_sinopia
 processed_sinopia >> sinopia_update_group >> notify_sinopia_updated
 notify_sinopia_updated >> processing_complete
 messages_timeout >> processing_complete
diff --git a/ils_middleware/tasks/amazon/sqs.py b/ils_middleware/tasks/amazon/sqs.py
index fb2e8cb..d32874a 100644
--- a/ils_middleware/tasks/amazon/sqs.py
+++ b/ils_middleware/tasks/amazon/sqs.py
@@ -38,7 +38,6 @@ def parse_messages(**kwargs) -> str:
     """Parses SQS Message Body into constituent part."""
     task_instance = kwargs["task_instance"]
     raw_sqs_messages = task_instance.xcom_pull(key="messages", task_ids="sqs-sensor")
-    resources = []
     resources_with_errors = []
     for message in raw_sqs_messages:
@@ -54,6 +53,7 @@ def parse_messages(**kwargs) -> str:
                     "target": message_body["target"],
                     "resource_uri": resource_uri,
                     "resource": get_resource(resource_uri),
+                    "target_resource_id": message_body.get("targetResourceId"),
                 },
             )
         except KeyError:
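For reference, a minimal sketch (not part of the patch) of how the new "target_resource_id" key flows out of parse_messages. The sample message body below is hypothetical; only the targetResourceId handling mirrors what this hunk actually adds.

# Hypothetical SQS message body pulled by the sqs-sensor task.
message_body = {
    "target": "folio",
    "resource": {"uri": "https://api.sinopia.io/resource/abcdefa-12345"},
    "targetResourceId": "https://sinopia.io/resource/34556-abcde",
}

resource_uri = message_body["resource"]["uri"]
xcom_payload = {
    "target": message_body["target"],
    "resource_uri": resource_uri,
    # .get() returns None for older messages that lack targetResourceId,
    # so downstream tasks can fall back to the Sinopia relationships lookup.
    "target_resource_id": message_body.get("targetResourceId"),
}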
diff --git a/ils_middleware/tasks/folio/build.py b/ils_middleware/tasks/folio/build.py
index 6576f99..39f92df 100644
--- a/ils_middleware/tasks/folio/build.py
+++ b/ils_middleware/tasks/folio/build.py
@@ -4,6 +4,8 @@
 import datetime
 import logging
 
+import requests
+
 from folio_uuid import FOLIONamespaces, FolioUUID
 from folioclient.FolioClient import FolioClient
 
@@ -16,7 +18,6 @@ def _default_transform(**kwargs) -> tuple:
     folio_field = kwargs["folio_field"]
     values = kwargs.get("values", [])
 
-    logger.debug(f"field: {folio_field} values: {values} type: {type(values)}")
     return folio_field, values
 
@@ -118,6 +119,28 @@ def _instance_type_id(**kwargs) -> tuple:
     return "instanceTypeId", ident
 
 
+def _folio_hrid(folio_client: FolioClient) -> str:
+    """Queries for the instance hrid, increments it, and saves it back to FOLIO"""
+    endpoint = "/hrid-settings-storage/hrid-settings"
+    hrid_settings = folio_client.folio_get_single_object(endpoint)
+    instance_count = hrid_settings["instances"]["startNumber"]
+    new_instance_count = instance_count + 1
+    if hrid_settings["commonRetainLeadingZeroes"]:
+        number = str(new_instance_count).zfill(11)
+    else:
+        number = new_instance_count
+    instance_hrid = f"{hrid_settings['instances']['prefix']}{number}"
+    hrid_settings["instances"]["startNumber"] = new_instance_count
+
+    # Puts the new instance startNumber back into FOLIO
+    hrid_put_result = requests.put(
+        f"{folio_client.okapi_url}{endpoint}", headers=folio_client.okapi_headers, json=hrid_settings
+    )
+    hrid_put_result.raise_for_status()
+
+    return instance_hrid
+
+
 def _language(**kwargs) -> tuple:
     values = kwargs["values"]
 
@@ -266,7 +289,7 @@ def _inventory_record(**kwargs) -> dict:
     record = {
         "id": _folio_id(instance_uri, okapi_url),
-        "hrid": instance_uri,
+        "hrid": _folio_hrid(folio_client),
         "metadata": _create_update_metadata(**kwargs),
         "source": "SINOPIA",
     }
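As a quick check on the HRID arithmetic above, a standalone sketch of what _folio_hrid computes when given the same settings payload the test mock returns; the assert mirrors the expected value in tests/tasks/folio/test_build.py.

# Settings payload mirroring MockFolioClient.folio_get_single_object in the tests.
hrid_settings = {
    "instances": {"startNumber": 1, "prefix": "i"},
    "commonRetainLeadingZeroes": True,
}

new_instance_count = hrid_settings["instances"]["startNumber"] + 1
if hrid_settings["commonRetainLeadingZeroes"]:
    number = str(new_instance_count).zfill(11)  # pad to 11 digits -> "00000000002"
else:
    number = new_instance_count
instance_hrid = f"{hrid_settings['instances']['prefix']}{number}"

assert instance_hrid == "i00000000002"  # the value the updated tests expect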
diff --git a/ils_middleware/tasks/sinopia/metadata_check.py b/ils_middleware/tasks/sinopia/metadata_check.py
index b389896..1bad4e5 100644
--- a/ils_middleware/tasks/sinopia/metadata_check.py
+++ b/ils_middleware/tasks/sinopia/metadata_check.py
@@ -5,7 +5,7 @@
 import rdflib
 import requests  # type: ignore
 
-from typing import Optional
+from typing import Optional, Union
 
 logger = logging.getLogger(__name__)
 
@@ -33,6 +33,16 @@ def _query_for_ils_info(graph_jsonld: str, uri: str) -> dict:
     return output
 
 
+def _get_relationships(resource_uri: str) -> Union[list, None]:
+    result = requests.get(f"{resource_uri}/relationships")
+    if result.status_code > 399:
+        msg = f"Failed to retrieve {resource_uri}: {result.status_code}\n{result.text}"
+        logging.error(msg)
+        return None
+
+    return result.json().get("sinopiaHasLocalAdminMetadataInferredRefs")
+
+
 def _get_retrieve_metadata_resource(uri: str) -> Optional[dict]:
     """Retrieves AdminMetadata resource and extracts any ILS identifiers"""
     metadata_result = requests.get(uri)
@@ -63,16 +73,24 @@
     return None
 
 
-def _retrieve_all_resource_refs(resources: list) -> dict:
+def _retrieve_all_resource_refs(
+    resources: list, task_instance, default_ils: str
+) -> dict:
     retrieved_resources = {}
     for resource_uri in resources:
-        result = requests.get(f"{resource_uri}/relationships")
-        if result.status_code > 399:
-            msg = f"Failed to retrieve {resource_uri}: {result.status_code}\n{result.text}"
-            logging.error(msg)
+        # Check to see if a target_resource_id is present from the SQS message
+        resource_info = task_instance.xcom_pull(
+            key=resource_uri, task_ids="sqs-message-parse"
+        )
+        target_resource_id = resource_info["target_resource_id"]
+        if target_resource_id:
+            retrieved_resources[resource_uri] = [{default_ils: target_resource_id}]
+            continue
+
+        metadata_uris = _get_relationships(resource_uri)
+        if metadata_uris is None:
             continue
-        metadata_uris = result.json().get("sinopiaHasLocalAdminMetadataInferredRefs")
         ils_info = _retrieve_all_metadata(metadata_uris)
         if ils_info:
             retrieved_resources[resource_uri] = ils_info
@@ -83,11 +101,13 @@ def existing_metadata_check(*args, **kwargs):
     """Queries Sinopia API for related resources of an instance."""
     task_instance = kwargs["task_instance"]
+    default_ils = kwargs.get("default_ils")
     resource_uris = task_instance.xcom_pull(
         key="resources", task_ids="sqs-message-parse"
     )
-
-    resource_refs = _retrieve_all_resource_refs(resource_uris)
+    resource_refs = _retrieve_all_resource_refs(
+        resource_uris, task_instance, default_ils
+    )
     new_resources = []
     overlay_resources = []
     for resource_uri in resource_uris:
diff --git a/tests/tasks/folio/test_build.py b/tests/tasks/folio/test_build.py
index ee90939..890db05 100644
--- a/tests/tasks/folio/test_build.py
+++ b/tests/tasks/folio/test_build.py
@@ -72,6 +72,18 @@ def __init__(self, *args):
             {"id": "6a2533a7-4de2-4e64-8466-074c2fa9308c", "name": "General note"},
         ]
 
+        self.okapi_headers = {"tenant": "sul"}
+        self.okapi_url = "https://okapi.edu"
+
+    def folio_get_single_object(self, endpoint):
+        match endpoint:
+            case "/hrid-settings-storage/hrid-settings":
+                return {
+                    "instances": {"startNumber": 1, "prefix": "i"},
+                    "commonRetainLeadingZeroes": True,
+                }
+
 
 @pytest.fixture
 def mock_folio_client(monkeypatch):
@@ -94,7 +106,7 @@ def test_happypath_build_records(
     )
 
     record = test_task_instance().xcom_pull(key=instance_uri)
-    assert record["hrid"].startswith(instance_uri)
+    assert record["hrid"] == "i00000000002"
     assert record["metadata"]["createdByUserId"].startswith(
         "21eaac74-1b29-5546-a13b-bc2e7e4717c6"
     )
@@ -164,7 +176,7 @@ def test_instance_format_ids(mock_task_instance):  # noqa: F811
     assert (format_ids[1][0]).startswith("8d511d33-5e85-4c5d-9bce-6e3c9cd0c324")
 
 
-def test_inventory_record(mock_task_instance):  # noqa: F811
+def test_inventory_record(mock_task_instance, mock_requests_okapi):  # noqa: F811
     record = _inventory_record(
         instance_uri=instance_uri,
         task_instance=test_task_instance(),
@@ -174,11 +186,12 @@ def test_inventory_record(mock_task_instance):  # noqa: F811
         username="test_user",
         tenant="sul",
     )
-    assert record["hrid"].startswith(instance_uri)
+    assert record["hrid"].startswith("i00000000002")
 
 
 def test_inventory_record_existing_metadata(
     mock_task_instance,  # noqa: F811
+    mock_requests_okapi,  # noqa
 ):  # noqa: F811
     metadata = {
         "createdDate": "2021-12-06T15:45:28.140795",
@@ -193,7 +206,7 @@ def test_inventory_record_existing_metadata(
         folio_client=MockFolioClient(),
         metadata=metadata,
     )
-    assert record["hrid"].startswith(instance_uri)
+    assert record["hrid"].startswith("i00000000002")
     assert record["metadata"]["createdDate"].startswith("2021-12-06T15:45:28.140795")
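The updated tests take a mock_requests_okapi fixture that is not included in this diff. A minimal sketch of what such a fixture could look like, assuming it only needs to stub the requests.put call made by _folio_hrid; the fixture body here is an assumption, not the project's actual implementation.

import pytest
import requests


@pytest.fixture
def mock_requests_okapi(monkeypatch):
    """Hypothetical stub for the Okapi hrid-settings PUT issued by _folio_hrid."""

    class MockResponse:
        status_code = 204

        def raise_for_status(self):
            return None

    monkeypatch.setattr(requests, "put", lambda *args, **kwargs: MockResponse())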
record["metadata"]["createdDate"].startswith("2021-12-06T15:45:28.140795") diff --git a/tests/tasks/sinopia/test_metadata_check.py b/tests/tasks/sinopia/test_metadata_check.py index 66e1a7d..5cc14b4 100644 --- a/tests/tasks/sinopia/test_metadata_check.py +++ b/tests/tasks/sinopia/test_metadata_check.py @@ -15,7 +15,9 @@ from ils_middleware.tasks.sinopia.metadata_check import ( existing_metadata_check, + _retrieve_all_resource_refs, _get_retrieve_metadata_resource, + _get_relationships, _retrieve_all_metadata, ) @@ -118,7 +120,9 @@ def mock_xcom_pull(*args, **kwargs): "https://api.sinopia.io/resource/oprt5531", ] else: - return mock_push_store[key] + if key in mock_push_store: + return mock_push_store[key] + return {"target_resource_id": None} def mock_xcom_push(*args, **kwargs): key = kwargs.get("key") @@ -182,3 +186,26 @@ def test_dups_in_retrieve_all_metadata(mock_requests): ) assert result == [{"SIRSI": "13704749"}] + + +def test_get_relationships_no_resource(mock_requests, caplog): + missing_result = _get_relationships("https://sinopia.io/resource/no-resource") + assert missing_result is None + assert ( + "Failed to retrieve https://sinopia.io/resource/no-resource: 401" in caplog.text + ) + + +def test_retrieve_all_resource_refs_target_resource(): + mock_task_instance = MagicMock() + mock_task_instance.xcom_pull = lambda **kwargs: { + "target_resource_id": "https://sinopia.io/resource/34556-abcde" + } + retrieved_resources = _retrieve_all_resource_refs( + ["https://sinopia.io/resource/abcdefa-12345"], mock_task_instance, "folio" + ) + assert retrieved_resources == { + "https://sinopia.io/resource/abcdefa-12345": [ + {"folio": "https://sinopia.io/resource/34556-abcde"} + ] + }