A framework for implementing triggers and evaluators in the Unity SDS
Quite simply, an SDS (Science Data System) is an orchestrated set of networked compute and storage resources that is adapted to process science data through a pipeline. As described by Hua et al. [2022]:
> Science Data Systems (SDSes) provide the capability to develop, test, process, and analyze instrument observational data efficiently, systematically, and at large scales. SDSes ingest the raw satellite instrument observations and process them from low‐level instrument values into higher level observational measurement values that compose the science data products.
The Unity SDS is an implementation of an SDS by the Unity project at NASA Jet Propulsion Laboratory.
Trigger events are events that could potentially kick off processing in an SDS. Examples of trigger events are:
1. A raw data file is deposited into a location, e.g. an S3 bucket or a local directory.
2. A scheduled task runs and finds that a new raw data file has been published to a data repository, e.g. CMR (Common Metadata Repository), ASF DAAC's Vertex, etc.
The different types of trigger events lend themselves to particular trigger implementations. Taking #1 as an example, and specifically the S3 bucket use case, an implementation of that trigger could use the native S3 event notification capability to notify the SDS that a new file was deposited in the bucket. For the local directory use case, the trigger implementation could use the Python watchdog library to monitor a local directory and notify the SDS when a new file has been deposited there.
Taking #2 as an example, an implementation of that trigger could be a cron job running on a local machine that starts up a script that queries for new data via some remote API call and then notifies the SDS. An "all-in" cloud implementation of this trigger would use AWS EventBridge as the cron scheduler and AWS Lambda as the "script" that performs the querying and SDS notification.
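The Lambda half of that cloud implementation could look roughly like the sketch below. All names here are hypothetical (the real trigger code lives in this repository's triggers); the query and publish callables are injected so the handler stays testable:

```python
import json

def make_scheduled_trigger(query_new_urls, publish, initiator_topic_arn):
    """Build an AWS Lambda handler for an EventBridge-scheduled trigger.

    query_new_urls: hypothetical callable returning URLs of newly found files.
    publish: an SNS-publish-style callable, e.g. boto3.client("sns").publish.
    """
    def handler(event, context):
        urls = query_new_urls()
        for url in urls:
            # Notify the SDS of each trigger event.
            publish(TopicArn=initiator_topic_arn, Message=json.dumps({"payload": url}))
        return {"published": len(urls)}
    return handler
```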
These are just an initial subset of the different types of trigger events and their respective trigger implementations. This unity-initiator GitHub repository provides examples of some of these trigger implementations. More importantly, however, the unity-initiator provides the common interface through which any trigger implementation can notify the SDS of a triggering event. This common interface is called the initiator topic (implemented as an SNS topic), and the following screenshot from the above architecture diagram shows their interaction:
Trigger events by themselves don't automatically mean that SDS processing is ready to proceed. That's what evaluators are for.
As described by Hua et al. [2022]:
> A fundamental capability of an SDS is to systematically process science data through a series of data transformations from raw instrument data to geophysical measurements. Data are first made available to the SDS from GDS to be processed to higher level data products. The data transformation steps may utilize ancillary and auxiliary files as well as production rules that stipulate conditions for when each step should be executed.
In an SDS, evaluators are functions (irrespective of how they are deployed and called) that perform adaptation-specific evaluation to determine if the next step in the processing pipeline is ready for execution.
As an example, the following shows the input-output diagram for the NISAR L-SAR L0B PGE (a.k.a. science algorithm):
The NISAR L-SAR L0B PGE is only executed when the evaluator function determines that:
- All input L0A files necessary to cover the L0B granule timespan are present in the SDS
- The following ancillary files for the input data timespan exist in the SDS and are of the correct fidelity (forecast vs. near vs. medium vs. precise): LRCLK-UTC, orbit ephemeris, radar pointing, radar config, BFPQ lookup tables, LSAR channel data
- Metadata regarding the NISAR-specific observation plan, CTZ (cycle time zero), and other orbit-related fields is available from these ancillary files: dCOP, oROST, STUF
When evaluation is successful, the L0B PGE job is submitted, L0B products are produced, and evaluators for downstream PGEs (e.g. L1) are executed.
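A production-rule check of this kind can be sketched as below. The names and the fidelity ordering are illustrative only, not NISAR's actual logic:

```python
# Illustrative fidelity ordering; actual rules are adaptation-specific.
FIDELITY_RANK = {"forecast": 0, "near": 1, "medium": 2, "precise": 3}

def l0b_ready(required_l0a, available_l0a, ancillary_fidelity, min_fidelity):
    """Return True only if all required L0A inputs are present and every
    ancillary file meets the minimum fidelity (hypothetical helper)."""
    if not set(required_l0a) <= set(available_l0a):
        return False  # input coverage incomplete; wait for more L0A files
    return all(
        FIDELITY_RANK[fid] >= FIDELITY_RANK[min_fidelity]
        for fid in ancillary_fidelity.values()
    )
```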
The unity-initiator GitHub repository provides examples of evaluators that can be used as templates to adapt and deploy for a mission or project. More importantly, the unity-initiator provides the set of common interfaces through which any adaptation-specific evaluator can be called as a result of a trigger event. Currently there are two supported interfaces, but this repository is organized and structured to easily extend to new ones:
- Trigger event information published to an evaluator SNS topic + SQS queue executes an evaluator implemented as an AWS Lambda function (submit_to_sns_topic action)
- Trigger event information submitted as DAG run for an evaluator implemented in SPS (submit_dag_by_id action)
The following screenshot shows examples of both of these interfaces:
It is the responsibility of the initiator to perform the routing of triggers to their respective evaluators.
The Unity initiator is the set of compute resources that enable the routing of trigger events to their respective evaluators. It is agnostic of the trigger event source and agnostic of the adaptation-specific evaluator code. It is completely driven by configuration (a.k.a. router configuration YAML). The following screenshot shows the current architecture for the initiator:
The initiator topic, an SNS topic, is the common interface that all triggers will submit events to. The initiator topic is subscribed to by the initiator SQS queue (complete with dead-letter queue for resiliency) which in turn is subscribed to by the router Lambda function. How the router Lambda routes payloads of the trigger events is defined by the router configuration YAML. The full YAML schema for the router configuration is located here.
In the context of trigger events where a new file is detected (`payload_type=url`), the router Lambda extracts the URL of the new file, instantiates a router object, and attempts to match it against a set of regular expressions defined in the router configuration file. Let's consider this minimal router configuration YAML file example:
```yaml
initiator_config:
  name: minimal config example
  payload_type:
    url:
      - regexes:
          - '/(?P<id>(?P<Mission>NISAR)_S(?P<SCID>\d{3})_(?P<Station>\w{2,3})_(?P<Antenna>\w{3,4})_M(?P<Mode>\d{2})_P(?P<Pass>\d{5})_R(?P<Receiver>\d{2})_C(?P<Channel>\d{2})_G(?P<Group>\d{2})_(?P<FileCreationDateTime>\d{4}_\d{3}_\d{2}_\d{2}_\d{2}_\d{6})\d{3}\.vc(?P<VCID>\w{2}))$'
        evaluators:
          - name: eval_nisar_ingest
            actions:
              - name: submit_to_sns_topic
                params:
                  topic_arn: arn:aws:sns:hilo-hawaii-1:123456789012:eval_nisar_ingest
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_nisar_tlm_ingest
                          airflow_base_api_endpoint: xxx
                          airflow_username: <SSM parameter, e.g. /unity/airflow/username> <ARN to username entry in AWS Secrets Manager>
                          airflow_password: <SSM parameter, e.g. /unity/airflow/password> <ARN to password entry in Secrets Manager>
```
and a trigger event payload for a newly detected file:

```json
{
  "payload": "s3://test_bucket/prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.vc29"
}
```
The router will iterate over the set of url configs and attempt to match the URL against each config's set of regexes. If a match is successful, the router will iterate over the configured evaluator configs and perform the configured action to submit the URL payload to the evaluator interface (either SNS topic or DAG submission). In this case, the router sees that the action is `submit_to_sns_topic` and thus publishes the URL payload (and the regular expression captured groups as `payload_info`) to the SNS topic (`topic_arn`) configured in the action's parameters. In addition to the payload URL and the payload info, the router also includes the `on_success` parameters configured for the action. This propagates pertinent info to the underlying evaluator code, to be used if evaluation is successful. In this case, if the evaluator successfully evaluates that everything is ready for this input file, it can proceed to submit a DAG run for the `submit_nisar_tlm_ingest` DAG in the underlying SPS.
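To make the matching concrete, the url regex from the configuration above can be exercised directly in Python; the named capture groups become the payload info:

```python
import re

# The url regex from the router configuration above, split for readability.
REGEX = (
    r'/(?P<id>(?P<Mission>NISAR)_S(?P<SCID>\d{3})_(?P<Station>\w{2,3})_'
    r'(?P<Antenna>\w{3,4})_M(?P<Mode>\d{2})_P(?P<Pass>\d{5})_R(?P<Receiver>\d{2})_'
    r'C(?P<Channel>\d{2})_G(?P<Group>\d{2})_'
    r'(?P<FileCreationDateTime>\d{4}_\d{3}_\d{2}_\d{2}_\d{2}_\d{6})\d{3}'
    r'\.vc(?P<VCID>\w{2}))$'
)

url = "s3://test_bucket/prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.vc29"
match = re.search(REGEX, url)
payload_info = match.groupdict()  # Mission=NISAR, SCID=198, VCID=29, ...
```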
Let's consider another example, but this time the configured action is to submit a DAG run instead of publishing to an evaluator's SNS topic:
```yaml
initiator_config:
  name: minimal config example
  payload_type:
    url:
      - regexes:
          - '/(?P<id>(?P<Mission>NISAR)_S(?P<SCID>\d{3})_(?P<Station>\w{2,3})_(?P<Antenna>\w{3,4})_M(?P<Mode>\d{2})_P(?P<Pass>\d{5})_R(?P<Receiver>\d{2})_C(?P<Channel>\d{2})_G(?P<Group>\d{2})_(?P<FileCreationDateTime>\d{4}_\d{3}_\d{2}_\d{2}_\d{2}_\d{5})(?P<R>\d{1,4})\.ldf)$'
        evaluators:
          - name: eval_nisar_l0a_readiness
            actions:
              - name: submit_dag_by_id
                params:
                  dag_id: eval_nisar_l0a_readiness
                  airflow_base_api_endpoint: https://example.com/api/v1
                  airflow_username: <SSM parameter, e.g. /unity/airflow/username> <ARN to username entry in AWS Secrets Manager>
                  airflow_password: <SSM parameter, e.g. /unity/airflow/password> <ARN to password entry in Secrets Manager>
                  on_success:
                    actions:
                      - name: submit_dag_by_id
                        params:
                          dag_id: submit_nisar_l0a_te_dag
                          # These are commented out because by default they will be pulled
                          # from the above configuration since we're in airflow. Otherwise
                          # they can be uncommented for explicit configuration (e.g. another SPS cluster):
                          #airflow_base_api_endpoint: xxx
                          #airflow_username: <ARN to username entry in AWS Secrets Manager>
                          #airflow_password: <ARN to password entry in Secrets Manager>
```
and a trigger event payload for a newly detected file:

```json
{
  "payload": "s3://test_bucket/prefix/NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.ldf"
}
```
In this case, the router sees that the action is `submit_dag_by_id` and thus makes a REST call to SPS to submit the URL payload, payload info, and `on_success` parameters as a DAG run. If the evaluator, now running as a DAG in SPS instead of as an AWS Lambda function, successfully evaluates that everything is ready for this input file, it can proceed to submit a DAG run for the `submit_nisar_l0a_te_dag` DAG in the underlying SPS.
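For a sense of what such a DAG submission amounts to, Airflow's stable REST API accepts a DAG run via a POST to `/dags/{dag_id}/dagRuns` with the payload carried in `conf`. The helper below is hypothetical and for illustration only; the actual `submit_dag_by_id` action implementation lives in this repository:

```python
import base64
import json

def build_dag_run_request(base_api_endpoint, dag_id, conf, username, password):
    """Assemble the URL, headers, and body for an Airflow 2.x stable REST API
    DAG-run submission (hypothetical helper for illustration)."""
    url = f"{base_api_endpoint}/dags/{dag_id}/dagRuns"
    body = json.dumps({"conf": conf}).encode()
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {token}",
    }
    return url, headers, body
```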
- Examples of triggers
- Example templates of evaluators
- Configuration-driven routing of trigger events to evaluators
- Terraform scripts for easy deployment of the initiator, triggers, and evaluators
- Features
- Contents
- Quick Start
- Requirements
- Setting Up the End-to-End Demo
- Deploying the Initiator
- Deploying an Example Evaluator (SNS topic->SQS queue->Lambda)
- Deploying an S3 Event Notification Trigger
- Verify End-to-End Functionality (part 1)
- Deploying an EventBridge Scheduler Trigger
- Verify End-to-End Functionality (part 2)
- Deploying an EventBridge Scheduler Trigger for Periodic CMR Queries
- Verify End-to-End Functionality (part 3)
- Tear Down
- Setup Instructions for Development
- Build Instructions
- Test Instructions
- Changelog
- Frequently Asked Questions (FAQ)
- Contributing
- License
- References
This guide provides a quick way to get started with our project. Please see our [docs]([INSERT LINK TO DOCS SITE / WIKI HERE]) for a more comprehensive overview.
- python 3.9+
- docker
- hatch
- terraform
- all other dependencies (defined in the pyproject.toml) will be installed and managed by hatch
- Clone repo:

  ```shell
  git clone https://github.com/unity-sds/unity-initiator.git
  ```

- Change directory to the location of the centralized log group terraform:

  ```shell
  cd unity-initiator/terraform-unity/centralized_log_group/
  ```

- Set up environment variables for `project` (by default `uod`) and `venue` (by default `dev`):

  ```shell
  export PROJECT=<your project, e.g. uod>
  export VENUE=<your venue, e.g. dev>
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    -auto-approve
  ```

  Take note of the `centralized_log_group_name` that is output by terraform. It will be used when setting up other resources (e.g. the initiator, trigger, and evaluator Lambdas).

- Export the `centralized_log_group_name` that was output from the centralized log group terraform deployment:

  ```shell
  export CENTRALIZED_LOG_GROUP=<your log group name, e.g. /unity/log/uod-dev-initiator-centralized-log-group>
  ```
- Change directory to the location of the initiator terraform:

  ```shell
  cd ../initiator/
  ```

- You will need an S3 bucket for terraform to stage the router Lambda zip file and router configuration YAML file during deployment. Create one or reuse an existing one and set an environment variable for it:

  ```shell
  export CODE_BUCKET=<some S3 bucket name>
  ```

- Copy a sample router configuration YAML file to use for deployment, and update the AWS region and AWS account ID to match your AWS environment. We will be using the NISAR TLM test case for this demo, so we also rename the SNS topic ARNs accordingly. We then upload the router configuration file:

  ```shell
  cp ../../tests/resources/test_router.yaml .
  export AWS_ACCOUNT_ID=$(aws sts get-caller-identity --output text | awk '{print $1}')
  export AWS_REGION=$(aws configure get region)
  sed -i "s/hilo-hawaii-1/${AWS_REGION}/g" test_router.yaml
  sed -i "s/123456789012:eval_nisar_ingest/${AWS_ACCOUNT_ID}:uod-dev-eval_nisar_ingest-evaluator_topic/g" test_router.yaml
  sed -i "s/123456789012:eval_airs_ingest/${AWS_ACCOUNT_ID}:uod-dev-eval_airs_ingest-evaluator_topic/g" test_router.yaml
  aws s3 cp test_router.yaml s3://${CODE_BUCKET}/test_router.yaml
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    --var code_bucket=${CODE_BUCKET} \
    --var router_config=s3://${CODE_BUCKET}/test_router.yaml \
    -auto-approve
  ```

  Take note of the `initiator_topic_arn` that is output by terraform. It will be used when setting up any triggers.

- Export the `initiator_topic_arn` that was output from the initiator terraform deployment:

  ```shell
  export INITIATOR_TOPIC_ARN=<initiator topic ARN>
  ```
In this demo we will deploy two evaluators:

- `eval_nisar_ingest` - evaluates ingestion of NISAR telemetry files deposited into the ISL bucket
- `eval_airs_ingest` - evaluates ingestion of AIRS RetStd files returned by a periodic CMR query
- Change directory to the location of the evaluators terraform:

  ```shell
  cd ../evaluators
  ```

- Make a copy of the `sns-sqs-lambda` directory for the NISAR TLM evaluator:

  ```shell
  cp -rp sns-sqs-lambda sns-sqs-lambda-nisar-tlm
  ```

- Change directory into the NISAR TLM evaluator terraform:

  ```shell
  cd sns-sqs-lambda-nisar-tlm/
  ```

- Set the name of the evaluator to our NISAR example:

  ```shell
  export EVALUATOR_NAME=eval_nisar_ingest
  ```

- Note the implementation of the evaluator code. It currently doesn't do any real evaluation but simply returns that evaluation was successful:

  ```shell
  cat lambda_handler.py
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    --var evaluator_name=${EVALUATOR_NAME} \
    --var code_bucket=${CODE_BUCKET} \
    -auto-approve
  ```

  Take note of the `evaluator_topic_arn` that is output by terraform. It should match the topic ARN in the test_router.yaml file you used during the initiator deployment. If they match, the router Lambda is now able to submit payloads to this evaluator SNS topic.
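For reference, a stub along the lines of that example evaluator might look like the sketch below (hypothetical; see the repository's lambda_handler.py for the real one). It unwraps each SNS-delivered record and declares evaluation successful unconditionally:

```python
import json

def lambda_handler(event, context):
    """Stub evaluator: unwrap each SNS record and report success.

    A real adaptation would check input completeness, ancillary
    fidelity, and other production rules before declaring success.
    """
    results = []
    for record in event.get("Records", []):
        msg = json.loads(record["Sns"]["Message"])
        results.append({"success": True, "payload": msg.get("payload")})
    return results
```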
- Change directory to the location of the evaluators terraform:

  ```shell
  cd ..
  ```

- Make a copy of the `sns-sqs-lambda` directory for the AIRS RetStd evaluator:

  ```shell
  cp -rp sns-sqs-lambda sns-sqs-lambda-airs-retstd
  ```

- Change directory into the AIRS RetStd evaluator terraform:

  ```shell
  cd sns-sqs-lambda-airs-retstd/
  ```

- Set the name of the evaluator to our AIRS example:

  ```shell
  export EVALUATOR_NAME=eval_airs_ingest
  ```

- Note the implementation of the evaluator code. It currently doesn't do any real evaluation but simply returns that evaluation was successful:

  ```shell
  cat lambda_handler.py
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    --var evaluator_name=${EVALUATOR_NAME} \
    --var code_bucket=${CODE_BUCKET} \
    -auto-approve
  ```

  Take note of the `evaluator_topic_arn` that is output by terraform. It should match the respective topic ARN in the test_router.yaml file you used during the initiator deployment. If they match, the router Lambda is now able to submit payloads to this evaluator SNS topic.
- Change directory to the location of the s3-bucket-notification trigger terraform:

  ```shell
  cd ../../triggers/s3-bucket-notification/
  ```

- You will need an S3 bucket to configure event notification on. Create one or reuse an existing one (it could be the same one used in the previous steps) and set an environment variable for it:

  ```shell
  export ISL_BUCKET=<some S3 bucket name>
  ```

- Specify an S3 prefix from which S3 event notifications will be emitted when objects are created:

  ```shell
  export ISL_BUCKET_PREFIX=incoming/
  ```

- Export the `initiator_topic_arn` that was output from the initiator terraform deployment:

  ```shell
  export INITIATOR_TOPIC_ARN=<initiator topic ARN>
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply:

  ```shell
  terraform apply \
    --var isl_bucket=${ISL_BUCKET} \
    --var isl_bucket_prefix=${ISL_BUCKET_PREFIX} \
    --var initiator_topic_arn=${INITIATOR_TOPIC_ARN} \
    -auto-approve
  ```
- Verify that the S3 event notification was correctly hooked up to the initiator by looking at the initiator Lambda's CloudWatch logs for an entry similar to this:

- Create some fake NISAR TLM files and stage them up to the ISL bucket under the ISL prefix:

  ```shell
  for i in 24 25 29; do
    echo 'Hawaii, No Ka Oi!' > NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.vc${i}
    aws s3 cp NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.vc${i} s3://${ISL_BUCKET}/${ISL_BUCKET_PREFIX}
    rm NISAR_S198_PA_PA11_M00_P00922_R00_C01_G00_2024_010_17_57_57_714280000.vc${i}
  done
  ```

- Verify that the `eval_nisar_ingest` evaluator Lambda function was called successfully for each of those staged files by looking at its CloudWatch logs for entries similar to this:
- Change directory to the location of the scheduled-task trigger terraform:

  ```shell
  cd ../scheduled-task/
  ```

- Note the implementation of the trigger Lambda code. It currently hard-codes a payload URL; in a real implementation, code would be written to query for new files from some REST API, database, etc. Here we simulate that and simply return a NISAR TLM file:

  ```shell
  cat data.tf
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply. Note that the PROJECT, VENUE, and INITIATOR_TOPIC_ARN environment variables should have been set in the previous steps. If not, set them again:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    --var initiator_topic_arn=${INITIATOR_TOPIC_ARN} \
    -auto-approve
  ```

- The deployed EventBridge scheduler runs the trigger Lambda function with a schedule expression of `rate(1 minute)`. After a minute, verify that the `eval_nisar_ingest` evaluator Lambda function was called successfully for each of those scheduled invocations by looking at its CloudWatch logs for entries similar to this:
- Change directory to the location of the cmr-query trigger terraform:

  ```shell
  cd ../cmr-query/
  ```

- Note the implementation of the trigger Lambda code. It will query CMR for granules in a particular collection within a timeframe, check its DynamoDB table to see if they already exist, and if not, submit them as payload URLs to the initiator SNS topic and save them into the DynamoDB table:

  ```shell
  cat lambda_handler.py
  ```

- Set the CMR provider ID for the AIRS RetStd collection:

  ```shell
  export PROVIDER_ID=GES_DISC
  ```

- Set the CMR concept ID for the AIRS RetStd collection:

  ```shell
  export CONCEPT_ID=C1701805619-GES_DISC
  ```

- Set the number of seconds to look back from the current epoch for granules in the collection. For example, we will set this value to 2 days (172800 seconds) so that when the CMR query Lambda kicks off, it will query for all AIRS RetStd granules using a temporal search of `now - 172800 seconds` to `now`:

  ```shell
  export SECONDS_BACK=172800
  ```

- Initialize terraform:

  ```shell
  terraform init
  ```

- Run terraform apply. Note that the PROJECT, CODE_BUCKET, and INITIATOR_TOPIC_ARN environment variables should have been set in the previous steps. If not, set them again:

  ```shell
  terraform apply \
    --var project=${PROJECT} \
    --var venue=${VENUE} \
    --var code_bucket=${CODE_BUCKET} \
    --var initiator_topic_arn=${INITIATOR_TOPIC_ARN} \
    --var provider_id=${PROVIDER_ID} \
    --var concept_id=${CONCEPT_ID} \
    --var seconds_back=${SECONDS_BACK} \
    -auto-approve
  ```

- The deployed EventBridge scheduler runs the CMR query trigger Lambda function with a schedule expression of `rate(1 minute)`. After a minute, verify that the `eval_airs_ingest` evaluator Lambda function was called successfully for each of those scheduled invocations by looking at its CloudWatch logs for entries similar to this:
- Simply go back into each of the terraform directories for which `terraform apply` was run and run `terraform destroy`.
- Clone repo:

  ```shell
  git clone https://github.com/unity-sds/unity-initiator.git
  ```

- Install hatch:

  ```shell
  pip install hatch
  ```

- Build virtualenv and install dependencies:

  ```shell
  cd unity-initiator
  hatch env create
  ```

- Install dev tools:

  ```shell
  ./scripts/install_dev_tools.sh
  ```

- Test pre-commit run:

  ```shell
  pre-commit run --all-files
  ```

  You should see the following output:

  ```
  check for merge conflicts...............................................Passed
  check for broken symlinks...........................(no files to check)Skipped
  trim trailing whitespace................................................Passed
  isort...................................................................Passed
  black...................................................................Passed
  ruff....................................................................Passed
  bandit..................................................................Passed
  prospector..............................................................Passed
  Terraform fmt...........................................................Passed
  Terraform docs..........................................................Passed
  Terraform validate......................................................Passed
  Lock terraform provider versions........................................Passed
  Terraform validate with tflint..........................................Passed
  Terraform validate with tfsec (deprecated, use "terraform_trivy")......Passed
  ```
- Follow Setup Instructions for Development above.

- Enter environment:

  ```shell
  hatch shell
  ```

- Build:

  ```shell
  hatch build
  ```

  The wheel and tarball will be built in the `dist/` directory:

  ```
  $ tree dist
  dist
  ├── unity_initiator-0.0.1-py3-none-any.whl
  └── unity_initiator-0.0.1.tar.gz

  1 directory, 2 files
  ```
- Follow Setup Instructions for Development above.

- Enter environment:

  ```shell
  hatch shell
  ```

- Run tests:

  ```shell
  hatch run pytest
  ```

  For more information during test runs, set the log level accordingly. For example:

  ```shell
  hatch run pytest -s -v --log-cli-level=INFO --log-level=INFO
  ```
See our CHANGELOG.md for a history of our changes.
See our releases page for our key versioned releases.
No questions yet. Propose a question to be added here by reaching out to our contributors! See support section below.
Interested in contributing to our project? Please see our: CONTRIBUTING.md
For guidance on how to interact with our team, please see our code of conduct located at: CODE_OF_CONDUCT.md
For guidance on our governance approach, including decision-making process and our various roles, please see our governance model at: GOVERNANCE.md
unity-initiator
is distributed under the terms of the MIT license.
[1] Hua, H., Manipon, G. and Shah, S. (2022). Scaling Big Earth Science Data Systems Via Cloud Computing. In Big Data Analytics in Earth, Atmospheric, and Ocean Sciences (eds T. Huang, T.C. Vance and C. Lynnes). https://doi.org/10.1002/9781119467557.ch3