From f397e0a7217904706b2d9ae7d4aa96a915d02b4a Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 19 Nov 2024 04:34:47 -0700
Subject: [PATCH] add starfield and F corona background flows

---
 punchpipe/cli.py                           |  47 ++++++++-
 punchpipe/control/db.py                    |  12 +++
 punchpipe/flows/fcorona.py                 |  82 +++++++++++++++
 punchpipe/flows/level3.py                  | 110 ++++++++++++++++++---
 punchpipe/flows/starfield.py               |  90 +++++++++++++++++
 punchpipe/level0/decode_sqrt.py            |   0
 punchpipe/level0/tests/test_decode_sqrt.py |   0
 7 files changed, 326 insertions(+), 15 deletions(-)
 create mode 100644 punchpipe/flows/fcorona.py
 create mode 100644 punchpipe/flows/starfield.py
 delete mode 100644 punchpipe/level0/decode_sqrt.py
 delete mode 100644 punchpipe/level0/tests/test_decode_sqrt.py

diff --git a/punchpipe/cli.py b/punchpipe/cli.py
index 8ea3f60..0cf4f30 100644
--- a/punchpipe/cli.py
+++ b/punchpipe/cli.py
@@ -13,8 +13,10 @@
 from punchpipe.control.util import load_pipeline_configuration
 from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
 from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
-from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
+from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow, level3_PIM_process_flow, level3_PIM_scheduler_flow
 from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
+from punchpipe.flows.starfield import construct_starfield_background_process_flow, construct_starfield_background_scheduler_flow
+from punchpipe.flows.fcorona import construct_f_corona_background_scheduler_flow, construct_f_corona_background_process_flow
 from punchpipe.monitor.app import create_app
 
 THIS_DIR = os.path.dirname(__file__)
@@ -73,6 +75,46 @@ def serve_flows(configuration_path):
         parameters={"pipeline_config_path": configuration_path}
     )
 
+    level3_PIM_scheduler_deployment = level3_PIM_scheduler_flow.to_deployment(
+        name="level3-PIM-scheduler-deployment",
+        description="Schedule a Level 3 flow to make PIM.",
+        cron="* * * * *",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+    level3_PIM_process_deployment = level3_PIM_process_flow.to_deployment(
+        name="level3-PIM-process-deployment",
+        description="Process PIM files.",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+
+    construct_f_corona_background_scheduler_deployment = construct_f_corona_background_scheduler_flow.to_deployment(
+        name="construct-f-corona-background-scheduler-deployment",
+        description="Schedule an F corona background flow.",
+        cron="* * * * *",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+    construct_f_corona_background_process_deployment = construct_f_corona_background_process_flow.to_deployment(
+        name="construct-f-corona-background-process-deployment",
+        description="Process an F corona background.",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+
+    construct_starfield_background_scheduler_deployment = construct_starfield_background_scheduler_flow.to_deployment(
+        name="construct-starfield-background-scheduler-deployment",
+        description="Schedule a starfield background flow.",
+        cron="* * * * *",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+    construct_starfield_background_process_deployment = construct_starfield_background_process_flow.to_deployment(
+        name="construct-starfield-background-process-deployment",
+        description="Create a starfield background.",
+        parameters={"pipeline_config_path": configuration_path}
+    )
+
     level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
                                                                               description="Schedule a Level 3 flow to make PTM.",
                                                                               cron="* * * * *",
@@ -95,6 +137,9 @@ def serve_flows(configuration_path):
         level2_scheduler_deployment, level2_process_deployment,
         levelq_scheduler_deployment, levelq_process_deployment,
         level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
+        construct_f_corona_background_process_deployment, construct_f_corona_background_scheduler_deployment,
+        construct_starfield_background_process_deployment, construct_starfield_background_scheduler_deployment,
+        level3_PIM_scheduler_deployment, level3_PIM_process_deployment,
         health_deployment,
         limit=1000
     )
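
For reference, the new scheduler and process flows read a handful of keys from
the pipeline configuration. A minimal sketch of the expected shape, written as
a Python dict (the key names come from the code in this patch; the values are
hypothetical):

    pipeline_config = {
        "root": "/d0/punchpipe",   # hypothetical data root
        "file_version": "0.0.1",   # hypothetical file version string
        "levels": {
            "level3_PIM_process_flow": {"priority": {"initial": 5}},
            "construct_f_corona_background_process_flow": {"priority": {"initial": 5}},
            "construct_starfield_background_process_flow": {"priority": {"initial": 5}},
        },
    }
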
diff --git a/punchpipe/control/db.py b/punchpipe/control/db.py
index 432f6c7..1cef4e4 100644
--- a/punchpipe/control/db.py
+++ b/punchpipe/control/db.py
@@ -216,3 +216,15 @@ def get_closest_eng_packets(table, timestamp, spacecraft_id):
         gt_event = lt_event
 
     return lt_event, gt_event
+
+
+def get_closest_file(f_target: File, f_others: list[File]) -> File:
+    return min(f_others, key=lambda o: abs((f_target.date_obs - o.date_obs).total_seconds()))
+
+
+def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
+    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs >= o.date_obs])
+
+
+def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
+    return get_closest_file(f_target, [o for o in f_others if f_target.date_obs <= o.date_obs])
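
A quick sketch of how the new closest-file helpers behave, assuming only that
File carries a date_obs datetime (the files below are hypothetical):

    from datetime import datetime

    from punchpipe.control.db import (File, get_closest_after_file,
                                      get_closest_before_file, get_closest_file)

    target = File(date_obs=datetime(2024, 11, 19, 12, 0))
    others = [File(date_obs=datetime(2024, 11, 18)),   # 36 h before target
              File(date_obs=datetime(2024, 11, 20))]   # 12 h after target

    get_closest_file(target, others)         # the 2024-11-20 file (12 h away)
    get_closest_before_file(target, others)  # the 2024-11-18 file
    get_closest_after_file(target, others)   # the 2024-11-20 file

Because min() raises ValueError on an empty sequence, callers should first
check that a candidate exists on the requested side, as the PIM readiness
query below does.
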
diff --git a/punchpipe/flows/fcorona.py b/punchpipe/flows/fcorona.py
new file mode 100644
index 0000000..2399e02
--- /dev/null
+++ b/punchpipe/flows/fcorona.py
@@ -0,0 +1,82 @@
+import json
+import os
+import typing as t
+from datetime import datetime
+
+from prefect import flow, get_run_logger, task
+from punchbowl.level3.f_corona_model import construct_f_corona_background
+from sqlalchemy import and_
+
+from punchpipe import __version__
+from punchpipe.control.db import File, Flow
+from punchpipe.control.processor import generic_process_flow_logic
+from punchpipe.control.scheduler import generic_scheduler_flow_logic
+from punchpipe.control.util import get_database_session
+
+
+@task
+def f_corona_background_query_ready_files(session, pipeline_config: dict):
+    logger = get_run_logger()
+    all_ready_files = (session.query(File)
+                       .filter(File.state == "created")
+                       .filter(File.level == "2")
+                       .filter(File.file_type == "PT")
+                       .filter(File.observatory == "M").all())
+    logger.info(f"{len(all_ready_files)} Level 2 PTM files will be used for F corona background modeling.")
+    if len(all_ready_files) >= 30:  # need at least 30 images to build a model
+        return [all_ready_files]
+    else:
+        return []
+
+
+@task
+def construct_f_corona_background_flow_info(level2_files: list[File],
+                                            level3_f_model_file: File,
+                                            pipeline_config: dict,
+                                            session=None):
+    flow_type = "construct_f_corona_background_process_flow"
+    state = "planned"
+    creation_time = datetime.now()
+    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
+    call_data = json.dumps(
+        {
+            "data_list": [
+                os.path.join(level2_file.directory(pipeline_config["root"]), level2_file.filename())
+                for level2_file in level2_files
+            ],
+        }
+    )
+    return Flow(
+        flow_type=flow_type,
+        state=state,
+        flow_level="3",
+        creation_time=creation_time,
+        priority=priority,
+        call_data=call_data,
+    )
+
+
+@task
+def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
+    return [File(
+        level="3",
+        file_type="PF",
+        observatory="M",
+        file_version=pipeline_config["file_version"],
+        software_version=__version__,
+        date_obs=level2_files[0].date_obs,
+        state="planned",
+    )]
+
+
+@flow
+def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None):
+    generic_scheduler_flow_logic(
+        f_corona_background_query_ready_files,
+        construct_f_corona_background_file_info,
+        construct_f_corona_background_flow_info,
+        pipeline_config_path,
+        session=session,
+    )
+
+
+@flow
+def construct_f_corona_background_process_flow(flow_id: int, pipeline_config_path=None, session=None):
+    generic_process_flow_logic(flow_id, construct_f_corona_background, pipeline_config_path, session=session)
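
The F corona scheduler stores only the input-file list in the Flow's
call_data; generic_process_flow_logic presumably deserializes it and hands it
to construct_f_corona_background when the process flow runs. An illustration
of the serialized payload (the path is hypothetical):

    {
        "data_list": [
            "/d0/punchpipe/2/PT/PUNCH_L2_PTM_20241119000000.fits"
        ]
    }
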
logger.info(f"{len(all_ready_files)} Level 3 PTM files need to be processed.") + + actually_ready_files = [] + for f in all_ready_files: + valid_before_fcorona_models = get_valid_fcorona_models(session, f, + before_timedelta=timedelta(days=3), + after_timedelta=timedelta(days=0)) + valid_after_fcorona_models = get_valid_fcorona_models(session, f, + before_timedelta=timedelta(days=0), + after_timedelta=timedelta(days=3)) + + if (len(valid_before_fcorona_models) >= 1 and len(valid_after_fcorona_models) >= 1): + actually_ready_files.append(f) + logger.info(f"{len(actually_ready_files)} Level 2 PTM files have necessary calibration data.") + + return [[f.file_id] for f in actually_ready_files] + + +@task +def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, session=None): + session = get_database_session() # TODO: replace so this works in the tests by passing in a test + + flow_type = "level3_PIM_process_flow" + state = "planned" + creation_time = datetime.now() + priority = pipeline_config["levels"][flow_type]["priority"]["initial"] + call_data = json.dumps( + { + "data_list": [ + os.path.join(level2_file.directory(pipeline_config["root"]), level2_file.filename()) + for level2_file in level2_files + ], + # TODO put magic numbers in config + "before_f_corona_model_path": get_closest_before_file(level2_files[0], + get_valid_fcorona_models(session, + level2_files[0], + before_timedelta=timedelta(days=3), + after_timedelta=timedelta(days=3))), + "after_f_corona_model_path": get_closest_after_file(level2_files[0], + get_valid_fcorona_models(session, + level2_files[0], + before_timedelta=timedelta(days=3), + after_timedelta=timedelta(days=3))), + } + ) + return Flow( + flow_type=flow_type, + state=state, + flow_level="3", + creation_time=creation_time, + priority=priority, + call_data=call_data, + ) + + +@task +def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]: + return [File( + level="3", + file_type="PI", + observatory="M", + file_version=pipeline_config["file_version"], + software_version=__version__, + date_obs=level2_files[0].date_obs, + state="planned", + )] + + +@flow +def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None): + generic_scheduler_flow_logic( + level3_PIM_query_ready_files, + level3_PIM_construct_file_info, + level3_PIM_construct_flow_info, + pipeline_config_path, + session=session, + ) + + +@flow +def level3_PIM_process_flow(flow_id: int, pipeline_config_path=None, session=None): + generic_process_flow_logic(flow_id, level3_PIM_flow, pipeline_config_path, session=session) + diff --git a/punchpipe/flows/starfield.py b/punchpipe/flows/starfield.py new file mode 100644 index 0000000..f12641e --- /dev/null +++ b/punchpipe/flows/starfield.py @@ -0,0 +1,90 @@ +import typing as t + +import json +import os +from datetime import datetime + +from prefect import flow, get_run_logger, task +from punchbowl.level3.f_corona_model import construct_f_corona_background +from sqlalchemy import and_ + + +from punchpipe import __version__ +from punchpipe.control.db import File, Flow +from punchpipe.control.processor import generic_process_flow_logic +from punchpipe.control.scheduler import generic_scheduler_flow_logic +from punchpipe.control.util import get_database_session + + +@task +def starfield_background_query_ready_files(session, pipeline_config: dict): + logger = get_run_logger() + all_ready_files = (session.query(File) + .filter(File.state == "created") + .filter(File.level 
== "3") + .filter(File.file_type == "PI") + .filter(File.observatory == "M").all()) + logger.info(f"{len(all_ready_files)} Level 3 PIM files will be used for F corona background modeling.") + if len(all_ready_files) >= 30: + return [all_ready_files] + else: + return [] + + +@task +def construct_starfield_background_flow_info(level3_fcorona_subtracted_files: list[File], + level3_starfield_model_file: File, + pipeline_config: dict, + session=None): + flow_type = "construct_starfield_background_process_flow" + state = "planned" + creation_time = datetime.now() + priority = pipeline_config["levels"][flow_type]["priority"]["initial"] + call_data = json.dumps( + { + "data_list": [ + os.path.join(level3_file.directory(pipeline_config["root"]), level3_file.filename()) + for level3_file in level3_fcorona_subtracted_files + ], + } + ) + return Flow( + flow_type=flow_type, + state=state, + flow_level="3", + creation_time=creation_time, + priority=priority, + call_data=call_data, + ) + + +@task +def construct_starfield_background_file_info(level3_files: t.List[File], pipeline_config: dict) -> t.List[File]: + return [File( + level="3", + file_type="PS", + observatory="M", + file_version=pipeline_config["file_version"], + software_version=__version__, + date_obs=level3_files[0].date_obs, + state="planned", + )] + + +@flow +def construct_starfield_background_scheduler_flow(pipeline_config_path=None, session=None): + generic_scheduler_flow_logic( + starfield_background_query_ready_files, + construct_starfield_background_flow_info, + construct_starfield_background_file_info, + pipeline_config_path, + session=session, + ) + + +@flow +def construct_starfield_background_process_flow(flow_id: int, pipeline_config_path=None, session=None): + generic_process_flow_logic(flow_id, + construct_starfield_background_file_info, + pipeline_config_path, + session=session) diff --git a/punchpipe/level0/decode_sqrt.py b/punchpipe/level0/decode_sqrt.py deleted file mode 100644 index e69de29..0000000 diff --git a/punchpipe/level0/tests/test_decode_sqrt.py b/punchpipe/level0/tests/test_decode_sqrt.py deleted file mode 100644 index e69de29..0000000