Skip to content

Commit

Permalink
fix config path confusion
Browse files Browse the repository at this point in the history
  • Loading branch information
jmbhughes committed Nov 17, 2024
1 parent 34f305b commit 708b35b
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 15 deletions.
28 changes: 24 additions & 4 deletions punchpipe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,35 +36,55 @@ def serve_flows(configuration_path):
launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
description="Launch a pipeline segment.",
cron=config['launcher'].get("schedule", "* * * * *"),
parameters={"pipeline_configuration_path": configuration_path}
)

level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment",
description="Schedule a Level 1 flow.",
cron="* * * * *",
parameters={"pipeline_config_path": configuration_path}
)
level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow",
description="Process a file from Level 0 to Level 1.")
description="Process a file from Level 0 to Level 1.",
parameters={"pipeline_config_path": configuration_path}
)

level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment",
description="Schedule a Level 2 flow.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
description="Process files from Level 1 to Level 2.")
description="Process files from Level 1 to Level 2.",
parameters={"pipeline_config_path": configuration_path}
)

levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
description="Schedule a Level Q flow.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
description="Process files from Level 1 to Level Q.")
description="Process files from Level 1 to Level Q.",
parameters={"pipeline_config_path": configuration_path}
)

level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
description="Schedule a Level 3 flow to make PTM.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow",
description="Process PTM files from Level 2 to Level 3.")
description="Process PTM files from Level 2 to Level 3.",
parameters={
"pipeline_config_path": configuration_path}
)

health_deployment = update_machine_health_stats.to_deployment(name="update-health-stats-deployment",
description="Update the health stats table data.",
Expand Down
7 changes: 4 additions & 3 deletions punchpipe/control/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async def launch_ready_flows(session: Session, flow_ids: List[int]) -> List:


@flow
async def launcher_flow(pipeline_configuration_path="config.yaml"):
async def launcher_flow(pipeline_configuration_path=None):
"""The main launcher flow for Prefect, responsible for identifying flows, based on priority,
that are ready to run and creating flow runs for them. It also escalates long-waiting flows' priorities.
Expand All @@ -100,8 +100,9 @@ async def launcher_flow(pipeline_configuration_path="config.yaml"):
"""
logger = get_run_logger()

config_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")
pipeline_config = load_pipeline_configuration(config_path)
if pipeline_configuration_path is None:
pipeline_configuration_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")

Check warning on line 104 in punchpipe/control/launcher.py

View check run for this annotation

Codecov / codecov/patch

punchpipe/control/launcher.py#L103-L104

Added lines #L103 - L104 were not covered by tests
pipeline_config = load_pipeline_configuration(pipeline_configuration_path)

logger.info("Establishing database connection")
session = get_database_session()
Expand Down
4 changes: 2 additions & 2 deletions punchpipe/flows/level1.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict


@flow
def level1_scheduler_flow(pipeline_config_path="config.yaml", session=None):
def level1_scheduler_flow(pipeline_config_path=None, session=None):
generic_scheduler_flow_logic(
level1_query_ready_files,
level1_construct_file_info,
Expand All @@ -97,5 +97,5 @@ def level1_scheduler_flow(pipeline_config_path="config.yaml", session=None):


@flow
def level1_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
def level1_process_flow(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, level1_core_flow, pipeline_config_path, session=session)
4 changes: 2 additions & 2 deletions punchpipe/flows/level2.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict


@flow
def level2_scheduler_flow(pipeline_config_path="config.yaml", session=None):
def level2_scheduler_flow(pipeline_config_path=None, session=None):
generic_scheduler_flow_logic(
level2_query_ready_files,
level2_construct_file_info,
Expand All @@ -80,5 +80,5 @@ def level2_scheduler_flow(pipeline_config_path="config.yaml", session=None):


@flow
def level2_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
def level2_process_flow(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, level2_core_flow, pipeline_config_path, session=session)
4 changes: 2 additions & 2 deletions punchpipe/flows/level3.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config:


@flow
def level3_PTM_scheduler_flow(pipeline_config_path="config.yaml", session=None):
def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None):
generic_scheduler_flow_logic(
level3_PTM_query_ready_files,
level3_PTM_construct_file_info,
Expand All @@ -136,5 +136,5 @@ def level3_PTM_scheduler_flow(pipeline_config_path="config.yaml", session=None):


@flow
def level3_PTM_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
def level3_PTM_process_flow(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, level3_core_flow, pipeline_config_path, session=session)
4 changes: 2 additions & 2 deletions punchpipe/flows/levelq.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict


@flow
def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None):
def levelq_scheduler_flow(pipeline_config_path=None, session=None):
generic_scheduler_flow_logic(
levelq_query_ready_files,
levelq_construct_file_info,
Expand All @@ -87,5 +87,5 @@ def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None):


@flow
def levelq_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
def levelq_process_flow(flow_id: int, pipeline_config_path=None, session=None):
generic_process_flow_logic(flow_id, levelq_core_flow, pipeline_config_path, session=session)

0 comments on commit 708b35b

Please sign in to comment.