
220 - Define stage in, process, stage out tasks in entrypoint #240

Merged: 20 commits, Dec 20, 2024
Commits (changes shown from all 20 commits)
8689bf6  Initial work to break out setup task work and define process arguments (nikki-t, Nov 13, 2024)
4ac5e6d  Add HTTP download option (nikki-t, Nov 19, 2024)
9676369  Fix linting and code format (nikki-t, Nov 19, 2024)
8b7fd85  Initial stage in, process, stage out entrypoint (nikki-t, Nov 26, 2024)
eb7245b  Re-organize input parameters and fix command line arguments to entryp… (nikki-t, Nov 26, 2024)
8c119a5  Fix formatting and references to CWL workflow files (nikki-t, Nov 26, 2024)
4dd974b  Fix stage in workflow reference (nikki-t, Nov 26, 2024)
9280d74  Fix reference to python utilities and install jq (nikki-t, Nov 26, 2024)
1e3524d  Pull stage out AWS credentials from SSM parameters (nikki-t, Nov 26, 2024)
f804def  Set entrypoint utils script to executable (nikki-t, Nov 26, 2024)
97e7a52  Build separate CWL DAG modular container image (nikki-t, Dec 3, 2024)
b853b64  Merge branch 'develop' of github.com:unity-sds/unity-sps into 220-sta… (nikki-t, Dec 3, 2024)
2b6a01a  Clean up variables and stage out input parameter (nikki-t, Dec 3, 2024)
83f560b  Fix code formatting (nikki-t, Dec 3, 2024)
a1be311  Merge branch 'develop' of github.com:unity-sds/unity-sps into 220-sta… (nikki-t, Dec 3, 2024)
df59659  Update to new DS container image (nikki-t, Dec 10, 2024)
71429ed  Define new stage in and stage out CWL workflows (nikki-t, Dec 16, 2024)
6bdd735  Remove SSM parameter query for project and venue and define from envi… (nikki-t, Dec 16, 2024)
2759143  Update stage in STAC JSON default (nikki-t, Dec 17, 2024)
ed69526  Merge branch 'develop' of github.com:unity-sds/unity-sps into 220-sta… (nikki-t, Dec 17, 2024)
24 changes: 24 additions & 0 deletions .github/workflows/build_docker_images.yml
@@ -13,6 +13,7 @@ env:
TAG: ${{ github.event.inputs.tag }}
SPS_AIRFLOW: ${{ github.repository }}/sps-airflow
SPS_DOCKER_CWL: ${{ github.repository }}/sps-docker-cwl
SPS_DOCKER_CWL_MODULAR: ${{ github.repository }}/sps-docker-cwl-modular

jobs:
build-sps-airflow:
@@ -61,3 +62,26 @@ jobs:
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}
build-sps-docker-cwl-modular:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for SPS Docker CWL modular image
id: metascheduler
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}
- name: Build and push SPS Docker CWL modular image
uses: docker/build-push-action@v5
with:
context: ./airflow/docker/cwl
file: airflow/docker/cwl/Dockerfile-modular
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}
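The new build-sps-docker-cwl-modular job mirrors the existing CWL image job but builds from the same airflow/docker/cwl context with the Dockerfile-modular file. As a rough local equivalent, not part of this PR, the following Python sketch runs the same docker build; the registry, repository, and tag values are placeholders standing in for the workflow's env vars and inputs:

import os
import subprocess

# Local approximation of the build-sps-docker-cwl-modular CI job.
# The CI job derives these values from workflow env vars and the "tag" input.
registry = os.environ.get("REGISTRY", "ghcr.io")
repository = os.environ.get("GITHUB_REPOSITORY", "unity-sds/unity-sps")
tag = os.environ.get("TAG", "dev")
image = f"{registry}/{repository}/sps-docker-cwl-modular:{tag}"

# Same build context and Dockerfile referenced by the workflow step above.
subprocess.run(
    [
        "docker", "build",
        "-f", "airflow/docker/cwl/Dockerfile-modular",
        "-t", image,
        "airflow/docker/cwl",
    ],
    check=True,
)
print(f"Built {image}")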
223 changes: 98 additions & 125 deletions airflow/dags/cwl_dag_modular.py
@@ -3,8 +3,11 @@

The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries.
The "cwl-runner" tool is invoked to execute the CWL workflow.
Parameter cwl_workflow: the URL of the CWL workflow to execute.
Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs.
Parameter stage_in_args: The stage in job parameters encoded as a JSON string
Parameter process_workflow: the URL of the CWL workflow to execute.
Parameter process_args: JSON string containing the specific values for the processing workflow inputs.
Parameter stage_out_bucket: The S3 bucket to stage data out to.
Parameter collection_id: The output collection identifier for processed data.
"""

import json
@@ -25,8 +28,8 @@
from airflow import DAG

# Task constants
UNITY_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-unity/stage-in-workflow.cwl"
DAAC_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-daac/stage-in-workflow.cwl"
STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_in.cwl"
STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_out.cwl"
LOCAL_DIR = "/shared-task-data"

# The path of the working directory where the CWL workflow is executed
@@ -35,21 +38,19 @@
WORKING_DIR = "/scratch"

# Default parameters
DEFAULT_CWL_WORKFLOW = (
"https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl"
DEFAULT_STAC_JSON = "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json"
DEFAULT_PROCESS_WORKFLOW = (
"https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl"
)
DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"})
DEFAULT_STAC_JSON_URL = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD/collections/EMITL1BRAD_001/items?limit=2"
DEFAULT_INPUT_LOCATION = "daac"

DEFAULT_PROCESS_ARGS = json.dumps({"example_argument_empty": ""})

# Alternative arguments to execute SBG Pre-Process
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"
# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"

# Alternative arguments to execute SBG end-to-end
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"
# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"

# Alternative arguments to execute SBG end-to-end
# unity_sps_sbg_debug.txt
@@ -67,13 +68,6 @@
# "ephemeral-storage": "30Gi"
# },
)
STAGE_IN_CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={
"memory": "4Gi",
"cpu": "4",
"ephemeral-storage": "{{ params.request_storage }}",
}
)

# Default DAG configuration
dag_default_args = {
@@ -95,14 +89,25 @@
max_active_tasks=30,
default_args=dag_default_args,
params={
"cwl_workflow": Param(
DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL"
"stac_json": Param(
DEFAULT_STAC_JSON,
type="string",
title="STAC JSON",
description="STAC JSON data to download granules encoded as a JSON string or the URL of a JSON or YAML file",
),
"process_workflow": Param(
DEFAULT_PROCESS_WORKFLOW,
type="string",
title="Processing workflow",
description="The processing workflow URL",
),
"cwl_args": Param(
DEFAULT_CWL_ARGUMENTS,
"process_args": Param(
DEFAULT_PROCESS_ARGS,
type="string",
title="CWL workflow parameters",
description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
title="Processing workflow parameters",
description=(
"The processing job parameters encoded as a JSON string, or the URL of a JSON or YAML file"
),
),
"request_memory": Param(
"4Gi",
@@ -123,144 +128,113 @@
title="Docker container storage",
),
"use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
"stac_json_url": Param(
DEFAULT_STAC_JSON_URL,
type="string",
title="STAC JSON URL",
description="The URL to the STAC JSON document",
),
"input_location": Param(
DEFAULT_INPUT_LOCATION,
type="string",
enum=["daac", "unity"],
title="Input data location",
description="Indicate whether input data should be retrieved from a DAAC or Unity",
),
},
)


def setup(ti=None, **context):
def create_local_dir(dag_run_id):
"""
Task that creates the working directory on the shared volume
and parses the input parameter values.
Create local directory for working DAG data.
"""
context = get_current_context()
dag_run_id = context["dag_run"].run_id
local_dir = f"{LOCAL_DIR}/{dag_run_id}"
logging.info(f"Creating directory: {local_dir}")
os.makedirs(local_dir, exist_ok=True)
logging.info(f"Created directory: {local_dir}")

# select the node pool based on what resources were requested

def select_node_pool(ti, request_storage, request_memory, request_cpu):
"""
Select node pool based on resources requested in input parameters.
"""
node_pool = unity_sps_utils.NODE_POOL_DEFAULT
storage = context["params"]["request_storage"] # 100Gi
storage = int(storage[0:-2]) # 100
memory = context["params"]["request_memory"] # 32Gi
memory = int(memory[0:-2]) # 32
cpu = int(context["params"]["request_cpu"]) # 8
storage = int(request_storage[0:-2]) # 100Gi -> 100
memory = int(request_memory[0:-2]) # 32Gi -> 32
cpu = int(request_cpu) # 8

logging.info(f"Requesting storage={storage}Gi memory={memory}Gi CPU={cpu}")
if (storage > 30) or (memory > 32) or (cpu > 8):
node_pool = unity_sps_utils.NODE_POOL_HIGH_WORKLOAD
logging.info(f"Selecting node pool={node_pool}")
ti.xcom_push(key="node_pool_processing", value=node_pool)

# select "use_ecr" argument and determine if ECR login is required
logging.info("Use ECR: %s", context["params"]["use_ecr"])
if context["params"]["use_ecr"]:

def select_ecr(ti, use_ecr):
"""
Determine if ECR login is required.
"""
logging.info("Use ECR: %s", use_ecr)
if use_ecr:
ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"]
ti.xcom_push(key="ecr_login", value=ecr_login)
logging.info("ECR login: %s", ecr_login)

# define stage in arguments
stage_in_args = {"download_dir": "input", "stac_json": context["params"]["stac_json_url"]}

# select stage in workflow based on input location
if context["params"]["input_location"] == "daac":
stage_in_workflow = DAAC_STAGE_IN_WORKFLOW
else:
stage_in_workflow = UNITY_STAGE_IN_WORKFLOW
ssm_client = boto3.client("ssm", region_name="us-west-2")
ss_acct_num = ssm_client.get_parameter(Name=unity_sps_utils.SS_ACT_NUM, WithDecryption=True)[
"Parameter"
]["Value"]
unity_client_id = ssm_client.get_parameter(
Name=f"arn:aws:ssm:us-west-2:{ss_acct_num}:parameter{unity_sps_utils.DS_CLIENT_ID_PARAM}",
WithDecryption=True,
)["Parameter"]["Value"]
stage_in_args["unity_client_id"] = unity_client_id
def select_stage_out(ti):
"""Retrieve stage out input parameters from SSM parameter store."""
ssm_client = boto3.client("ssm", region_name="us-west-2")

ti.xcom_push(key="stage_in_workflow", value=stage_in_workflow)
logging.info("Stage In workflow selected: %s", stage_in_workflow)
project = os.environ["AIRFLOW_VAR_UNITY_PROJECT"]
venue = os.environ["AIRFLOW_VAR_UNITY_VENUE"]
staging_bucket = ssm_client.get_parameter(Name=unity_sps_utils.DS_S3_BUCKET_PARAM, WithDecryption=True)[
"Parameter"
]["Value"]

ti.xcom_push(key="stage_in_args", value=stage_in_args)
logging.info("Stage in arguments selected: %s", stage_in_args)
stage_out_args = json.dumps({"project": project, "venue": venue, "staging_bucket": staging_bucket})
logging.info(f"Selecting stage out args={stage_out_args}")
ti.xcom_push(key="stage_out_args", value=stage_out_args)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)
def setup(ti=None, **context):
"""
Task that creates the working directory on the shared volume
and parses the input parameter values.
"""
context = get_current_context()

# create local working directory
dag_run_id = context["dag_run"].run_id
create_local_dir(dag_run_id)

cwl_task_stage_in = unity_sps_utils.SpsKubernetesPodOperator(
retries=0,
task_id="cwl_task_stage_in",
namespace=unity_sps_utils.POD_NAMESPACE,
name="cwl-task-pod",
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1800,
arguments=[
"-w",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_in_workflow') }}",
"-j",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_in_args') }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
container_security_context={"privileged": True},
container_resources=STAGE_IN_CONTAINER_RESOURCES,
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)
],
dag=dag,
node_selector={"karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_DEFAULT},
labels={"app": unity_sps_utils.POD_LABEL},
annotations={"karpenter.sh/do-not-disrupt": "true"},
# note: 'affinity' cannot yet be templated
affinity=unity_sps_utils.get_affinity(
capacity_type=["spot"],
# instance_type=["t3.2xlarge"],
anti_affinity_label=unity_sps_utils.POD_LABEL,
),
on_finish_action="keep_pod",
is_delete_operator_pod=False,
)
# select the node pool based on what resources were requested
select_node_pool(
ti,
context["params"]["request_storage"],
context["params"]["request_memory"],
context["params"]["request_cpu"],
)

# select "use_ecr" argument and determine if ECR login is required
select_ecr(ti, context["params"]["use_ecr"])

# retrieve stage out aws api key and account id
select_stage_out(ti)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


cwl_task_processing = unity_sps_utils.SpsKubernetesPodOperator(
retries=0,
task_id="cwl_task_processing",
namespace=unity_sps_utils.POD_NAMESPACE,
name="cwl-task-pod",
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE,
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE_MODULAR,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1800,
arguments=[
"-i",
STAGE_IN_WORKFLOW,
"-s",
"{{ params.stac_json }}",
"-w",
"{{ params.cwl_workflow }}",
"{{ params.process_workflow }}",
"-j",
"{{ params.cwl_args }}",
"{{ params.process_args }}",
"-o",
STAGE_OUT_WORKFLOW,
"-d",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_out_args') }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
@@ -313,6 +287,5 @@ def cleanup(**context):
task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)

chain(
setup_task.as_setup(), cwl_task_stage_in, cwl_task_processing, cleanup_task.as_teardown(setups=setup_task)
)

chain(setup_task.as_setup(), cwl_task_processing, cleanup_task.as_teardown(setups=setup_task))
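With stage-in, process, and stage-out all handled by the single cwl_task_processing pod, a run of this DAG is configured entirely through the parameters defined above. An illustrative trigger through the Airflow stable REST API, assuming the DAG id matches the module name cwl_dag_modular; the host, credentials, and parameter values are placeholders:

import requests

# Illustrative only: trigger cwl_dag_modular with a custom configuration.
resp = requests.post(
    "https://airflow.example.com/api/v1/dags/cwl_dag_modular/dagRuns",
    auth=("airflow-user", "airflow-password"),  # assumes basic auth is enabled
    json={
        "conf": {
            "stac_json": "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json",
            "process_workflow": "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl",
            "process_args": '{"example_argument_empty": ""}',
            "request_storage": "10Gi",
            "use_ecr": False,
        }
    },
    timeout=30,
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])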
25 changes: 25 additions & 0 deletions airflow/docker/cwl/Dockerfile_modular
@@ -0,0 +1,25 @@
# docker:dind Dockerfile: https://github.com/docker-library/docker/blob/master/Dockerfile-dind.template
# FROM docker:dind
FROM docker:25.0.3-dind

# install Python
RUN apk add --update --no-cache python3 && ln -sf python3 /usr/bin/python
RUN apk add gcc musl-dev linux-headers python3-dev jq
RUN apk add --no-cache python3 py3-pip
RUN apk add vim

# install CWL libraries
RUN mkdir /usr/share/cwl \
&& cd /usr/share/cwl \
&& python -m venv venv \
&& source venv/bin/activate \
&& pip install cwltool cwl-runner docker boto3 awscli pyyaml

# install nodejs to parse Javascript in CWL files
RUN apk add --no-cache nodejs npm

# script to execute a generic CWL workflow with arguments
COPY docker_cwl_entrypoint_modular.sh /usr/share/cwl/docker_cwl_entrypoint_modular.sh

WORKDIR /usr/share/cwl
ENTRYPOINT ["/usr/share/cwl/docker_cwl_entrypoint_modular.sh"]