From e807d06c1ee050f807f3010a088a18d661cce5e4 Mon Sep 17 00:00:00 2001 From: Gerald Manipon Date: Thu, 19 Sep 2024 10:40:10 -0700 Subject: [PATCH 1/3] initial implementation of HySDS submit job action --- .../actions/submit_hysds_job.py | 56 +++++++++++++++++++ .../resources/routers_schema.yaml | 11 ++++ tests/resources/test_router.yaml | 14 +++-- 3 files changed, 75 insertions(+), 6 deletions(-) create mode 100644 src/unity_initiator/actions/submit_hysds_job.py diff --git a/src/unity_initiator/actions/submit_hysds_job.py b/src/unity_initiator/actions/submit_hysds_job.py new file mode 100644 index 0000000..e4e45d0 --- /dev/null +++ b/src/unity_initiator/actions/submit_hysds_job.py @@ -0,0 +1,56 @@ +import uuid +from datetime import datetime + +import httpx + +from ..utils.logger import logger +from .base import Action + +__all__ = ["SubmitHysdsJob"] + + +class SubmitHysdsJob(Action): + def __init__(self, payload, payload_info, params): + super().__init__(payload, payload_info, params) + logger.info("instantiated %s", __class__.__name__) + + def execute(self): + """Submit job to mozart via REST API.""" + + # setup url and request body + url = f"{self._params['mozart_base_api_endpoint']}/api/v0.1/job/submit" + body = { + "queue": self._params["queue"], + "priority": self._params.get("priority", 0), + "tags": json.dumps(self._params.get("tags", [])), + "type": self._params["job_spec"], + "params": json.dumps(job_params), + "name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}" + } + + # build job params + job_params = { + "payload": self._payload, + "payload_info": self._payload_info, + "on_success": self._params["on_success"], + } + + # submit job + logger.info("job URL: %s", url) + logger.info("job params: %s", json.dumps(body, indent=2, sort_keys=True)) + response = httpx.post(url, data=body, verify=False) # nosec + if response.status_code in (200, 201): + success = True + resp = response.json() + logger.info( + "Successfully submitted HySDS job %s: %s", + self._params["job_spec"], + resp, + ) + else: + success = False + resp = response.text + logger.info( + "Failed to submit HySDS job %s: %s", self._params["job_spec"], resp + ) + return {"success": success, "response": resp} diff --git a/src/unity_initiator/resources/routers_schema.yaml b/src/unity_initiator/resources/routers_schema.yaml index fdbabe9..21acee3 100644 --- a/src/unity_initiator/resources/routers_schema.yaml +++ b/src/unity_initiator/resources/routers_schema.yaml @@ -69,6 +69,17 @@ submit_ogc_process_execution_action: execution_subscriber: map(required=False) on_success: include("on_success_actions", required=False) +# Configuration for submitting a HySDS job +submit_hysds_job: + name: str(equals="submit_hysds_job") + params: + mozart_base_api_endpoint: str(required=True) + job_spec: str(required=True) + queue: str(required=True) + priority: int(min=0, max=10, required=False) + tags: list(str(), required=False) + on_success: include("on_success_actions", required=False) + # Configuration to pass onto the evaluator to use when evaluation is a success. on_success_actions: actions: list(include("action_config"), required=True, min=1, max=1) diff --git a/tests/resources/test_router.yaml b/tests/resources/test_router.yaml index f7212d1..fc89083 100644 --- a/tests/resources/test_router.yaml +++ b/tests/resources/test_router.yaml @@ -148,13 +148,15 @@ initiator_config: topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_airs_ingest on_success: actions: - - name: submit_dag_by_id + - name: submit_hysds_job params: - dag_id: submit_airs_ingest - airflow_base_api_endpoint: xxx - airflow_username: - airflow_password: - + mozart_base_api_endpoint: xxx + job_spec: submit_airs_ingest:v1 + queue: ingest_queue + priority: 0 + tags: + - airs + - hysds - regexes: - '(?<=/)(?Phello_world\.txt)$' From 36d468b7a2557488658a5b54ffe605a0a0a8b0b4 Mon Sep 17 00:00:00 2001 From: Gerald Manipon Date: Thu, 19 Sep 2024 11:06:09 -0700 Subject: [PATCH 2/3] fix schema --- src/unity_initiator/resources/routers_schema.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/unity_initiator/resources/routers_schema.yaml b/src/unity_initiator/resources/routers_schema.yaml index 21acee3..69faea6 100644 --- a/src/unity_initiator/resources/routers_schema.yaml +++ b/src/unity_initiator/resources/routers_schema.yaml @@ -36,7 +36,7 @@ evaluator_config: # Currently only 2 types of actions are supported: # 1. submit payload to an SNS topic # 2. submit payload to an airflow DAG -action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action")) +action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"), include("submit_hysds_job_action")) # Configuration for submitting a payload to an airflow DAG. submit_dag_by_id_action: @@ -70,7 +70,7 @@ submit_ogc_process_execution_action: on_success: include("on_success_actions", required=False) # Configuration for submitting a HySDS job -submit_hysds_job: +submit_hysds_job_action: name: str(equals="submit_hysds_job") params: mozart_base_api_endpoint: str(required=True) From c2910b541b2f4184c6ecda006fadfccf35c2e949 Mon Sep 17 00:00:00 2001 From: Gerald Manipon Date: Thu, 19 Sep 2024 11:47:20 -0700 Subject: [PATCH 3/3] finalize implementation; add unit and regression tests --- .../actions/submit_hysds_job.py | 20 +++++----- tests/resources/test_router.yaml | 2 +- tests/test_lambda.py | 40 +++++++++++++++++++ tests/test_router.py | 32 +++++++++++++++ 4 files changed, 83 insertions(+), 11 deletions(-) diff --git a/src/unity_initiator/actions/submit_hysds_job.py b/src/unity_initiator/actions/submit_hysds_job.py index e4e45d0..874b8ae 100644 --- a/src/unity_initiator/actions/submit_hysds_job.py +++ b/src/unity_initiator/actions/submit_hysds_job.py @@ -1,5 +1,5 @@ +import json import uuid -from datetime import datetime import httpx @@ -17,22 +17,22 @@ def __init__(self, payload, payload_info, params): def execute(self): """Submit job to mozart via REST API.""" + # build job params + job_params = { + "payload": self._payload, + "payload_info": self._payload_info, + "on_success": self._params["on_success"], + } + # setup url and request body - url = f"{self._params['mozart_base_api_endpoint']}/api/v0.1/job/submit" + url = self._params["mozart_base_api_endpoint"] body = { "queue": self._params["queue"], "priority": self._params.get("priority", 0), "tags": json.dumps(self._params.get("tags", [])), "type": self._params["job_spec"], "params": json.dumps(job_params), - "name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}" - } - - # build job params - job_params = { - "payload": self._payload, - "payload_info": self._payload_info, - "on_success": self._params["on_success"], + "name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}", } # submit job diff --git a/tests/resources/test_router.yaml b/tests/resources/test_router.yaml index fc89083..e1e09df 100644 --- a/tests/resources/test_router.yaml +++ b/tests/resources/test_router.yaml @@ -150,7 +150,7 @@ initiator_config: actions: - name: submit_hysds_job params: - mozart_base_api_endpoint: xxx + mozart_base_api_endpoint: https://example.com/api/v0.1/job/submit job_spec: submit_airs_ingest:v1 queue: ingest_queue priority: 0 diff --git a/tests/test_lambda.py b/tests/test_lambda.py index c3168e5..27cc9fd 100644 --- a/tests/test_lambda.py +++ b/tests/test_lambda.py @@ -59,6 +59,9 @@ def setup_mock_resources(): sns_client.create_topic( Name="eval_nisar_ingest", Attributes={"TracingConfig": "Active"} ) + sns_client.create_topic( + Name="eval_airs_ingest", Attributes={"TracingConfig": "Active"} + ) # mock airflow REST API respx.post("https://example.com/api/v1/dags/eval_nisar_l0a_readiness/dagRuns").mock( @@ -83,6 +86,19 @@ def setup_mock_resources(): ) ) + # mock mozart REST API + respx.post("https://example.com/api/v0.1/job/submit").mock( + return_value=Response( + 200, + json={ + "success": True, + "message": "", + "result": "fda11fad-35f0-466e-a785-4678a0e662de", + "tags": ["airs", "hysds"], + }, + ) + ) + @respx.mock @mock_aws @@ -257,6 +273,21 @@ def test_invoke_function_all_test_cases(self): for res in results: assert res["success"] + # AIRS RetStd use case + in_data = { + "payload": "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf" + } + invoke_res = self.client.invoke( + FunctionName=self.function_name, + InvocationType="Event", + Payload=json.dumps(in_data), + ) + logger.info("invoke_res: %s", invoke_res) + results = json.loads(invoke_res["Payload"].read().decode("utf-8")) + logger.info("results: %s", results) + for res in results: + assert res["success"] + def test_invoke_function_unrecognized(self): """Test invocations of the router lambda using an unrecognized url.""" @@ -450,3 +481,12 @@ def test_initiator_nisar_ldf(self): self.bucket_name, "prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_18_03_05_087077000.ldf", ) + + def test_initiator_airs_retstd(self): + """Test invocations of the initiator lambda via S3 event using AIRS RetStd test case: submit_hysds_job""" + + # Upload file to trigger notification + assert self.invoke_initiator_via_s3_event( + self.bucket_name, + "prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf", + ) diff --git a/tests/test_router.py b/tests/test_router.py index bcd9728..097bc25 100644 --- a/tests/test_router.py +++ b/tests/test_router.py @@ -178,6 +178,38 @@ def test_execute_actions_for_nisar_ldf_url(): assert res["success"] +@respx.mock +@mock_aws +def test_execute_actions_for_airs_retstd_url(): + """Test routing a url payload and executing actions: AIRS RetStd example""" + + # mock mozart REST API + respx.post("https://example.com/api/v0.1/job/submit").mock( + return_value=Response( + 200, + json={ + "success": True, + "message": "", + "result": "fda11fad-35f0-466e-a785-4678a0e662de", + "tags": ["airs", "hysds"], + }, + ) + ) + + url = "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf" + client = boto3.client("sns") + router_file = files("tests.resources").joinpath("test_router.yaml") + router = Router(router_file) + client.create_topic( + Name=list(router.get_evaluators_by_url(url))[0].name, + Attributes={"TracingConfig": "Active"}, + ) + results = router.execute_actions(url) + logger.info("results: %s", results) + for res in results: + assert res["success"] + + @mock_aws def test_unrecognized_url(): """Test routing a url payload that is unrecognized: NISAR L0B example"""