Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add action to submit HySDS job for NISAR ECMWF task #5

Merged
merged 3 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions src/unity_initiator/actions/submit_hysds_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import json
import uuid

import httpx

from ..utils.logger import logger
from .base import Action

__all__ = ["SubmitHysdsJob"]


class SubmitHysdsJob(Action):
def __init__(self, payload, payload_info, params):
super().__init__(payload, payload_info, params)
logger.info("instantiated %s", __class__.__name__)

def execute(self):
"""Submit job to mozart via REST API."""

# build job params
job_params = {
"payload": self._payload,
"payload_info": self._payload_info,
"on_success": self._params["on_success"],
}

# setup url and request body
url = self._params["mozart_base_api_endpoint"]
body = {
"queue": self._params["queue"],
"priority": self._params.get("priority", 0),
"tags": json.dumps(self._params.get("tags", [])),
"type": self._params["job_spec"],
"params": json.dumps(job_params),
"name": f"{self._params['job_spec'].split(':')[0]}-{str(uuid.uuid4())}",
}

# submit job
logger.info("job URL: %s", url)
logger.info("job params: %s", json.dumps(body, indent=2, sort_keys=True))
response = httpx.post(url, data=body, verify=False) # nosec
if response.status_code in (200, 201):
success = True
resp = response.json()
logger.info(
"Successfully submitted HySDS job %s: %s",
self._params["job_spec"],
resp,
)
else:
success = False
resp = response.text
logger.info(
"Failed to submit HySDS job %s: %s", self._params["job_spec"], resp
)
return {"success": success, "response": resp}
13 changes: 12 additions & 1 deletion src/unity_initiator/resources/routers_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ evaluator_config:
# Currently only 2 types of actions are supported:
# 1. submit payload to an SNS topic
# 2. submit payload to an airflow DAG
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"))
action_config: any(include("submit_dag_by_id_action"), include("submit_to_sns_topic_action"), include("submit_ogc_process_execution_action"), include("submit_hysds_job_action"))

# Configuration for submitting a payload to an airflow DAG.
submit_dag_by_id_action:
Expand Down Expand Up @@ -69,6 +69,17 @@ submit_ogc_process_execution_action:
execution_subscriber: map(required=False)
on_success: include("on_success_actions", required=False)

# Configuration for submitting a HySDS job
submit_hysds_job_action:
name: str(equals="submit_hysds_job")
params:
mozart_base_api_endpoint: str(required=True)
job_spec: str(required=True)
queue: str(required=True)
priority: int(min=0, max=10, required=False)
tags: list(str(), required=False)
on_success: include("on_success_actions", required=False)

# Configuration to pass onto the evaluator to use when evaluation is a success.
on_success_actions:
actions: list(include("action_config"), required=True, min=1, max=1)
14 changes: 8 additions & 6 deletions tests/resources/test_router.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,15 @@ initiator_config:
topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_airs_ingest
on_success:
actions:
- name: submit_dag_by_id
- name: submit_hysds_job
params:
dag_id: submit_airs_ingest
airflow_base_api_endpoint: xxx
airflow_username: <SSM parameter, e.g. /unity/airflow/username> <ARN to username entry in AWS Secrets Manager>
airflow_password: <SSM parameter, e.g. /unity/airflow/password> <ARN to password entry in Secrets Manager>

mozart_base_api_endpoint: https://example.com/api/v0.1/job/submit
job_spec: submit_airs_ingest:v1
queue: ingest_queue
priority: 0
tags:
- airs
- hysds

- regexes:
- '(?<=/)(?P<filename>hello_world\.txt)$'
Expand Down
40 changes: 40 additions & 0 deletions tests/test_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def setup_mock_resources():
sns_client.create_topic(
Name="eval_nisar_ingest", Attributes={"TracingConfig": "Active"}
)
sns_client.create_topic(
Name="eval_airs_ingest", Attributes={"TracingConfig": "Active"}
)

# mock airflow REST API
respx.post("https://example.com/api/v1/dags/eval_nisar_l0a_readiness/dagRuns").mock(
Expand All @@ -83,6 +86,19 @@ def setup_mock_resources():
)
)

# mock mozart REST API
respx.post("https://example.com/api/v0.1/job/submit").mock(
return_value=Response(
200,
json={
"success": True,
"message": "",
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
"tags": ["airs", "hysds"],
},
)
)


@respx.mock
@mock_aws
Expand Down Expand Up @@ -257,6 +273,21 @@ def test_invoke_function_all_test_cases(self):
for res in results:
assert res["success"]

# AIRS RetStd use case
in_data = {
"payload": "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
}
invoke_res = self.client.invoke(
FunctionName=self.function_name,
InvocationType="Event",
Payload=json.dumps(in_data),
)
logger.info("invoke_res: %s", invoke_res)
results = json.loads(invoke_res["Payload"].read().decode("utf-8"))
logger.info("results: %s", results)
for res in results:
assert res["success"]

def test_invoke_function_unrecognized(self):
"""Test invocations of the router lambda using an unrecognized url."""

Expand Down Expand Up @@ -450,3 +481,12 @@ def test_initiator_nisar_ldf(self):
self.bucket_name,
"prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_18_03_05_087077000.ldf",
)

def test_initiator_airs_retstd(self):
"""Test invocations of the initiator lambda via S3 event using AIRS RetStd test case: submit_hysds_job"""

# Upload file to trigger notification
assert self.invoke_initiator_via_s3_event(
self.bucket_name,
"prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf",
)
32 changes: 32 additions & 0 deletions tests/test_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,38 @@ def test_execute_actions_for_nisar_ldf_url():
assert res["success"]


@respx.mock
@mock_aws
def test_execute_actions_for_airs_retstd_url():
"""Test routing a url payload and executing actions: AIRS RetStd example"""

# mock mozart REST API
respx.post("https://example.com/api/v0.1/job/submit").mock(
return_value=Response(
200,
json={
"success": True,
"message": "",
"result": "fda11fad-35f0-466e-a785-4678a0e662de",
"tags": ["airs", "hysds"],
},
)
)

url = "s3://bucket/prefix/AIRS.2009.06.13.001.L2.RetStd.v6.0.7.0.G13077043030.hdf"
client = boto3.client("sns")
router_file = files("tests.resources").joinpath("test_router.yaml")
router = Router(router_file)
client.create_topic(
Name=list(router.get_evaluators_by_url(url))[0].name,
Attributes={"TracingConfig": "Active"},
)
results = router.execute_actions(url)
logger.info("results: %s", results)
for res in results:
assert res["success"]


@mock_aws
def test_unrecognized_url():
"""Test routing a url payload that is unrecognized: NISAR L0B example"""
Expand Down