Skip to content

Commit

Permalink
[CHANGE] Removing the constraint on 1 max concurrent DAG for each type
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCinquini committed Mar 7, 2024
1 parent 1fb7c9d commit 4e27343
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 16 deletions.
2 changes: 1 addition & 1 deletion airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
is_paused_upon_creation=False,
catchup=False,
schedule_interval=None,
max_active_runs=1,
# max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/sbg_L1_to_L2_e2e_cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=1,
# max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
Expand Down
19 changes: 6 additions & 13 deletions airflow/dags/sbg_preprocess_cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"

# The path of the working directory where the CWL workflow is executed
# (aka the starting directory for cwl-runner)
WORKING_DIR = "/scratch"

# Default DAG configuration
dag_default_args = {
"owner": "unity-sps",
"depends_on_past": False,
"start_date": datetime.utcfromtimestamp(0),
}
CWL_WORKFLOW = (
"https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
)
CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
CMR_STAC = "https://cmr.earthdata.nasa.gov/search/granules.stac?collection_concept_id=C2408009906-LPCLOUD&temporal[]=2023-08-10T03:41:03.000Z,2023-08-10T03:41:03.000Z"

dag = DAG(
Expand All @@ -39,15 +40,11 @@
is_paused_upon_creation=False,
catchup=False,
schedule=None,
max_active_runs=1,
# max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
"input_cmr_stac": Param(CMR_STAC, type="string"),
# "input_processing_labels": Param(["label1", "label2"], type="string[]"),
# "input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"),
# "input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"),
# "input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"input_crid": Param("001", type="string"),
Expand All @@ -67,9 +64,6 @@ def setup(ti=None, **context):
task_dict = {
"input_processing_labels": ["label1", "label2"],
"input_cmr_stac": context["params"]["input_cmr_stac"],
# "input_cmr_collection_name": context["params"]["input_cmr_collection_name"],
# "input_cmr_search_start_time": context["params"]["input_cmr_search_start_time"],
# "input_cmr_search_stop_time": context["params"]["input_cmr_search_stop_time"],
"input_unity_dapa_api": context["params"]["input_unity_dapa_api"],
"input_unity_dapa_client": context["params"]["input_unity_dapa_client"],
"input_crid": context["params"]["input_crid"],
Expand All @@ -93,8 +87,7 @@ def setup(ti=None, **context):
task_id="SBG_Preprocess_CWL",
full_pod_spec=k8s.V1Pod(k8s.V1ObjectMeta(name=("sbg-preprocess-cwl-pod-" + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", "/scratch"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}", WORKING_DIR],
dag=dag,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path="/scratch", sub_path="{{ dag_run.run_id }}")
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/sbg_preprocess_no_cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
is_paused_upon_creation=True,
catchup=False,
schedule=None,
max_active_runs=1,
# max_active_runs=1,
default_args=dag_default_args,
params={
"input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"),
Expand Down

0 comments on commit 4e27343

Please sign in to comment.