Merge pull request #149 from unity-sds/24.2-fix-tests
Fix Tests before 24.2 Release
LucaCinquini authored Jul 3, 2024
2 parents 0859baa + 0b47d6c · commit 218516b
Showing 8 changed files with 115 additions and 93 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/integration_tests.yml
@@ -30,6 +30,7 @@ jobs:
pip install -e ".[test]"
- name: MCP Venue Dev - Integration tests
id: mcp_venue_dev_integration_tests
continue-on-error: true
env:
AIRFLOW_WEBSERVER_PASSWORD: ${{ secrets.MCP_VENUE_DEV_AIRFLOW_WEBSERVER_PASSWORD }}
@@ -39,6 +40,7 @@ jobs:
--airflow-endpoint=${{ github.event.inputs.MCP_VENUE_DEV_AIRFLOW_ENDPOINT || vars.MCP_VENUE_DEV_AIRFLOW_ENDPOINT }}
# - name: MCP Venue Test - Integration tests
# id: mcp_venue_test_integration_tests
# continue-on-error: true
# run: |
# pytest -vv --gherkin-terminal-reporter \
@@ -58,7 +60,7 @@ jobs:
exit 1
fi
# Uncomment this block if MCP Venue Test Integration tests are re-enabled
# Uncomment this block when MCP Venue Test Integration tests are re-enabled
# if [ "$test_status" != "success" ]; then
# echo "MCP Venue Test Integration Tests failed."
# exit 1
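Each venue's test step above runs with continue-on-error: true, so a failure is recorded as the step outcome instead of aborting the job; the final shell step then inspects steps.<id>.outcome and exits non-zero if an enabled venue failed. Below is a minimal Python sketch of that aggregation logic, for illustration only: the venue names and the disabled set are hypothetical, and the workflow itself performs this check in bash.

# Illustrative sketch of the outcome check performed by the workflow's final step.
# Venue names and the "disabled" set are hypothetical, not taken from the repository.
def check_outcomes(outcomes, disabled=frozenset()):
    failed = [venue for venue, status in outcomes.items()
              if venue not in disabled and status != "success"]
    for venue in failed:
        print(f"{venue} Integration Tests failed.")
    if failed:
        raise SystemExit(1)  # equivalent to exit 1 in the workflow step

# Example: MCP Venue Test is temporarily disabled, so only Dev is enforced.
check_outcomes(
    {"MCP Venue Dev": "success", "MCP Venue Test": "failure"},
    disabled={"MCP Venue Test"},
)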
32 changes: 15 additions & 17 deletions .github/workflows/smoke_tests.yml
@@ -57,16 +57,17 @@ jobs:
--airflow-endpoint=${{ github.event.inputs.MCP_VENUE_DEV_AIRFLOW_ENDPOINT || vars.MCP_VENUE_DEV_AIRFLOW_ENDPOINT }} \
--ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_DEV_OGC_PROCESSES_ENDPOINT || vars.MCP_VENUE_DEV_OGC_PROCESSES_ENDPOINT }}
- name: MCP Venue Test - Smoke tests
id: mcp_venue_test_smoke_tests
env:
AIRFLOW_WEBSERVER_PASSWORD: ${{ secrets.MCP_VENUE_TEST_AIRFLOW_WEBSERVER_PASSWORD }}
continue-on-error: true
run: |
pytest -vv --gherkin-terminal-reporter \
unity-test/system/smoke \
--airflow-endpoint=${{ github.event.inputs.MCP_VENUE_TEST_AIRFLOW_ENDPOINT || vars.MCP_VENUE_TEST_AIRFLOW_ENDPOINT }} \
--ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_TEST_OGC_PROCESSES_ENDPOINT || vars.MCP_VENUE_TEST_OGC_PROCESSES_ENDPOINT }}
# Temporary: comment out checks on MCP venue test until the SPS is redeployed
# - name: MCP Venue Test - Smoke tests
# id: mcp_venue_test_smoke_tests
# env:
# AIRFLOW_WEBSERVER_PASSWORD: ${{ secrets.MCP_VENUE_TEST_AIRFLOW_WEBSERVER_PASSWORD }}
# continue-on-error: true
# run: |
# pytest -vv --gherkin-terminal-reporter \
# unity-test/system/smoke \
# --airflow-endpoint=${{ github.event.inputs.MCP_VENUE_TEST_AIRFLOW_ENDPOINT || vars.MCP_VENUE_TEST_AIRFLOW_ENDPOINT }} \
# --ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_TEST_OGC_PROCESSES_ENDPOINT || vars.MCP_VENUE_TEST_OGC_PROCESSES_ENDPOINT }}

- name: MCP Venue Ops - Smoke tests
id: mcp_venue_ops_smoke_tests
@@ -77,8 +78,9 @@ jobs:
pytest -vv --gherkin-terminal-reporter \
unity-test/system/smoke/step_defs/test_airflow_api_health.py \
--airflow-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_AIRFLOW_ENDPOINT || vars.MCP_VENUE_OPS_AIRFLOW_ENDPOINT }} \
--ogc-processes-endpoint=${{ github.event.inputs.MCP_VENUE_OPS_OGC_PROCESSES_ENDPOINT || vars.MCP_VENUE_OPS_OGC_PROCESSES_ENDPOINT }}
- name: MCP Venue SBG Dev - Smoke tests
- name: MCP SBG DEV - Smoke tests
id: mcp_sbg_dev_smoke_tests
env:
AIRFLOW_WEBSERVER_PASSWORD: ${{ secrets.MCP_VENUE_SBG_DEV_AIRFLOW_WEBSERVER_PASSWORD }}
@@ -94,22 +96,18 @@ jobs:
if: always()
run: |
dev_status=${{ steps.mcp_venue_dev_smoke_tests.outcome }}
test_status=${{ steps.mcp_venue_test_smoke_tests.outcome }}
ops_status=${{ steps.mcp_venue_ops_smoke_tests.outcome }}
sbg_dev_status=${{ steps.mcp_sbg_dev_smoke_tests.outcome }}
echo "Dev Smoke Tests: $dev_status"
echo "Test Smoke Tests: $test_status"
echo "Ops Smoke Tests: $ops_status"
echo "SBG Dev Smoke Tests: $sbg_dev_status"
if [ "$dev_status" != "success" ] || [ "$test_status" != "success" ] || [ "$ops_status" != "success" ] || [ "$sbg_dev_status" != "success" ]; then
# FIXME: must re-enable [ "$test_status" != "success" ]
if [ "$dev_status" != "success" ] || [ "$ops_status" != "success" ] || [ "$sbg_dev_status" != "success" ]; then
echo "One or more smoke tests failed."
if [ "$dev_status" != "success" ]; then
echo "MCP Venue Dev Smoke Tests failed."
fi
if [ "$test_status" != "success" ]; then
echo "MCP Venue Test Smoke Tests failed."
fi
if [ "$ops_status" != "success" ]; then
echo "MCP Venue Ops Smoke Tests failed."
fi
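The smoke-test steps pass custom pytest options (--airflow-endpoint and --ogc-processes-endpoint) to select the venue under test. Those options are presumably registered in the test suite's conftest.py via pytest_addoption; the sketch below shows one plausible wiring, with hypothetical fixture names, and is not taken from the repository.

# conftest.py (sketch): registers the CLI options that the workflow passes to pytest.
# Option names match the workflow invocations; fixture names are hypothetical.
import pytest

def pytest_addoption(parser):
    parser.addoption("--airflow-endpoint", action="store", default=None,
                     help="Base URL of the Airflow webserver under test")
    parser.addoption("--ogc-processes-endpoint", action="store", default=None,
                     help="Base URL of the OGC API Processes endpoint under test")

@pytest.fixture
def airflow_endpoint(request):
    return request.config.getoption("--airflow-endpoint")

@pytest.fixture
def ogc_processes_endpoint(request):
    return request.config.getoption("--ogc-processes-endpoint")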
12 changes: 7 additions & 5 deletions airflow/dags/cwl_dag.py
@@ -40,18 +40,20 @@
DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"})

# Alternative arguments to execute SBG Pre-Process
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"

# Alternative arguments to execute SBG end-to-end
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_ARGUMENTS = json.dumps({"input_processing_labels": ["label1", "label2"], "input_cmr_stac": "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z", "input_unity_dapa_client": "40c2s0ulbhp9i0fmaph3su9jch", "input_unity_dapa_api": "https://d3vc8w9zcq658.cloudfront.net", "input_crid": "001", "output_collection_id": "urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", "output_data_bucket": "sps-dev-ds-storage"})
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"

# Alternative arguments to execute SBG end-to-end
# unity_sps_sbg_debug.txt
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={
# "cpu": "2660m", # 2.67 vCPUs, specified in milliCPUs
# "memory": "22Gi", # Rounded to 22 GiB for easier specification
"ephemeral-storage": "30Gi"
"ephemeral-storage": "10Gi"
},
# limits={
# # "cpu": "2660m", # Optional: set the same as requests if you want a fixed allocation
@@ -120,7 +122,7 @@ def setup(ti=None, **context):
startup_timeout_seconds=1800,
arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}"],
container_security_context={"privileged": True},
# container_resources=CONTAINER_RESOURCES,
container_resources=CONTAINER_RESOURCES,
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
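The cwl_dag.py change re-enables the container_resources argument on the pod operator and lowers the ephemeral-storage request from 30Gi to 10Gi. A minimal sketch of how that resource object attaches to the operator, assuming the same names used in the DAG above, is:

# Sketch: attaching the resource requests to the CWL pod (mirrors the DAG above).
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
    requests={"ephemeral-storage": "10Gi"},  # scratch space for CWL staging
)

cwl_task = KubernetesPodOperator(
    task_id="cwl_task",                       # illustrative task id
    namespace="airflow",
    name="cwl-pod",
    image="ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0",
    container_resources=CONTAINER_RESOURCES,  # re-enabled by this commit
    get_logs=True,
)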
20 changes: 10 additions & 10 deletions airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py
@@ -365,15 +365,15 @@ def setup(ti=None, **context):
"https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
)
preprocess_task = KubernetesPodOperator(
retries=5,
retries=0,
task_id="SBG_Preprocess",
namespace=POD_NAMESPACE,
name="sbg-preprocess-pod",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1200,
startup_timeout_seconds=1800,
arguments=[SBG_PREPROCESS_CWL, "{{ti.xcom_pull(task_ids='Setup', key='preprocess_args')}}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
@@ -403,15 +403,15 @@ def setup(ti=None, **context):
)
isofit_task = KubernetesPodOperator(
# wait_until_job_complete=True,
retries=5,
retries=0,
task_id="SBG_Isofit",
namespace=POD_NAMESPACE,
name="sbg-isofit",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1200,
startup_timeout_seconds=1800,
arguments=[SBG_ISOFIT_CWL, "{{ti.xcom_pull(task_ids='Setup', key='isofit_args')}}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
@@ -441,15 +441,15 @@ def setup(ti=None, **context):
)
resample_task = KubernetesPodOperator(
# wait_until_job_complete=True,=True,
retries=5,
retries=0,
task_id="SBG_Resample",
namespace=POD_NAMESPACE,
name="sbg-resample-pod",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1200,
startup_timeout_seconds=1800,
arguments=[SBG_RESAMPLE_CWL, "{{ti.xcom_pull(task_ids='Setup', key='resample_args')}}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
@@ -477,15 +477,15 @@ def setup(ti=None, **context):
SBG_REFLECT_CORRECT_CWL = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/reflect-correct/sbg-reflect-correct-workflow.cwl"
reflect_correct_task = KubernetesPodOperator(
# wait_until_job_complete=True,=True,
retries=5,
retries=0,
task_id="SBG_Reflect",
namespace=POD_NAMESPACE,
name="sbg-reflect-pod",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1200,
startup_timeout_seconds=1800,
arguments=[SBG_REFLECT_CORRECT_CWL, "{{ti.xcom_pull(task_ids='Setup', key='reflect_correct_args')}}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
@@ -515,15 +515,15 @@ def setup(ti=None, **context):
)
frcover_task = KubernetesPodOperator(
# wait_until_job_complete=True,=True,
retries=5,
retries=0,
task_id="SBG_Frcover",
namespace=POD_NAMESPACE,
name="sbg-frcover-pod",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1200,
startup_timeout_seconds=1800,
arguments=[SBG_FRCOVER_CWL, "{{ti.xcom_pull(task_ids='Setup', key='frcover_args')}}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
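Across this file the same two settings change on all five KubernetesPodOperator tasks (Preprocess, Isofit, Resample, Reflect, Frcover): retries drops from 5 to 0 and startup_timeout_seconds rises from 1200 to 1800. One way to keep such repeated settings in sync, sketched below with hypothetical names rather than the repository's actual layout, is a shared keyword dictionary:

# Sketch (not the repository's actual code): shared pod settings reused by every step task.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

COMMON_POD_KWARGS = dict(
    retries=0,                      # fail fast instead of retrying long CWL runs
    startup_timeout_seconds=1800,   # allow up to 30 minutes for node and image startup
    namespace="airflow",
    service_account_name="airflow-worker",
    in_cluster=True,
    get_logs=True,
)

preprocess_task = KubernetesPodOperator(
    task_id="SBG_Preprocess",
    name="sbg-preprocess-pod",
    image="ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0",
    **COMMON_POD_KWARGS,
)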
108 changes: 64 additions & 44 deletions airflow/dags/sbg_preprocess_cwl_dag.py
@@ -1,24 +1,23 @@
# DAG for executing the SBG Preprocess Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
import json
import logging
import os
import shutil
import uuid
from datetime import datetime

from airflow.models.param import Param
from airflow.operators.python import PythonOperator, get_current_context
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.trigger_rule import TriggerRule
from kubernetes.client import models as k8s
from unity_sps_utils import get_affinity

from airflow import DAG

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"
POD_LABEL = "sbg_preprocess_task"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner).
@@ -31,13 +30,21 @@
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}
CWL_WORKFLOW = (
# Alternative arguments to execute SBG Pre-Process
DEFAULT_CWL_WORKFLOW = (
"https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
)
CMR_STAC = "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z"
DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"
# CMR_STAC = "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z"

# common parameters
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={"ephemeral-storage": "5Gi"},
)
INPUT_PROCESSING_LABELS = ["SBG", "CWL"]

dag = DAG(
dag_id="sbg-preprocess-cwl-dag",
dag_id="sbg_preprocess_cwl_dag",
description="SBG Preprocess Workflow as CWL",
tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
is_paused_upon_creation=False,
@@ -46,52 +53,52 @@
max_active_runs=100,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
"input_cmr_stac": Param(CMR_STAC, type="string"),
"input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"input_crid": Param("001", type="string"),
"output_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", type="string"),
"output_data_bucket": Param("sps-dev-ds-storage", type="string"),
"cwl_workflow": Param(
DEFAULT_CWL_WORKFLOW,
type="string",
title="CWL workflow",
description="The SBG Pre-process CWL workflow URL",
),
"cwl_args": Param(
DEFAULT_CWL_ARGUMENTS,
type="string",
title="CWL workflow parameters",
description="The SBG Pre-process YAML parameters URL",
),
},
)


# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
"""
Task that creates the working directory on the shared volume.
"""
context = get_current_context()
dag_run_id = context["dag_run"].run_id
local_dir = os.path.dirname(f"/shared-task-data/{dag_run_id}")
local_dir = f"/shared-task-data/{dag_run_id}"
logging.info(f"Creating directory: {local_dir}")
os.makedirs(local_dir, exist_ok=True)

task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_stac": context["params"]["input_cmr_stac"],
"input_unity_dapa_api": context["params"]["input_unity_dapa_api"],
"input_unity_dapa_client": context["params"]["input_unity_dapa_client"],
"input_crid": context["params"]["input_crid"],
"output_collection_id": context["params"]["output_collection_id"],
"output_data_bucket": context["params"]["output_data_bucket"],
}
ti.xcom_push(key="cwl_args", value=json.dumps(task_dict))
logging.info(f"Created directory: {local_dir}")


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


# Task that executes the specific CWL workflow with the previous arguments
# Task that executes the SBG Preprocess CWL workflow
cwl_task = KubernetesPodOperator(
retries=0,
task_id="sbg_preprocess_task",
namespace=POD_NAMESPACE,
name="SBG_Preprocess_CWL",
on_finish_action="delete_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
name="sbg-preprocess-pod",
image=SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
task_id="SBG_Preprocess_CWL",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
dag=dag,
startup_timeout_seconds=1800,
arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}"],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
@@ -101,23 +108,36 @@ def setup(ti=None, **context):
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)
],
dag=dag,
node_selector={"karpenter.sh/nodepool": "airflow-kubernetes-pod-operator"},
labels={"app": POD_LABEL},
annotations={"karpenter.sh/do-not-disrupt": "true"},
affinity=get_affinity(
capacity_type=["spot"],
instance_type=["r7i.xlarge"],
anti_affinity_label=POD_LABEL,
),
on_finish_action="keep_pod",
is_delete_operator_pod=False,
)


def cleanup(**context):
"""
Tasks that deletes all data shared between Tasks
from the Kubernetes PersistentVolume
"""
dag_run_id = context["dag_run"].run_id
local_dir = f"/shared-task-data/{dag_run_id}"
if os.path.exists(local_dir):
shutil.rmtree(local_dir)
print(f"Deleted directory: {local_dir}")
logging.info(f"Deleted directory: {local_dir}")
else:
print(f"Directory does not exist, no need to delete: {local_dir}")
logging.info(f"Directory does not exist, no need to delete: {local_dir}")


cleanup_task = PythonOperator(
task_id="Cleanup",
python_callable=cleanup,
dag=dag,
task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)


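The rewritten sbg_preprocess_cwl_dag.py keeps a three-task shape: setup creates the per-run directory on the shared volume, the pod operator runs the CWL workflow, and cleanup removes the directory with trigger_rule=TriggerRule.ALL_DONE so it executes even when the CWL task fails. The task chaining itself falls outside the visible hunks; assuming the conventional Airflow bit-shift syntax, it presumably reads:

# Sketch of the dependency wiring (the actual chaining is outside the visible hunks;
# setup_task, cwl_task, and cleanup_task are the operators defined in the DAG above).
setup_task >> cwl_task >> cleanup_task

Pairing ALL_DONE on the cleanup task with on_finish_action="keep_pod" on the CWL task means the shared directory is always reclaimed while the pod itself is left in place for debugging.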
(The remaining 3 changed files in this commit are not rendered in this view.)
