Skip to content

Commit

Permalink
add action to submit HySDS job for NISAR ECMWF task (#5)
Browse files Browse the repository at this point in the history
* initial implementation of HySDS submit job action

* fix schema

* finalize implementation; add unit and regression tests
  • Loading branch information
pymonger authored Sep 19, 2024
1 parent e84d016 commit e0f78c0
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 7 deletions.
56 changes: 56 additions & 0 deletions src/unity_initiator/actions/submit_hysds_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import uuid

import httpx

from ..utils.logger import logger
from .base import Action

__all__ = ["SubmitHysdsJob"]


class SubmitHysdsJob(Action):
def __init__(self, payload, payload_info, params):
super().__init__(payload, payload_info, params)
logger.info("instantiated %s", __class__.__name__)

def execute(self):
"""Submit job to mozart via REST API."""

# build job params
job_params = {
"payload": self._payload,
"payload_info": self._payload_info,
"on_success": self._params["on_success"],
}

# setup url and request body
url = self._params["mozart_base_api_endpoint"]
body = {
"queue": self._params["queue"],
"priority": self._params.get("priority", 0),
"tags": json.dumps(self._params.get("tags", [])),
"type": self._params["job_spec"],
"params": json.dumps(job_params),
"name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}",
}

# submit job
logger.info("job URL: %s", url)
logger.info("job params: %s", json.dumps(body, indent=2, sort_keys=True))
response = httpx.post(url, data=body, verify=False) # nosec
if response.status_code in (200, 201):
success = True
resp = response.json()
logger.info(
"Successfully submitted HySDS job %s: %s",
self._params["job_spec"],
resp,
)
else:
success = False
resp = response.text
logger.info(
"Failed to submit HySDS job %s: %s", self._params["job_spec"], resp
)
return {"success": success, "response": resp}
13 changes: 12 additions & 1 deletion src/unity_initiator/resources/routers_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ evaluator_config:
# Currently only 2 types of actions are supported:
# 1. submit payload to an SNS topic
# 2. submit payload to an airflow DAG
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"))
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"), include("submit_hysds_job_action"))

# Configuration for submitting a payload to an airflow DAG.
submit_dag_by_id_action:
Expand Down Expand Up @@ -69,6 +69,17 @@ submit_ogc_process_execution_action:
execution_subscriber: map(required=False)
on_success: include("on_success_actions", required=False)

# Configuration for submitting a HySDS job
submit_hysds_job_action:
name: str(equals="submit_hysds_job")
params:
mozart_base_api_endpoint: str(required=True)
job_spec: str(required=True)
queue: str(required=True)
priority: int(min=0, max=10, required=False)
tags: list(str(), required=False)
on_success: include("on_success_actions", required=False)

# Configuration to pass onto the evaluator to use when evaluation is a success.
on_success_actions:
actions: list(include("action_config"), required=True, min=1, max=1)
14 changes: 8 additions & 6 deletions tests/resources/test_router.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ initiator_config:
topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_airs_ingest
on_success:
actions:
- name: submit_dag_by_id
- name: submit_hysds_job
params:
dag_id: submit_airs_ingest
airflow_base_api_endpoint: xxx
airflow_username: <SSM parameter, e.g. /unity/airflow/username> <ARN to username entry in AWS Secrets Manager>
airflow_password: <SSM parameter, e.g. /unity/airflow/password> <ARN to password entry in Secrets Manager>

mozart_base_api_endpoint: https://example.com/api/v0.1/job/submit
job_spec: submit_airs_ingest:v1
queue: ingest_queue
priority: 0
tags:
- airs
- hysds

- regexes:
- '(?<=/)(?P<filename>hello_world\.txt)$'
Expand Down
40 changes: 40 additions & 0 deletions tests/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def setup_mock_resources():
sns_client.create_topic(
Name="eval_nisar_ingest", Attributes={"TracingConfig": "Active"}
)
sns_client.create_topic(
Name="eval_airs_ingest", Attributes={"TracingConfig": "Active"}
)

# mock airflow REST API
respx.post("https://example.com/api/v1/dags/eval_nisar_l0a_readiness/dagRuns").mock(
Expand All @@ -83,6 +86,19 @@ def setup_mock_resources():
)
)

# mock mozart REST API
respx.post("https://example.com/api/v0.1/job/submit").mock(
return_value=Response(
200,
json={
"success": True,
"message": "",
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
"tags": ["airs", "hysds"],
},
)
)


@respx.mock
@mock_aws
Expand Down Expand Up @@ -257,6 +273,21 @@ def test_invoke_function_all_test_cases(self):
for res in results:
assert res["success"]

# AIRS RetStd use case
in_data = {
"payload": "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
}
invoke_res = self.client.invoke(
FunctionName=self.function_name,
InvocationType="Event",
Payload=json.dumps(in_data),
)
logger.info("invoke_res: %s", invoke_res)
results = json.loads(invoke_res["Payload"].read().decode("utf-8"))
logger.info("results: %s", results)
for res in results:
assert res["success"]

def test_invoke_function_unrecognized(self):
"""Test invocations of the router lambda using an unrecognized url."""

Expand Down Expand Up @@ -450,3 +481,12 @@ def test_initiator_nisar_ldf(self):
self.bucket_name,
"prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_18_03_05_087077000.ldf",
)

def test_initiator_airs_retstd(self):
"""Test invocations of the initiator lambda via S3 event using AIRS RetStd test case: submit_hysds_job"""

# Upload file to trigger notification
assert self.invoke_initiator_via_s3_event(
self.bucket_name,
"prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf",
)
32 changes: 32 additions & 0 deletions tests/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,38 @@ def test_execute_actions_for_nisar_ldf_url():
assert res["success"]


@respx.mock
@mock_aws
def test_execute_actions_for_airs_retstd_url():
"""Test routing a url payload and executing actions: AIRS RetStd example"""

# mock mozart REST API
respx.post("https://example.com/api/v0.1/job/submit").mock(
return_value=Response(
200,
json={
"success": True,
"message": "",
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
"tags": ["airs", "hysds"],
},
)
)

url = "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
client = boto3.client("sns")
router_file = files("tests.resources").joinpath("test_router.yaml")
router = Router(router_file)
client.create_topic(
Name=list(router.get_evaluators_by_url(url))[0].name,
Attributes={"TracingConfig": "Active"},
)
results = router.execute_actions(url)
logger.info("results: %s", results)
for res in results:
assert res["success"]


@mock_aws
def test_unrecognized_url():
"""Test routing a url payload that is unrecognized: NISAR L0B example"""
Expand Down

0 comments on commit e0f78c0

Please sign in to comment.