From 5f06d1218bd8760510c79c45d8152052d2ab3a32 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Sat, 24 Aug 2024 02:03:29 -0600 Subject: [PATCH 1/2] preparing for build 3 review --- deploy.py | 37 ++++++++++++ punchpipe/controlsegment/db.py | 2 +- punchpipe/controlsegment/processor.py | 5 ++ punchpipe/controlsegment/scheduler.py | 33 +++++----- punchpipe/flows/level1.py | 8 +-- punchpipe/flows/level3.py | 86 +++++++++++++++++++++++++++ scripts/create_db.py | 6 +- 7 files changed, 152 insertions(+), 25 deletions(-) create mode 100644 deploy.py diff --git a/deploy.py b/deploy.py new file mode 100644 index 0000000..3c08ca7 --- /dev/null +++ b/deploy.py @@ -0,0 +1,37 @@ +from prefect import serve +from punchpipe.controlsegment.launcher import launcher_flow +from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow +from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow +from punchpipe.flows.level3 import level3_process_flow, level3_scheduler_flow + +launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", + description="Launch a pipeline segment.", + cron="* * * * *", + ) + +level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", + description="Schedule a Level 1 flow.", + cron="* * * * *", + ) +level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", + description="Process a file from Level 0 to Level 1.") + +level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", + description="Schedule a Level 2 flow.", + cron="* * * * *", + ) +level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", + description="Process files from Level 1 to Level 2.") + +level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment", + description="Schedule a Level 3 flow.", + cron="* * * * *", + ) +level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow", + description="Process files from Level 2 to Level 3.") + + +serve(launcher_deployment, + level1_scheduler_deployment, level1_process_deployment, + level2_scheduler_deployment, level2_process_deployment, + level3_scheduler_deployment, level3_process_deployment) diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index fd1cfed..3b05282 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -48,7 +48,7 @@ def directory(self, root: str): str the place to write the file """ - return os.path.join(root, str(self.level), self.file_type, self.date_obs.strftime("%Y/%m/%d")) + return os.path.join(root, str(self.level), self.file_type + self.observatory, self.date_obs.strftime("%Y/%m/%d")) class Flow(Base): diff --git a/punchpipe/controlsegment/processor.py b/punchpipe/controlsegment/processor.py index 9dddde2..a180409 100644 --- a/punchpipe/controlsegment/processor.py +++ b/punchpipe/controlsegment/processor.py @@ -1,6 +1,7 @@ import json from datetime import datetime +from prefect import get_run_logger from prefect.context import get_run_context from punchpipe.controlsegment.db import File, Flow @@ -15,12 +16,14 @@ def generic_process_flow_logic(flow_id: int, core_flow_to_launch, pipeline_config_path: str, session=None): if session is None: session = get_database_session() + logger = get_run_logger() # load pipeline configuration pipeline_config = load_pipeline_configuration(pipeline_config_path) # fetch the appropriate flow db entry 
flow_db_entry = session.query(Flow).where(Flow.flow_id == flow_id).one() + logger.info(f"Running on flow db entry with id={flow_db_entry.flow_id}.") # update the processing flow name with the flow run name from Prefect flow_run_context = get_run_context() @@ -43,10 +46,12 @@ def generic_process_flow_logic(flow_id: int, core_flow_to_launch, pipeline_confi flow_call_data = json.loads(flow_db_entry.call_data) output_file_ids = set() expected_file_ids = {entry.file_id for entry in file_db_entry_list} + logger.info(f"Expecting to output files with ids={expected_file_ids}.") try: results = core_flow_to_launch(**flow_call_data) for result in results: file_db_entry = match_data_with_file_db_entry(result, file_db_entry_list) + logger.info(f"Preparing to write {file_db_entry.file_id}.") output_file_ids.add(file_db_entry.file_id) write_file(result, file_db_entry, pipeline_config) diff --git a/punchpipe/controlsegment/scheduler.py b/punchpipe/controlsegment/scheduler.py index 78e9b2c..046d2e7 100644 --- a/punchpipe/controlsegment/scheduler.py +++ b/punchpipe/controlsegment/scheduler.py @@ -13,30 +13,31 @@ def generic_scheduler_flow_logic( session = get_database_session() # find all files that are ready to run - parent_files = [] ready_file_ids = query_ready_files_func(session, pipeline_config) if ready_file_ids: for file_id in ready_file_ids: + parent_files = [] + # mark the file as progressed so that there aren't duplicate processing flows update_file_state(session, file_id, "progressed") # get the prior level file's information parent_files += session.query(File).where(File.file_id == file_id).all() - # prepare the new level flow and file - children_files = construct_child_file_info(parent_files, pipeline_config) - database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config) - session.add(*children_files) - session.add(database_flow_info) - session.commit() - - # set the processing flow now that we know the flow_id after committing the flow info - for child_file in children_files: - child_file.processing_flow = database_flow_info.flow_id - session.commit() + # prepare the new level flow and file + children_files = construct_child_file_info(parent_files, pipeline_config) + database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config) + session.add(*children_files) + session.add(database_flow_info) + session.commit() - # create a file relationship between the prior and next levels - for parent_file in parent_files: + # set the processing flow now that we know the flow_id after committing the flow info for child_file in children_files: - session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id)) - session.commit() + child_file.processing_flow = database_flow_info.flow_id + session.commit() + + # create a file relationship between the prior and next levels + for parent_file in parent_files: + for child_file in children_files: + session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id)) + session.commit() diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 2b4ad00..b50248f 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -30,10 +30,10 @@ def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_ os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename()) for level0_file in level0_files ], - "output_filename": [ - os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename()) - 
for level1_file in level1_files - ], + # "output_filename": [ + # os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename()) + # for level1_file in level1_files + # ], } ) return Flow( diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index e69de29..cf46518 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -0,0 +1,86 @@ +import json +import os +import typing as t +from datetime import datetime, timedelta + +from prefect import flow, task +from punchbowl.level3.flow import level3_core_flow +from sqlalchemy import and_ + +from punchpipe import __version__ +from punchpipe.controlsegment.db import File, Flow +from punchpipe.controlsegment.processor import generic_process_flow_logic +from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic + + +@task +def level3_query_ready_files(session, pipeline_config: dict): + latency = pipeline_config["levels"]["level3_process_flow"]["schedule"]["latency"] + window_duration = pipeline_config["levels"]["level3_process_flow"]["schedule"]["window_duration_seconds"] + start_time = datetime.now() - timedelta(minutes=latency + window_duration) + end_time = datetime.now() - timedelta(minutes=latency) + return [ + f.file_id + for f in session.query(File) + .where(and_(File.state == "created", File.level == 2, File.date_obs > start_time, File.date_obs < end_time)) + .all() + ] + + +@task +def level3_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict): + flow_type = "level3_process_flow" + state = "planned" + creation_time = datetime.now() + priority = pipeline_config["levels"][flow_type]["priority"]["initial"] + call_data = json.dumps( + { + "data_list": [ + os.path.join(level2_file.directory(pipeline_config["root"]), level2_file.filename()) + for level2_file in level2_files + ] + } + ) + return Flow( + flow_type=flow_type, + state=state, + flow_level=3, + creation_time=creation_time, + priority=priority, + call_data=call_data, + ) + + +@task +def level3_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]: + out_files = [] + for level2_file in level2_files: + out_files.append( + File( + level=3, + file_type=level2_file.file_type, + observatory=level2_file.observatory, + file_version=pipeline_config["file_version"], + software_version=__version__, + date_obs=level2_file.date_obs, + polarization=level2_file.polarization, + state="planned", + ) + ) + return out_files + + +@flow +def level3_scheduler_flow(pipeline_config_path="config.yaml", session=None): + generic_scheduler_flow_logic( + level3_query_ready_files, + level3_construct_file_info, + level3_construct_flow_info, + pipeline_config_path, + session=session, + ) + + +@flow +def level3_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None): + generic_process_flow_logic(flow_id, level3_core_flow, pipeline_config_path, session=session) diff --git a/scripts/create_db.py b/scripts/create_db.py index 77a28ea..6357fe4 100644 --- a/scripts/create_db.py +++ b/scripts/create_db.py @@ -5,8 +5,6 @@ if __name__ == "__main__": - # Base = declarative_base() - credentials = DatabaseCredentials.load("mysql-cred") - engine = create_engine( - f'mysql+pymysql://{credentials.user}:{credentials.password.get_secret_value()}@localhost/punchpipe') + credentials = DatabaseCredentials.load("mariadb-creds") + engine = credentials.get_engine() Base.metadata.create_all(engine) From 103d124ae91e2f9bf553c871bba464d22ba6ddeb Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: 
Sat, 24 Aug 2024 12:59:22 -0600 Subject: [PATCH 2/2] preparing for build 3 review --- deploy.py | 15 +++++----- punchpipe/controlsegment/scheduler.py | 12 ++++---- punchpipe/flows/level1.py | 28 +++++++++++++++---- punchpipe/flows/level2.py | 40 ++++++++++++--------------- 4 files changed, 53 insertions(+), 42 deletions(-) diff --git a/deploy.py b/deploy.py index 3c08ca7..67ba404 100644 --- a/deploy.py +++ b/deploy.py @@ -23,15 +23,16 @@ level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", description="Process files from Level 1 to Level 2.") -level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment", - description="Schedule a Level 3 flow.", - cron="* * * * *", - ) -level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow", - description="Process files from Level 2 to Level 3.") +# level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment", +# description="Schedule a Level 3 flow.", +# cron="* * * * *", +# ) +# level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow", +# description="Process files from Level 2 to Level 3.") serve(launcher_deployment, level1_scheduler_deployment, level1_process_deployment, level2_scheduler_deployment, level2_process_deployment, - level3_scheduler_deployment, level3_process_deployment) + # level3_scheduler_deployment, level3_process_deployment + ) diff --git a/punchpipe/controlsegment/scheduler.py b/punchpipe/controlsegment/scheduler.py index 046d2e7..9be41f2 100644 --- a/punchpipe/controlsegment/scheduler.py +++ b/punchpipe/controlsegment/scheduler.py @@ -15,14 +15,14 @@ def generic_scheduler_flow_logic( # find all files that are ready to run ready_file_ids = query_ready_files_func(session, pipeline_config) if ready_file_ids: - for file_id in ready_file_ids: + for group in ready_file_ids: parent_files = [] + for file_id in group: + # mark the file as progressed so that there aren't duplicate processing flows + update_file_state(session, file_id, "progressed") - # mark the file as progressed so that there aren't duplicate processing flows - update_file_state(session, file_id, "progressed") - - # get the prior level file's information - parent_files += session.query(File).where(File.file_id == file_id).all() + # get the prior level file's information + parent_files += session.query(File).where(File.file_id == file_id).all() # prepare the new level flow and file children_files = construct_child_file_info(parent_files, pipeline_config) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index b50248f..56024f6 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -2,6 +2,7 @@ import os import typing as t from datetime import datetime +from pathlib import Path from prefect import flow, task from punchbowl.level1.flow import level1_core_flow @@ -15,11 +16,28 @@ @task def level1_query_ready_files(session, pipeline_config: dict): - return [f.file_id for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()] + return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()] +# TODO handle more robustly @task -def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_config: dict): +def get_vignetting_function(level0_file): + observatory = int(level0_file.observatory) + if observatory < 4: + vignetting_function_path = 
"/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM1_20240817174727_v2.fits" + else: + vignetting_function_path = "/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM4_20240819045110_v1.fits" + return vignetting_function_path + + +# TODO handle more robustly +@task +def get_psf_model_path(level0_file): + return "/Users/jhughes/Desktop/repos/punchbowl/test_run/synthetic_forward_psf.h5" + + +@task +def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict): flow_type = "level1_process_flow" state = "planned" creation_time = datetime.now() @@ -30,10 +48,8 @@ def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_ os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename()) for level0_file in level0_files ], - # "output_filename": [ - # os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename()) - # for level1_file in level1_files - # ], + "vignetting_function_path": get_vignetting_function(level0_files[0]), + "psf_model_path": get_psf_model_path(level0_files[0]), } ) return Flow( diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index 803662f..1d2f50a 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -3,7 +3,7 @@ import typing as t from datetime import datetime, timedelta -from prefect import flow, task +from prefect import flow, task, get_run_logger from punchbowl.level2.flow import level2_core_flow from sqlalchemy import and_ @@ -15,20 +15,20 @@ @task def level2_query_ready_files(session, pipeline_config: dict): - latency = pipeline_config["levels"]["level2_process_flow"]["schedule"]["latency"] - window_duration = pipeline_config["levels"]["level2_process_flow"]["schedule"]["window_duration_seconds"] - start_time = datetime.now() - timedelta(minutes=latency + window_duration) - end_time = datetime.now() - timedelta(minutes=latency) - return [ - f.file_id - for f in session.query(File) - .where(and_(File.state == "created", File.level == 1, File.date_obs > start_time, File.date_obs < end_time)) - .all() - ] + logger = get_run_logger() + all_ready_files = session.query(File).where(and_(File.state == "created", File.level == 1)).all() + logger.info(f"{len(all_ready_files)} ready files") + unique_times = set(f.date_obs for f in all_ready_files) + logger.info(f"{len(unique_times)} unique times: {unique_times}") + grouped_ready_files = [[f.file_id for f in all_ready_files if f.date_obs == time] for time in unique_times] + logger.info(f"{len(grouped_ready_files)} grouped ready files") + out = [g for g in grouped_ready_files if len(g) == 12] + logger.info(f"{len(out)} groups heading out") + return out @task -def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_config: dict): +def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict): flow_type = "level2_process_flow" state = "planned" creation_time = datetime.now() @@ -54,21 +54,15 @@ def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_c @task def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]: # TODO: make realistic to level 2 products - out_files = [] - for level1_file in level1_files: - out_files.append( - File( + return [File( level=2, - file_type=level1_file.file_type, - observatory=level1_file.observatory, + file_type="PT", + observatory="M", file_version=pipeline_config["file_version"], software_version=__version__, - date_obs=level1_file.date_obs, - 
polarization=level1_file.polarization, + date_obs=level1_files[0].date_obs, state="planned", - ) - ) - return out_files + )] @flow
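
Note on configuration: the scheduler and flow-construction tasks touched in these patches read a handful of keys from `config.yaml` (`root`, `file_version`, and per-flow `priority`/`schedule` settings). The sketch below only illustrates that shape as inferred from the code; every concrete value is a placeholder assumption, not taken from the project's actual configuration.

# A minimal pipeline_config sketch matching the keys these flows read.
# All values below are placeholders (assumptions), not values from the real config.yaml.
example_pipeline_config = {
    "root": "/data/punchpipe",   # assumed data root; File.directory() builds output paths beneath it
    "file_version": "1",         # stamped onto newly planned File rows
    "levels": {
        "level1_process_flow": {"priority": {"initial": 1}},
        "level2_process_flow": {"priority": {"initial": 1}},
        "level3_process_flow": {
            "priority": {"initial": 1},
            # level3_query_ready_files passes both of these to timedelta(minutes=...),
            # even though the second key is named in seconds.
            "schedule": {"latency": 60, "window_duration_seconds": 3},
        },
    },
}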