From e0f78c009c11e77ef44a77cbe2fa64aaa0164730 Mon Sep 17 00:00:00 2001 From: Gerald Manipon Date: Thu, 19 Sep 2024 12:08:35 -0700 Subject: [PATCH] add action to submit HySDS job for NISAR ECMWF task (#5) * initial implementation of HySDS submit job action * fix schema * finalize implementation; add unit and regression tests --- .../actions/submit_hysds_job.py | 56 +++++++++++++++++++ .../resources/routers_schema.yaml | 13 ++++- tests/resources/test_router.yaml | 14 +++-- tests/test_lambda.py | 40 +++++++++++++ tests/test_router.py | 32 +++++++++++ 5 files changed, 148 insertions(+), 7 deletions(-) create mode 100644 src/unity_initiator/actions/submit_hysds_job.py diff --git a/src/unity_initiator/actions/submit_hysds_job.py b/src/unity_initiator/actions/submit_hysds_job.py new file mode 100644 index 0000000..874b8ae --- /dev/null +++ b/src/unity_initiator/actions/submit_hysds_job.py @@ -0,0 +1,56 @@ +import json +import uuid + +import httpx + +from ..utils.logger import logger +from .base import Action + +__all__ = ["SubmitHysdsJob"] + + +class SubmitHysdsJob(Action): + def __init__(self, payload, payload_info, params): + super().__init__(payload, payload_info, params) + logger.info("instantiated %s", __class__.__name__) + + def execute(self): + """Submit job to mozart via REST API.""" + + # build job params + job_params = { + "payload": self._payload, + "payload_info": self._payload_info, + "on_success": self._params["on_success"], + } + + # setup url and request body + url = self._params["mozart_base_api_endpoint"] + body = { + "queue": self._params["queue"], + "priority": self._params.get("priority", 0), + "tags": json.dumps(self._params.get("tags", [])), + "type": self._params["job_spec"], + "params": json.dumps(job_params), + "name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}", + } + + # submit job + logger.info("job URL: %s", url) + logger.info("job params: %s", json.dumps(body, indent=2, sort_keys=True)) + response = httpx.post(url, data=body, verify=False) # nosec + if response.status_code in (200, 201): + success = True + resp = response.json() + logger.info( + "Successfully submitted HySDS job %s: %s", + self._params["job_spec"], + resp, + ) + else: + success = False + resp = response.text + logger.info( + "Failed to submit HySDS job %s: %s", self._params["job_spec"], resp + ) + return {"success": success, "response": resp} diff --git a/src/unity_initiator/resources/routers_schema.yaml b/src/unity_initiator/resources/routers_schema.yaml index fdbabe9..69faea6 100644 --- a/src/unity_initiator/resources/routers_schema.yaml +++ b/src/unity_initiator/resources/routers_schema.yaml @@ -36,7 +36,7 @@ evaluator_config: # Currently only 2 types of actions are supported: # 1. submit payload to an SNS topic # 2. submit payload to an airflow DAG -action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action")) +action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"), include("submit_hysds_job_action")) # Configuration for submitting a payload to an airflow DAG. submit_dag_by_id_action: @@ -69,6 +69,17 @@ submit_ogc_process_execution_action: execution_subscriber: map(required=False) on_success: include("on_success_actions", required=False) +# Configuration for submitting a HySDS job +submit_hysds_job_action: + name: str(equals="submit_hysds_job") + params: + mozart_base_api_endpoint: str(required=True) + job_spec: str(required=True) + queue: str(required=True) + priority: int(min=0, max=10, required=False) + tags: list(str(), required=False) + on_success: include("on_success_actions", required=False) + # Configuration to pass onto the evaluator to use when evaluation is a success. on_success_actions: actions: list(include("action_config"), required=True, min=1, max=1) diff --git a/tests/resources/test_router.yaml b/tests/resources/test_router.yaml index f7212d1..e1e09df 100644 --- a/tests/resources/test_router.yaml +++ b/tests/resources/test_router.yaml @@ -148,13 +148,15 @@ initiator_config: topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_airs_ingest on_success: actions: - - name: submit_dag_by_id + - name: submit_hysds_job params: - dag_id: submit_airs_ingest - airflow_base_api_endpoint: xxx - airflow_username: - airflow_password: - + mozart_base_api_endpoint: https://example.com/api/v0.1/job/submit + job_spec: submit_airs_ingest:v1 + queue: ingest_queue + priority: 0 + tags: + - airs + - hysds - regexes: - '(?<=/)(?Phello_world\.txt)$' diff --git a/tests/test_lambda.py b/tests/test_lambda.py index c3168e5..27cc9fd 100644 --- a/tests/test_lambda.py +++ b/tests/test_lambda.py @@ -59,6 +59,9 @@ def setup_mock_resources(): sns_client.create_topic( Name="eval_nisar_ingest", Attributes={"TracingConfig": "Active"} ) + sns_client.create_topic( + Name="eval_airs_ingest", Attributes={"TracingConfig": "Active"} + ) # mock airflow REST API respx.post("https://example.com/api/v1/dags/eval_nisar_l0a_readiness/dagRuns").mock( @@ -83,6 +86,19 @@ def setup_mock_resources(): ) ) + # mock mozart REST API + respx.post("https://example.com/api/v0.1/job/submit").mock( + return_value=Response( + 200, + json={ + "success": True, + "message": "", + "result": "fda11fad-35f0-466e-a785-4678a0e662de", + "tags": ["airs", "hysds"], + }, + ) + ) + @respx.mock @mock_aws @@ -257,6 +273,21 @@ def test_invoke_function_all_test_cases(self): for res in results: assert res["success"] + # AIRS RetStd use case + in_data = { + "payload": "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf" + } + invoke_res = self.client.invoke( + FunctionName=self.function_name, + InvocationType="Event", + Payload=json.dumps(in_data), + ) + logger.info("invoke_res: %s", invoke_res) + results = json.loads(invoke_res["Payload"].read().decode("utf-8")) + logger.info("results: %s", results) + for res in results: + assert res["success"] + def test_invoke_function_unrecognized(self): """Test invocations of the router lambda using an unrecognized url.""" @@ -450,3 +481,12 @@ def test_initiator_nisar_ldf(self): self.bucket_name, "prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_18_03_05_087077000.ldf", ) + + def test_initiator_airs_retstd(self): + """Test invocations of the initiator lambda via S3 event using AIRS RetStd test case: submit_hysds_job""" + + # Upload file to trigger notification + assert self.invoke_initiator_via_s3_event( + self.bucket_name, + "prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf", + ) diff --git a/tests/test_router.py b/tests/test_router.py index bcd9728..097bc25 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -178,6 +178,38 @@ def test_execute_actions_for_nisar_ldf_url(): assert res["success"] +@respx.mock +@mock_aws +def test_execute_actions_for_airs_retstd_url(): + """Test routing a url payload and executing actions: AIRS RetStd example""" + + # mock mozart REST API + respx.post("https://example.com/api/v0.1/job/submit").mock( + return_value=Response( + 200, + json={ + "success": True, + "message": "", + "result": "fda11fad-35f0-466e-a785-4678a0e662de", + "tags": ["airs", "hysds"], + }, + ) + ) + + url = "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf" + client = boto3.client("sns") + router_file = files("tests.resources").joinpath("test_router.yaml") + router = Router(router_file) + client.create_topic( + Name=list(router.get_evaluators_by_url(url))[0].name, + Attributes={"TracingConfig": "Active"}, + ) + results = router.execute_actions(url) + logger.info("results: %s", results) + for res in results: + assert res["success"] + + @mock_aws def test_unrecognized_url(): """Test routing a url payload that is unrecognized: NISAR L0B example"""