Skip to content

Commit

Permalink
Merge pull request #233 from LD4P/refactor-xcom
Browse files Browse the repository at this point in the history
Refactor XCOM to use Sinopia UUIDs instead of URI
  • Loading branch information
jermnelson authored Nov 27, 2024
2 parents d78bf20 + 5fc464e commit 9afa452
Show file tree
Hide file tree
Showing 31 changed files with 98 additions and 106 deletions.
1 change: 1 addition & 0 deletions ils_middleware/dags/cornell.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def dag_failure_callback(ctx_dict) -> None:
description="Cornell FOLIO DAG",
start_date=datetime(2021, 8, 14),
tags=["folio"],
schedule=None,
catchup=False,
) as dag:

Expand Down
1 change: 1 addition & 0 deletions ils_middleware/dags/stanford.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def dag_failure_callback(ctx_dict) -> None:
start_date=datetime(2021, 8, 24),
tags=["symphony", "folio"],
catchup=False,
schedule=None,
on_failure_callback=dag_failure_callback,
) as dag:

Expand Down
6 changes: 4 additions & 2 deletions ils_middleware/tasks/alma/post_bfinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ def NewInstancetoAlma(**kwargs):
if status == 200:
xml_response = ET.fromstring(result)
mms_id = xml_response.xpath("//mms_id/text()")
task_instance.xcom_push(key=instance_uri, value=mms_id)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=mms_id)
elif status == 400:
# run xslt on the result in case the response is 400 and we need to update the record
put_mms_id_str = parse_400(result)
Expand Down Expand Up @@ -110,7 +111,8 @@ def putInstanceToAlma(
put_update_status = put_update.status_code
match put_update_status:
case 200:
task_instance.xcom_push(key=instance_uri, value=put_mms_id_str)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=put_mms_id_str)
case 500:
raise Exception(f"Internal server error from Alma API: {put_update_status}")
case _:
Expand Down
6 changes: 4 additions & 2 deletions ils_middleware/tasks/alma/post_bfwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ def NewWorktoAlma(**kwargs):
if status == 200:
xml_response = ET.fromstring(result)
mms_id = xml_response.xpath("//mms_id/text()")
task_instance.xcom_push(key=instance_uri, value=mms_id)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=mms_id)
elif status == 400:
# run xslt on the result in case the response is 400 and we need to update the record
put_mms_id_str = parse_400(result)
Expand Down Expand Up @@ -111,7 +112,8 @@ def putWorkToAlma(
put_update_status = put_update.status_code
match put_update_status:
case 200:
task_instance.xcom_push(key=instance_uri, value=put_mms_id_str)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=put_mms_id_str)
case 500:
raise Exception(f"Internal server error from Alma API: {put_update_status}")
case _:
Expand Down
3 changes: 2 additions & 1 deletion ils_middleware/tasks/amazon/alma_instance_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ def send_instance_to_alma_s3(**kwargs):
replace=True,
)
# save to xcom
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(
key=instance_uri, value=instance_alma_xml.decode("utf-8")
key=instance_uuid, value=instance_alma_xml.decode("utf-8")
)
logger.info(f"Saved BFInstance description for {instance_uri} to alma.")
logger.info(f"bf_instance_alma_xml: {instance_alma_xml.decode('utf-8')}")
3 changes: 2 additions & 1 deletion ils_middleware/tasks/amazon/alma_work_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def load_to_s3(s3_hook, bfwork_alma_xml, instance_uri):


def push_to_xcom(task_instance, instance_uri, bfwork_alma_xml):
    """Store the Alma-serialized BF Work XML in XCom, keyed by Sinopia UUID.

    Args:
        task_instance: Airflow TaskInstance used for the ``xcom_push`` call.
        instance_uri: Sinopia resource URI; its trailing path segment is the
            resource UUID used as the XCom key.
        bfwork_alma_xml: UTF-8 encoded bytes of the Alma XML record.
    """
    # Key on the resource UUID (last URI path segment) rather than the full
    # URI so downstream tasks pull by UUID consistently; the pre-refactor
    # URI-keyed push left over from the diff is removed.
    instance_uuid = instance_uri.split("/")[-1]
    task_instance.xcom_push(key=instance_uuid, value=bfwork_alma_xml.decode("utf-8"))


def send_work_to_alma_s3(**kwargs):
Expand Down
7 changes: 4 additions & 3 deletions ils_middleware/tasks/amazon/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ def get_from_s3(**kwargs):
key=f"marc/airflow/{instance_id}/record.mar",
bucket_name=Variable.get("marc_s3_bucket"),
)
task_instance.xcom_push(key=instance_uri, value=temp_file)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=temp_file)


def send_to_s3(**kwargs):
Expand All @@ -45,7 +46,7 @@ def send_to_s3(**kwargs):
instance_path = urlparse(instance_uri).path
instance_id = path.split(instance_path)[-1]
temp_file = task_instance.xcom_pull(
key=instance_uri, task_ids="process_symphony.download_marc"
key=instance_id, task_ids="process_symphony.download_marc"
)
marc_record = marc_record_from_temp_file(instance_id, temp_file)
s3_hook.load_string(
Expand All @@ -54,7 +55,7 @@ def send_to_s3(**kwargs):
Variable.get("marc_s3_bucket"),
replace=True,
)
task_instance.xcom_push(key=instance_uri, value=marc_record.as_json())
task_instance.xcom_push(key=instance_id, value=marc_record.as_json())


def marc_record_from_temp_file(instance_id, temp_file):
Expand Down
3 changes: 2 additions & 1 deletion ils_middleware/tasks/amazon/sqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ def parse_messages(**kwargs) -> str:
resources_with_errors = []

resource_uri = message["resource"]["uri"]
resource_uuid = resource_uri.split("/")[-1]
try:
task_instance.xcom_push(
key=resource_uri,
key=resource_uuid,
value={
"email": message["user"]["email"],
"group": message["group"],
Expand Down
7 changes: 5 additions & 2 deletions ils_middleware/tasks/folio/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,12 @@ def _inventory_record(**kwargs) -> dict:
"source": "SINOPIA",
"electronicAccess": [_electronic_access(**kwargs)],
}
instance_uuid = instance_uri.split("/")[-1]

for folio_field in FOLIO_FIELDS:
post_processing = transforms.get(folio_field, _default_transform)
task_id = _task_ids(task_groups, folio_field)
raw_values = task_instance.xcom_pull(key=instance_uri, task_ids=task_id)
raw_values = task_instance.xcom_pull(key=instance_uuid, task_ids=task_id)
if raw_values:
record_field, values = post_processing(
values=raw_values,
Expand Down Expand Up @@ -335,5 +337,6 @@ def build_records(**kwargs):
folio_client=folio_client,
**kwargs,
)
task_instance.xcom_push(key=resource_uri, value=inventory_rec)
resource_uuid = resource_uri.split("/")[-1]
task_instance.xcom_push(key=resource_uuid, value=inventory_rec)
return "build-complete"
5 changes: 3 additions & 2 deletions ils_middleware/tasks/folio/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ def construct_graph(**kwargs):
resources = task_instance.xcom_pull(key="resources", task_ids="sqs-message-parse")

for instance_uri in resources:
instance_uuid = instance_uri.split("/")[-1]
resource = task_instance.xcom_pull(
key=instance_uri, task_ids="sqs-message-parse"
key=instance_uuid, task_ids="sqs-message-parse"
).get("resource")
work_refs = resource.get("bfWorkRefs")
if len(work_refs) < 1:
Expand All @@ -41,7 +42,7 @@ def construct_graph(**kwargs):
] # Grabs the first Work URI, may need a way to specify a work
graph = _build_graph(resource.get("data"), work_uri)
task_instance.xcom_push(
key=instance_uri,
key=instance_uuid,
value={"graph": graph.serialize(format="json-ld"), "work_uri": work_uri},
)
return "constructed_graphs"
9 changes: 6 additions & 3 deletions ils_middleware/tasks/folio/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ def _build_and_query_graph(**kwargs) -> list:

query_params = {}
task_id = _task_id(task_groups)
instance_uuid = instance_uri.split("/")[-1]

if uri_type.startswith("bf_work"):
query_params[uri_type] = task_instance.xcom_pull(
key=instance_uri, task_ids=task_id
key=instance_uuid, task_ids=task_id
).get("work_uri")
else:
query_params[uri_type] = instance_uri
Expand All @@ -97,7 +98,8 @@ def _build_and_query_graph(**kwargs) -> list:

query = template.format(**query_params)
graph = rdflib.Graph()
json_ld = task_instance.xcom_pull(key=instance_uri, task_ids=task_id).get("graph")

json_ld = task_instance.xcom_pull(key=instance_uuid, task_ids=task_id).get("graph")
graph.parse(data=json_ld, format="json-ld")
logging.info(f"SPARQL:\n{query}")
return [row for row in graph.query(query)]
Expand All @@ -119,5 +121,6 @@ def map_to_folio(**kwargs):
bf_class=bf_class,
**kwargs,
)
task_instance.xcom_push(key=instance_uri, value=values)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=values)
return "mapping_complete"
9 changes: 5 additions & 4 deletions ils_middleware/tasks/folio/new.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def _check_for_existance(records: list, folio_client: FolioClient) -> tuple:
def _push_to_xcom(records: list, task_instance):
    """Push each new FOLIO record's id to XCom, keyed by its Sinopia UUID.

    Args:
        records: FOLIO inventory records; each is a dict with an ``id`` and at
            least one ``electronicAccess`` entry whose ``uri`` is the Sinopia
            resource URI.
        task_instance: Airflow TaskInstance used for the ``xcom_push`` calls.
    """
    for record in records:
        logger.debug(record)
        uri = record["electronicAccess"][0]["uri"]
        # Key on the resource UUID (last URI path segment), not the full URI;
        # the stale URI-keyed push left over from the diff is removed.
        uuid = uri.split("/")[-1]
        task_instance.xcom_push(key=uuid, value=record["id"])


def _post_to_okapi(**kwargs):
Expand Down Expand Up @@ -95,8 +95,9 @@ def post_folio_records(**kwargs):

inventory_records = []
for instance_uri in resources:
instance_uuid = instance_uri.split("/")[-1]
inventory_records.append(
task_instance.xcom_pull(key=instance_uri, task_ids=task_id)
task_instance.xcom_pull(key=instance_uuid, task_ids=task_id)
)

new_records, existing_records = _check_for_existance(
Expand Down
6 changes: 4 additions & 2 deletions ils_middleware/tasks/sinopia/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def send_update_success_emails(**kwargs) -> None:

ses_hook = SesHook(aws_conn_id="aws_ses_connection")
for resource_uri in resources:
resource_uuid = resource_uri.split("/")[-1]
message = task_instance.xcom_pull(
key=resource_uri, task_ids="sqs-message-parse"
key=resource_uuid, task_ids="sqs-message-parse"
)
email_attributes = _email_on_success_attributes(message)
ses_hook.send_email(**email_attributes)
Expand All @@ -51,8 +52,9 @@ def send_task_failure_notifications(**kwargs) -> None:
key="conversion_failures", task_ids="process_symphony.rdf2marc"
)
for resource_uri in failed_resources or []:
resource_uuid = resource_uri.split("/")[-1]
message = task_instance.xcom_pull(
key=resource_uri, task_ids="sqs-message-parse"
key=resource_uuid, task_ids="sqs-message-parse"
)
email_attributes = _email_on_failure_attributes(message)
ses_hook.send_email(**email_attributes)
Expand Down
8 changes: 5 additions & 3 deletions ils_middleware/tasks/sinopia/local_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ def _add_local_system_id(


def _pull_identifiers(tasks, resource_uri, instance) -> typing.Optional[str]:
resource_uuid = resource_uri.split("/")[-1]
for task_id in tasks:
value = instance.xcom_pull(key=resource_uri, task_ids=task_id)
value = instance.xcom_pull(key=resource_uuid, task_ids=task_id)
if value:
return value
return None
Expand Down Expand Up @@ -99,8 +100,9 @@ def new_local_admin_metadata(*args, **kwargs):
sinopia_api_uri = Variable.get("sinopia_api_uri")

for resource_uri in resources:
resource_uuid = resource_uri.split("/")[-1]
message = task_instance.xcom_pull(
key=resource_uri, task_ids="sqs-message-parse"
key=resource_uuid, task_ids="sqs-message-parse"
)
resource = message.get("resource")
group = resource.get("group")
Expand Down Expand Up @@ -158,4 +160,4 @@ def new_local_admin_metadata(*args, **kwargs):
raise Exception(msg)

logger.debug(f"Results of new_admin_result {new_admin_result.text}")
task_instance.xcom_push(key=resource_uri, value=admin_metadata_uri)
task_instance.xcom_push(key=resource_uuid, value=admin_metadata_uri)
3 changes: 2 additions & 1 deletion ils_middleware/tasks/sinopia/rdf2marc.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ def Rdf2Marc(**kwargs):
msg = f"RDF2MARC conversion failed for {instance_uri}, error: {payload.get('errorMessage')}"
conversion_failures[instance_uri] = msg
elif result["StatusCode"] == 200:
task_instance.xcom_push(key=instance_uri, value=marc_path)
instance_uuid = instance_uri.split("/")[-1]
task_instance.xcom_push(key=instance_uuid, value=marc_path)
else:
msg = f"RDF2MARC conversion failed for {instance_uri}: {result['FunctionError']}"
conversion_failures[instance_uri] = msg
Expand Down
5 changes: 3 additions & 2 deletions ils_middleware/tasks/symphony/mod_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ def to_symphony_json(**kwargs):
resources = task_instance.xcom_pull(key="resources", task_ids="sqs-message-parse")

for instance_uri in resources:
instance_uuid = instance_uri.split("/")[-1]
marc_raw_json = task_instance.xcom_pull(
key=instance_uri, task_ids="process_symphony.marc_json_to_s3"
key=instance_uuid, task_ids="process_symphony.marc_json_to_s3"
)
pymarc_json = json.loads(marc_raw_json)
record = {"standard": "MARC21", "type": "BIB", "fields": []}
record["leader"] = pymarc_json.get("leader")
for field in pymarc_json["fields"]:
record["fields"].append(_get_fields(field))

task_instance.xcom_push(key=instance_uri, value=record)
task_instance.xcom_push(key=instance_uuid, value=record)
7 changes: 4 additions & 3 deletions ils_middleware/tasks/symphony/new.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ def NewMARCtoSymphony(**kwargs):
)

for resource_uri in resources:
resource_uuid = resource_uri.split("/")[-1]
marc_json = task_instance.xcom_pull(
key=resource_uri, task_ids="process_symphony.convert_to_symphony_json"
key=resource_uuid, task_ids="process_symphony.convert_to_symphony_json"
)

payload = {
Expand Down Expand Up @@ -54,9 +55,9 @@ def NewMARCtoSymphony(**kwargs):
}
],
}

resource_uuid = resource_uri.split("/")[-1]
task_instance.xcom_push(
key=resource_uri,
key=resource_uuid,
value=SymphonyRequest(
**kwargs,
data=json.dumps(payload),
Expand Down
7 changes: 5 additions & 2 deletions ils_middleware/tasks/symphony/overlay.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def overlay_marc_in_symphony(*args, **kwargs):
else:
catkey = resource["catkey"][0].get("SIRSI")

resource_uuid = resource_uri.split("/")[-1]
marc_json = task_instance.xcom_pull(
key=resource_uri, task_ids="process_symphony.convert_to_symphony_json"
key=resource_uuid, task_ids="process_symphony.convert_to_symphony_json"
)

payload = {
Expand All @@ -43,8 +44,10 @@ def overlay_marc_in_symphony(*args, **kwargs):
"bib": marc_json,
}

resource_uuid = resource_uri.split("/")[-1]

task_instance.xcom_push(
key=resource_uri,
key=resource_uuid,
value=SymphonyRequest(
**kwargs,
data=json.dumps(payload),
Expand Down
Loading

0 comments on commit 9afa452

Please sign in to comment.