Skip to content

Commit

Permalink
Merge pull request #52 from unity-sds/devel-luca-1
Browse files Browse the repository at this point in the history
Devel luca 1
  • Loading branch information
drewm-jpl authored Apr 9, 2024
2 parents 9dedc43 + 7d9b3f0 commit 7347f7e
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 22 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def setup(ti=None, **context):
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="cwl-task",
is_delete_operator_pod=True,
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
Expand Down
15 changes: 14 additions & 1 deletion airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,26 @@ apiVersion: v1
kind: Pod
metadata:
name: docker-cwl-pod
labels:
task-type: sbg-task
spec:
restartPolicy: Never
serviceAccountName: airflow-worker

affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: task-type
operator: In
values:
- sbg-task
topologyKey: kubernetes.io/hostname

containers:
- name: cwl-docker
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0-beta.3
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.0.0
imagePullPolicy: Always
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
Expand Down
41 changes: 21 additions & 20 deletions airflow/dags/sbg_L1_to_L2_e2e_cwl_step_by_step_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
# Resources needed by each Task
# EC2 r6a.xlarge 4vCPU 32GiB
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
# limits={"memory": "4Gi", "cpu": "500m", "ephemeral-storage": "50G"},
# requests={"memory": "2Gi", "cpu": "250m", "ephemeral-storage": "25G"},
# limits={"memory": "16G", "cpu": "2000m", "ephemeral-storage": "50G"},
# requests={"memory": "8G", "cpu": "1000m", "ephemeral-storage": "25G"},
limits={"ephemeral-storage": "50G"},
requests={"ephemeral-storage": "50G"},
)
Expand All @@ -53,7 +53,8 @@
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=100,
max_active_runs=2,
max_active_tasks=4,
default_args=dag_default_args,
params={
# For step: PREPROCESS
Expand Down Expand Up @@ -190,10 +191,10 @@ def setup(ti=None, **context):
)
preprocess_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="Preprocess",
on_finish_action="delete_pod",
name="SBG_Preprocess",
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
startup_timeout_seconds=14400,
get_logs=True,
task_id="SBG_Preprocess",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-pod-" + uuid.uuid4().hex))),
Expand Down Expand Up @@ -221,10 +222,10 @@ def setup(ti=None, **context):
# SBG_ISOFIT_CWL = "https://raw.githubusercontent.com/LucaCinquini/sbg-workflows/devel/isofit/sbg-isofit-workflow.cwl"
isofit_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="Isofit",
on_finish_action="delete_pod",
name="SBG_Isofit",
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
startup_timeout_seconds=14400,
get_logs=True,
task_id="SBG_Isofit",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-isofit-pod-" + uuid.uuid4().hex))),
Expand Down Expand Up @@ -252,10 +253,10 @@ def setup(ti=None, **context):
# SBG_RESAMPLE_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/resample/sbg-resample-workflow.dev.yml"
resample_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="Resample",
on_finish_action="delete_pod",
name="SBG_Resample",
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
startup_timeout_seconds=14400,
get_logs=True,
task_id="SBG_Resample",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-resample-pod-" + uuid.uuid4().hex))),
Expand Down Expand Up @@ -285,10 +286,10 @@ def setup(ti=None, **context):
# SBG_REFLECT_CORRECT_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/reflect-correct/sbg-reflect-correct-workflow.dev.yml"
reflect_correct_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="Reflect_Correct",
on_finish_action="delete_pod",
name="SBG_Reflect_Correct",
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
startup_timeout_seconds=14400,
get_logs=True,
task_id="SBG_Reflect_Correct",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-reflect-correct-pod-" + uuid.uuid4().hex))),
Expand Down Expand Up @@ -321,10 +322,10 @@ def setup(ti=None, **context):
# SBG_FRCOVER_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/frcover/sbg-frcover-workflow.dev.yml"
frcover_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="Frcover",
on_finish_action="delete_pod",
name="SBG_Frcover",
on_finish_action="delete_succeeded_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
startup_timeout_seconds=14400,
get_logs=True,
task_id="SBG_Frcover",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-frcover-pod-" + uuid.uuid4().hex))),
Expand Down Expand Up @@ -364,8 +365,8 @@ def cleanup(**context):
task_id="Cleanup",
python_callable=cleanup,
trigger_rule=TriggerRule.ALL_DONE,
priority_weight=1,
weight_rule="upstream",
# priority_weight=1,
# weight_rule="upstream",
dag=dag,
)

Expand Down

0 comments on commit 7347f7e

Please sign in to comment.