Skip to content

Commit

Permalink
Retrieving venue-dependent parameters from SSM
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCinquini committed Apr 23, 2024
1 parent f0dcf6d commit afbade9
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
27 changes: 21 additions & 6 deletions airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import shutil
import uuid
import boto3
from datetime import datetime

from airflow.models.baseoperator import chain
Expand All @@ -27,6 +28,11 @@
# This is fixed to the EFS /scratch directory in this DAG.
WORKING_DIR = "/scratch"

# SSM keys for venue-dependent parameters.
# These are the parameter names passed to the SSM get_parameters call in the
# setup task; the returned values populate the venue dictionary that was
# previously filled from hard-coded DAG params.
UNITY_DAPA_CLIENT_ID = "/unity/sps/processing/workflows/dapa_client_id"
UNITY_DAPA_API_URL = "/unity/sps/processing/workflows/dapa_api_url"
UNITY_OUTPUT_DATA_BUCKET = "/unity/sps/processing/workflows/output_data_bucket"

# Resources needed by each Task
# EC2 r6a.xlarge 4vCPU 32GiB
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
Expand Down Expand Up @@ -100,10 +106,10 @@
"crid": Param("001", type="string"),
# Unity venue-dependent parameters
# These values should be retrieved from SSM
"unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
# "unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
# "unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"unity_stac_auth": Param("UNITY", type="string"),
"output_data_bucket": Param("sps-dev-ds-storage", type="string"),
# "output_data_bucket": Param("sps-dev-ds-storage", type="string"),
},
)

Expand All @@ -112,12 +118,21 @@
# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):

# retrieve the venue-dependent parameters from SSM
# FIXME: must coordinate SSM keys with U-CS, U-DS
ssm_client = boto3.client('ssm', region_name='us-west-2')
ssm_response = ssm_client.get_parameters(Names=[UNITY_DAPA_CLIENT_ID, UNITY_DAPA_API_URL, UNITY_OUTPUT_DATA_BUCKET])

# dictionary containing venue dependent parameters common to all Tasks
# this dictionary is merged into each Task specific dictionary
venue_dict = {
"input_unity_dapa_client": context["params"]["unity_dapa_client"],
"input_unity_dapa_api": context["params"]["unity_dapa_api"],
"output_data_bucket": context["params"]["output_data_bucket"],
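# NOTE(review): the positional indexing below assumes the SSM response lists
# Parameters in the same order as the Names in the request. GetParameters does
# not document such an ordering guarantee, so matching each returned entry by
# its 'Name' field would be safer -- TODO confirm.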
"input_unity_dapa_client": ssm_response['Parameters'][0]['Value'],
"input_unity_dapa_api": ssm_response['Parameters'][1]['Value'],
"output_data_bucket": ssm_response['Parameters'][2]['Value'],
# "input_unity_dapa_client": context["params"]["unity_dapa_client"],
# "input_unity_dapa_api": context["params"]["unity_dapa_api"],
# "output_data_bucket": context["params"]["output_data_bucket"],
}

preprocess_dict = {
Expand Down
1 change: 1 addition & 0 deletions airflow/docker/custom_airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# Custom Airflow image: the stock Airflow 2.9.0 / Python 3.11 image plus the
# extra Python packages pinned below.
FROM apache/airflow:2.9.0-python3.11

# cwltool, pinned to an exact release.
RUN pip install cwltool==3.1.20240112164112
# boto3, imported by the SBG L1-to-L2 DAG to read venue-dependent parameters from SSM.
RUN pip install boto3==1.34.89

0 comments on commit afbade9

Please sign in to comment.