diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 6a34b74..8ea3f60 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -36,35 +36,55 @@ def serve_flows(configuration_path): launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", description="Launch a pipeline segment.", cron=config['launcher'].get("schedule", "* * * * *"), + parameters={"pipeline_configuration_path": configuration_path} ) level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", description="Schedule a Level 1 flow.", cron="* * * * *", + parameters={"pipeline_config_path": configuration_path} ) level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", - description="Process a file from Level 0 to Level 1.") + description="Process a file from Level 0 to Level 1.", + parameters={"pipeline_config_path": configuration_path} + ) level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", description="Schedule a Level 2 flow.", cron="* * * * *", + parameters={ + "pipeline_config_path": configuration_path} + ) level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", - description="Process files from Level 1 to Level 2.") + description="Process files from Level 1 to Level 2.", + parameters={"pipeline_config_path": configuration_path} + ) levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", description="Schedule a Level Q flow.", cron="* * * * *", + parameters={ + "pipeline_config_path": configuration_path} + ) levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", - description="Process files from Level 1 to Level Q.") + description="Process files from Level 1 to Level Q.", + parameters={"pipeline_config_path": configuration_path} + ) level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", description="Schedule a Level 3 flow to make PTM.", cron="* * * * *", + parameters={ + "pipeline_config_path": configuration_path} + ) level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", - description="Process PTM files from Level 2 to Level 3.") + description="Process PTM files from Level 2 to Level 3.", + parameters={ + "pipeline_config_path": configuration_path} + ) health_deployment = update_machine_health_stats.to_deployment(name="update-health-stats-deployment", description="Update the health stats table data.", diff --git a/punchpipe/control/launcher.py b/punchpipe/control/launcher.py index 9d0bb71..baa9dd1 100644 --- a/punchpipe/control/launcher.py +++ b/punchpipe/control/launcher.py @@ -88,7 +88,7 @@ async def launch_ready_flows(session: Session, flow_ids: List[int]) -> List: @flow -async def launcher_flow(pipeline_configuration_path="config.yaml"): +async def launcher_flow(pipeline_configuration_path=None): """The main launcher flow for Prefect, responsible for identifying flows, based on priority, that are ready to run and creating flow runs for them. It also escalates long-waiting flows' priorities. @@ -100,8 +100,9 @@ async def launcher_flow(pipeline_configuration_path="config.yaml"): """ logger = get_run_logger() - config_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml") - pipeline_config = load_pipeline_configuration(config_path) + if pipeline_configuration_path is None: + pipeline_configuration_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml") + pipeline_config = load_pipeline_configuration(pipeline_configuration_path) logger.info("Establishing database connection") session = get_database_session() diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index a86a85a..1dee922 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -86,7 +86,7 @@ def level1_construct_file_info(level0_files: t.List[File], pipeline_config: dict @flow -def level1_scheduler_flow(pipeline_config_path="config.yaml", session=None): +def level1_scheduler_flow(pipeline_config_path=None, session=None): generic_scheduler_flow_logic( level1_query_ready_files, level1_construct_file_info, @@ -97,5 +97,5 @@ def level1_scheduler_flow(pipeline_config_path="config.yaml", session=None): @flow -def level1_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None): +def level1_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, level1_core_flow, pipeline_config_path, session=session) diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index 8aa86c9..d80240a 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -69,7 +69,7 @@ def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict @flow -def level2_scheduler_flow(pipeline_config_path="config.yaml", session=None): +def level2_scheduler_flow(pipeline_config_path=None, session=None): generic_scheduler_flow_logic( level2_query_ready_files, level2_construct_file_info, @@ -80,5 +80,5 @@ def level2_scheduler_flow(pipeline_config_path="config.yaml", session=None): @flow -def level2_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None): +def level2_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, level2_core_flow, pipeline_config_path, session=session) diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index 3302b64..eed20b6 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -125,7 +125,7 @@ def level3_PTM_construct_file_info(level2_files: t.List[File], pipeline_config: @flow -def level3_PTM_scheduler_flow(pipeline_config_path="config.yaml", session=None): +def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None): generic_scheduler_flow_logic( level3_PTM_query_ready_files, level3_PTM_construct_file_info, @@ -136,5 +136,5 @@ def level3_PTM_scheduler_flow(pipeline_config_path="config.yaml", session=None): @flow -def level3_PTM_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None): +def level3_PTM_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, level3_core_flow, pipeline_config_path, session=session) diff --git a/punchpipe/flows/levelq.py b/punchpipe/flows/levelq.py index e3c0d0d..521b023 100644 --- a/punchpipe/flows/levelq.py +++ b/punchpipe/flows/levelq.py @@ -76,7 +76,7 @@ def levelq_construct_file_info(level1_files: t.List[File], pipeline_config: dict @flow -def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None): +def levelq_scheduler_flow(pipeline_config_path=None, session=None): generic_scheduler_flow_logic( levelq_query_ready_files, levelq_construct_file_info, @@ -87,5 +87,5 @@ def levelq_scheduler_flow(pipeline_config_path="config.yaml", session=None): @flow -def levelq_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None): +def levelq_process_flow(flow_id: int, pipeline_config_path=None, session=None): generic_process_flow_logic(flow_id, levelq_core_flow, pipeline_config_path, session=session)