From 91476c074ae5e4fc8a36ed7f469c5aa6bfd5edf9 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 3 Apr 2024 12:07:04 -0600 Subject: [PATCH 01/15] Adding resources to pods --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 4290b76..c2e9cf5 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,10 +30,10 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - # limits={"memory": "4Gi", "cpu": "500m", "ephemeral-storage": "50G"}, - # requests={"memory": "2Gi", "cpu": "250m", "ephemeral-storage": "25G"}, - limits={"ephemeral-storage": "50G"}, - requests={"ephemeral-storage": "50G"}, + limits={"memory": "12GiB", "cpu": "500m", "ephemeral-storage": "50G"}, + requests={"memory": "12GiB", "cpu": "500m", "ephemeral-storage": "50G"}, + # limits={"ephemeral-storage": "50G"}, + # requests={"ephemeral-storage": "50G"}, ) # Default DAG configuration From 6374f2ef3a0525f754a94c6bb5a701857aabe912 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 3 Apr 2024 12:49:18 -0600 Subject: [PATCH 02/15] Using G for memory --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index c2e9cf5..f092079 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,8 +30,8 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - limits={"memory": "12GiB", "cpu": "500m", "ephemeral-storage": "50G"}, - requests={"memory": "12GiB", "cpu": "500m", 
"ephemeral-storage": "50G"}, + limits={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "50G"}, + requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "50G"}, # limits={"ephemeral-storage": "50G"}, # requests={"ephemeral-storage": "50G"}, ) From 3d594fde662eb2359aa9a1caf67d2ac84921689b Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 3 Apr 2024 13:41:19 -0600 Subject: [PATCH 03/15] Increasingmemory to 12G --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index f092079..4e1eee1 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,8 +30,8 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - limits={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "50G"}, - requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "50G"}, + limits={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, + requests={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, # limits={"ephemeral-storage": "50G"}, # requests={"ephemeral-storage": "50G"}, ) From ce4817008120f9647a5d81b1666a99aa40d1baf0 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Wed, 3 Apr 2024 16:13:49 -0600 Subject: [PATCH 04/15] Remove memory and cpu limits --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 4e1eee1..ae2a256 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,10 +30,10 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = 
k8s.V1ResourceRequirements( - limits={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, - requests={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, - # limits={"ephemeral-storage": "50G"}, - # requests={"ephemeral-storage": "50G"}, + # limits={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, + # requests={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, + limits={"ephemeral-storage": "50G"}, + requests={"ephemeral-storage": "50G"}, ) # Default DAG configuration From 3d36dbcc6fe061c33e60f357ce63b1c035457d87 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 4 Apr 2024 08:34:22 -0600 Subject: [PATCH 05/15] Version develop74: "memory": "24G", "cpu": "2000m", "ephemeral-storage": "50G" --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index ae2a256..723c50b 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,10 +30,10 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - # limits={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, - # requests={"memory": "12G", "cpu": "1000m", "ephemeral-storage": "50G"}, - limits={"ephemeral-storage": "50G"}, - requests={"ephemeral-storage": "50G"}, + limits={"memory": "24G", "cpu": "2000m", "ephemeral-storage": "50G"}, + requests={"memory": "24G", "cpu": "2000m", "ephemeral-storage": "50G"}, + # limits={"ephemeral-storage": "50G"}, + # requests={"ephemeral-storage": "50G"}, ) # Default DAG configuration From fedeb1705bb12dcbae0f98e1aa14eebbcfce06ca Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 4 Apr 2024 10:23:05 -0600 Subject: [PATCH 06/15] Playing with resource constraints... 
--- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 723c50b..96138f1 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,8 +30,8 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - limits={"memory": "24G", "cpu": "2000m", "ephemeral-storage": "50G"}, - requests={"memory": "24G", "cpu": "2000m", "ephemeral-storage": "50G"}, + limits={"memory": "16G", "cpu": "2000m", "ephemeral-storage": "50G"}, + requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "25G"}, # limits={"ephemeral-storage": "50G"}, # requests={"ephemeral-storage": "50G"}, ) @@ -53,7 +53,7 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=100, + max_active_runs=2, default_args=dag_default_args, params={ # For step: PREPROCESS @@ -364,8 +364,8 @@ def cleanup(**context): task_id="Cleanup", python_callable=cleanup, trigger_rule=TriggerRule.ALL_DONE, - priority_weight=1, - weight_rule="upstream", + # priority_weight=1, + # weight_rule="upstream", dag=dag, ) From a1f2d6f63d0ed17bdd3237985ee60083557bff09 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Thu, 4 Apr 2024 13:56:59 -0600 Subject: [PATCH 07/15] Trying a new combination of parameters --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 96138f1..5f22146 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,10 +30,10 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES 
= k8s.V1ResourceRequirements( - limits={"memory": "16G", "cpu": "2000m", "ephemeral-storage": "50G"}, - requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "25G"}, - # limits={"ephemeral-storage": "50G"}, - # requests={"ephemeral-storage": "50G"}, + # limits={"memory": "16G", "cpu": "2000m", "ephemeral-storage": "50G"}, + # requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "25G"}, + limits={"ephemeral-storage": "50G"}, + requests={"ephemeral-storage": "50G"}, ) # Default DAG configuration @@ -54,6 +54,7 @@ catchup=False, schedule=None, max_active_runs=2, + max_active_tasks=4, default_args=dag_default_args, params={ # For step: PREPROCESS From 1dec8d3a66090c637a77b17b37c56a8776d14a80 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Sun, 7 Apr 2024 04:49:49 -0600 Subject: [PATCH 08/15] Trying pod anti-affinity --- airflow/dags/docker_cwl_pod.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index b6975c1..65ef0e3 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -2,10 +2,23 @@ apiVersion: v1 kind: Pod metadata: name: docker-cwl-pod + labels: + task-type: sbg-task spec: restartPolicy: Never serviceAccountName: airflow-worker + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: task-type + operator: In + values: + - sbg-task + topologyKey: topology.kubernetes.io/zone + containers: - name: cwl-docker image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-beta.3 From 6824e748e4734a212e2fb1ac1d1cfbe65f256593 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 8 Apr 2024 13:33:22 -0600 Subject: [PATCH 09/15] Using on_finish_action="delete_succeeded_pod" --- airflow/dags/cwl_dag.py | 2 +- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/dags/cwl_dag.py 
b/airflow/dags/cwl_dag.py index 11ed8d6..d7eb2a9 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -86,7 +86,7 @@ def setup(ti=None, **context): cwl_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="cwl-task", - is_delete_operator_pod=True, + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 5f22146..bfea428 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -192,7 +192,7 @@ def setup(ti=None, **context): preprocess_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Preprocess", - on_finish_action="delete_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, @@ -223,7 +223,7 @@ def setup(ti=None, **context): isofit_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Isofit", - on_finish_action="delete_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, @@ -254,7 +254,7 @@ def setup(ti=None, **context): resample_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Resample", - on_finish_action="delete_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, @@ -287,7 +287,7 @@ def setup(ti=None, **context): reflect_correct_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Reflect_Correct", - on_finish_action="delete_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, @@ -323,7 +323,7 @@ def setup(ti=None, **context): frcover_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Frcover", - on_finish_action="delete_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, 
startup_timeout_seconds=1000, get_logs=True, From d341f3c8070aa9481922839fb8da1f058b47a9c6 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 8 Apr 2024 13:36:52 -0600 Subject: [PATCH 10/15] Using startup_timeout_seconds=300 --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index bfea428..5a4ccf4 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -194,7 +194,7 @@ def setup(ti=None, **context): name="Preprocess", on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=300, get_logs=True, task_id="SBG_Preprocess", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-pod-" + uuid.uuid4().hex))), @@ -225,7 +225,7 @@ def setup(ti=None, **context): name="Isofit", on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=300, get_logs=True, task_id="SBG_Isofit", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-isofit-pod-" + uuid.uuid4().hex))), @@ -256,7 +256,7 @@ def setup(ti=None, **context): name="Resample", on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=300, get_logs=True, task_id="SBG_Resample", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-resample-pod-" + uuid.uuid4().hex))), @@ -289,7 +289,7 @@ def setup(ti=None, **context): name="Reflect_Correct", on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=300, get_logs=True, task_id="SBG_Reflect_Correct", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-reflect-correct-pod-" + uuid.uuid4().hex))), @@ -325,7 +325,7 @@ def setup(ti=None, **context): name="Frcover", 
on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=300, get_logs=True, task_id="SBG_Frcover", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-frcover-pod-" + uuid.uuid4().hex))), From bb0516bb200ef2df72b23fc3fe6488e105eeda18 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Mon, 8 Apr 2024 14:54:35 -0600 Subject: [PATCH 11/15] Never delete the pods --- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 5a4ccf4..86c4058 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -192,7 +192,7 @@ def setup(ti=None, **context): preprocess_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Preprocess", - on_finish_action="delete_succeeded_pod", + # on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=300, get_logs=True, @@ -223,7 +223,7 @@ def setup(ti=None, **context): isofit_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Isofit", - on_finish_action="delete_succeeded_pod", + # on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=300, get_logs=True, @@ -254,7 +254,7 @@ def setup(ti=None, **context): resample_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Resample", - on_finish_action="delete_succeeded_pod", + # on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=300, get_logs=True, @@ -287,7 +287,7 @@ def setup(ti=None, **context): reflect_correct_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Reflect_Correct", - on_finish_action="delete_succeeded_pod", + # on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=300, get_logs=True, @@ -323,7 +323,7 @@ def 
setup(ti=None, **context): frcover_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="Frcover", - on_finish_action="delete_succeeded_pod", + # on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=300, get_logs=True, From 61aa62923b3eb9b7dacb65108024049c9ee98d06 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Tue, 9 Apr 2024 07:09:09 -0600 Subject: [PATCH 12/15] Trying a new combination of parameters --- .../sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 86c4058..42c96a8 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -191,10 +191,10 @@ def setup(ti=None, **context): ) preprocess_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Preprocess", + name="SBG_Preprocess", # on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=300, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Preprocess", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-pod-" + uuid.uuid4().hex))), @@ -222,10 +222,10 @@ def setup(ti=None, **context): # SBG_ISOFIT_CWL = "https://raw.githubusercontent.com/LucaCinquini/sbg-workflows/devel/isofit/sbg-isofit-workflow.cwl" isofit_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Isofit", + name="SBG_Isofit", # on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=300, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Isofit", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-isofit-pod-" + uuid.uuid4().hex))), @@ -253,10 +253,10 @@ def setup(ti=None, **context): # SBG_RESAMPLE_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/resample/sbg-resample-workflow.dev.yml" resample_task = 
KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Resample", + name="SBG_Resample", # on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=300, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Resample", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-resample-pod-" + uuid.uuid4().hex))), @@ -286,10 +286,10 @@ def setup(ti=None, **context): # SBG_REFLECT_CORRECT_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/reflect-correct/sbg-reflect-correct-workflow.dev.yml" reflect_correct_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Reflect_Correct", + name="SBG_Reflect_Correct", # on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=300, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Reflect_Correct", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-reflect-correct-pod-" + uuid.uuid4().hex))), @@ -322,10 +322,10 @@ def setup(ti=None, **context): # SBG_FRCOVER_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/frcover/sbg-frcover-workflow.dev.yml" frcover_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Frcover", + name="SBG_Frcover", # on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=300, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Frcover", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-frcover-pod-" + uuid.uuid4().hex))), From 4668f64e8e05271020718f42e69f0c0a30741fe2 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Tue, 9 Apr 2024 07:26:15 -0600 Subject: [PATCH 13/15] Using topologyKey: topology.kubernetes.io/hostname --- airflow/dags/docker_cwl_pod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index 65ef0e3..f79e78e 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -17,7 +17,7 @@ spec: operator: In values: 
- sbg-task - topologyKey: topology.kubernetes.io/zone + topologyKey: topology.kubernetes.io/hostname containers: - name: cwl-docker From aab4d928961f9d58119fbad09d4ea32b7f68f67d Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Tue, 9 Apr 2024 08:37:07 -0600 Subject: [PATCH 14/15] Fixing the topology key to kubernetes.io/hostname --- airflow/dags/docker_cwl_pod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index f79e78e..f47af59 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -17,7 +17,7 @@ spec: operator: In values: - sbg-task - topologyKey: topology.kubernetes.io/hostname + topologyKey: kubernetes.io/hostname containers: - name: cwl-docker From 7d9b3f0d0c369b9382723fa6a922714bdffbcbf4 Mon Sep 17 00:00:00 2001 From: Luca Cinquini Date: Tue, 9 Apr 2024 09:59:20 -0600 Subject: [PATCH 15/15] Final cleanup before issuing release 2.0.0 --- airflow/dags/docker_cwl_pod.yaml | 2 +- airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index f47af59..d75dfdb 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -21,7 +21,7 @@ spec: containers: - name: cwl-docker - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-beta.3 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 42c96a8..5820b16 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -192,7 +192,7 @@ def setup(ti=None, **context): preprocess_task = KubernetesPodOperator( namespace=POD_NAMESPACE, 
name="SBG_Preprocess", - # on_finish_action="delete_succeeded_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=14400, get_logs=True, @@ -223,7 +223,7 @@ def setup(ti=None, **context): isofit_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="SBG_Isofit", - # on_finish_action="delete_succeeded_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=14400, get_logs=True, @@ -254,7 +254,7 @@ def setup(ti=None, **context): resample_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="SBG_Resample", - # on_finish_action="delete_succeeded_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=14400, get_logs=True, @@ -287,7 +287,7 @@ def setup(ti=None, **context): reflect_correct_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="SBG_Reflect_Correct", - # on_finish_action="delete_succeeded_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=14400, get_logs=True, @@ -323,7 +323,7 @@ def setup(ti=None, **context): frcover_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="SBG_Frcover", - # on_finish_action="delete_succeeded_pod", + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=14400, get_logs=True,