diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 11ed8d6b..d7eb2a93 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -86,7 +86,7 @@ def setup(ti=None, **context): cwl_task = KubernetesPodOperator( namespace=POD_NAMESPACE, name="cwl-task", - is_delete_operator_pod=True, + on_finish_action="delete_succeeded_pod", hostnetwork=False, startup_timeout_seconds=1000, get_logs=True, diff --git a/airflow/dags/docker_cwl_pod.yaml b/airflow/dags/docker_cwl_pod.yaml index b6975c19..d75dfdbc 100644 --- a/airflow/dags/docker_cwl_pod.yaml +++ b/airflow/dags/docker_cwl_pod.yaml @@ -2,13 +2,26 @@ apiVersion: v1 kind: Pod metadata: name: docker-cwl-pod + labels: + task-type: sbg-task spec: restartPolicy: Never serviceAccountName: airflow-worker + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: task-type + operator: In + values: + - sbg-task + topologyKey: kubernetes.io/hostname + containers: - name: cwl-docker - image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-beta.3 + image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0 imagePullPolicy: Always command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"] securityContext: diff --git a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py index 4290b76b..5820b160 100644 --- a/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py +++ b/airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py @@ -30,8 +30,8 @@ # Resources needed by each Task # EC2 r6a.xlarge 4vCPU 32GiB CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - # limits={"memory": "4Gi", "cpu": "500m", "ephemeral-storage": "50G"}, - # requests={"memory": "2Gi", "cpu": "250m", "ephemeral-storage": "25G"}, + # limits={"memory": "16G", "cpu": "2000m", "ephemeral-storage": "50G"}, + # requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "25G"}, limits={"ephemeral-storage": "50G"}, requests={"ephemeral-storage": "50G"}, ) @@ -53,7 +53,8 @@ is_paused_upon_creation=False, catchup=False, schedule=None, - max_active_runs=100, + max_active_runs=2, + max_active_tasks=4, default_args=dag_default_args, params={ # For step: PREPROCESS @@ -190,10 +191,10 @@ def setup(ti=None, **context): ) preprocess_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Preprocess", - on_finish_action="delete_pod", + name="SBG_Preprocess", + on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Preprocess", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-pod-" + uuid.uuid4().hex))), @@ -221,10 +222,10 @@ def setup(ti=None, **context): # SBG_ISOFIT_CWL = "https://raw.githubusercontent.com/LucaCinquini/sbg-workflows/devel/isofit/sbg-isofit-workflow.cwl" isofit_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Isofit", - on_finish_action="delete_pod", + name="SBG_Isofit", + on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Isofit", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-isofit-pod-" + uuid.uuid4().hex))), @@ -252,10 +253,10 @@ def setup(ti=None, **context): # SBG_RESAMPLE_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/resample/sbg-resample-workflow.dev.yml" resample_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Resample", - on_finish_action="delete_pod", + name="SBG_Resample", + on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Resample", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-resample-pod-" + uuid.uuid4().hex))), @@ -285,10 +286,10 @@ def setup(ti=None, **context): # SBG_REFLECT_CORRECT_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/reflect-correct/sbg-reflect-correct-workflow.dev.yml" reflect_correct_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Reflect_Correct", - on_finish_action="delete_pod", + name="SBG_Reflect_Correct", + on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Reflect_Correct", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-reflect-correct-pod-" + uuid.uuid4().hex))), @@ -321,10 +322,10 @@ def setup(ti=None, **context): # SBG_FRCOVER_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/frcover/sbg-frcover-workflow.dev.yml" frcover_task = KubernetesPodOperator( namespace=POD_NAMESPACE, - name="Frcover", - on_finish_action="delete_pod", + name="SBG_Frcover", + on_finish_action="delete_succeeded_pod", hostnetwork=False, - startup_timeout_seconds=1000, + startup_timeout_seconds=14400, get_logs=True, task_id="SBG_Frcover", full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-frcover-pod-" + uuid.uuid4().hex))), @@ -364,8 +365,8 @@ def cleanup(**context): task_id="Cleanup", python_callable=cleanup, trigger_rule=TriggerRule.ALL_DONE, - priority_weight=1, - weight_rule="upstream", + # priority_weight=1, + # weight_rule="upstream", dag=dag, )