Skip to content

Commit

Permalink
add starfield and f corona
Browse files Browse the repository at this point in the history
  • Loading branch information
jmbhughes committed Nov 19, 2024
1 parent fa7d854 commit f397e0a
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 15 deletions.
47 changes: 46 additions & 1 deletion punchpipe/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
from punchpipe.control.util import load_pipeline_configuration
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow, level3_PIM_process_flow, level3_PIM_scheduler_flow
from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
from punchpipe.flows.starfield import construct_starfield_background_process_flow, construct_starfield_background_scheduler_flow
from punchpipe.flows.fcorona import construct_f_corona_background_scheduler_flow, construct_f_corona_background_process_flow
from punchpipe.monitor.app import create_app

THIS_DIR = os.path.dirname(__file__)
Expand Down Expand Up @@ -73,6 +75,46 @@ def serve_flows(configuration_path):
parameters={"pipeline_config_path": configuration_path}
)

level3_PIM_scheduler_deployment = level3_PIM_scheduler_flow.to_deployment(name="level3-PIM-scheduler-deployment",
description="Schedule a Level 3 flow to make PIM.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
level3_PIM_process_deployment = level3_PIM_process_flow.to_deployment(name="level3_PIM_process_flow",
description="Process to PIM files.",
parameters={
"pipeline_config_path": configuration_path}
)

construct_f_corona_background_scheduler_deployment = construct_f_corona_background_scheduler_flow.to_deployment(name="construct_f_corona_background-scheduler-deployment",
description="Schedule an F corona background.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
construct_f_corona_background_process_deployment = construct_f_corona_background_process_flow.to_deployment(name="construct_f_corona_background_process_flow",
description="Process F corona background.",
parameters={
"pipeline_config_path": configuration_path}
)

# Bug fix: both starfield deployments were built from the F-corona flows
# (copy-paste error) — they must serve the starfield flows imported above.
construct_starfield_background_scheduler_deployment = construct_starfield_background_scheduler_flow.to_deployment(
    name="construct_starfield-scheduler-deployment",
    description="Schedule a starfield background.",
    cron="* * * * *",
    parameters={"pipeline_config_path": configuration_path},
)
construct_starfield_background_process_deployment = construct_starfield_background_process_flow.to_deployment(
    name="construct_starfield_background_process_flow",
    description="Create starfield background.",
    parameters={"pipeline_config_path": configuration_path},
)


level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
description="Schedule a Level 3 flow to make PTM.",
cron="* * * * *",
Expand All @@ -95,6 +137,9 @@ def serve_flows(configuration_path):
level2_scheduler_deployment, level2_process_deployment,
levelq_scheduler_deployment, levelq_process_deployment,
level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
construct_f_corona_background_process_deployment, construct_f_corona_background_scheduler_deployment,
construct_starfield_background_process_deployment, construct_starfield_background_scheduler_deployment,
level3_PIM_scheduler_deployment, level3_PIM_process_deployment,
health_deployment,
limit=1000
)
Expand Down
12 changes: 12 additions & 0 deletions punchpipe/control/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,3 +216,15 @@ def get_closest_eng_packets(table, timestamp, spacecraft_id):
gt_event = lt_event

return lt_event, gt_event


def get_closest_file(f_target: File, f_others: list[File]) -> File:
    """Return the file in *f_others* whose date_obs is nearest to *f_target*'s.

    Raises ValueError if *f_others* is empty (propagated from ``min``).
    """
    def time_gap(candidate: File) -> float:
        # Absolute separation in seconds, so earlier and later files compete equally.
        return abs((f_target.date_obs - candidate.date_obs).total_seconds())

    return min(f_others, key=time_gap)


def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
    """Return the nearest file whose date_obs is at or before *f_target*'s."""
    earlier = [candidate for candidate in f_others
               if candidate.date_obs <= f_target.date_obs]
    return get_closest_file(f_target, earlier)


def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
    """Return the nearest file whose date_obs is at or after *f_target*'s."""
    later = [candidate for candidate in f_others
             if candidate.date_obs >= f_target.date_obs]
    return get_closest_file(f_target, later)
82 changes: 82 additions & 0 deletions punchpipe/flows/fcorona.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import json
import typing as t
import os
from datetime import datetime

from prefect import flow, get_run_logger, task
from punchbowl.level3.f_corona_model import construct_f_corona_background
from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session


@task
def f_corona_background_query_ready_files(session, pipeline_config: dict):
    """Collect Level 2 PTM files available for F-corona background modeling.

    Returns a list containing a single batch (list of File rows) when enough
    images exist, or an empty list when the model cannot yet be built.
    """
    logger = get_run_logger()
    all_ready_files = (session.query(File)
                       .filter(File.state == "created")
                       .filter(File.level == "2")
                       .filter(File.file_type == "PT")
                       .filter(File.observatory == "M").all())
    logger.info(f"{len(all_ready_files)} Level 2 PTM files will be used for F corona background modeling.")
    # The model needs at least 30 images; use >= so exactly 30 qualifies
    # (previous `> 30` required 31, contradicting the stated minimum).
    # TODO(review): move this magic number into the pipeline config.
    if len(all_ready_files) >= 30:
        return [all_ready_files]
    else:
        return []

@task
def construct_f_corona_background_flow_info(level3_files: list[File],
                                            level3_f_model_file: File,
                                            pipeline_config: dict,
                                            session=None):
    """Build a planned Flow row describing an F-corona background construction run.

    NOTE(review): ``level3_f_model_file`` and ``session`` are currently unused —
    confirm whether the generic scheduler interface requires them.
    """
    flow_type = "construct_f_corona_background_process_flow"
    root_dir = pipeline_config["root"]
    # Absolute paths of every input file handed to the background builder.
    input_paths = [os.path.join(input_file.directory(root_dir), input_file.filename())
                   for input_file in level3_files]
    return Flow(
        flow_type=flow_type,
        state="planned",
        flow_level="3",
        creation_time=datetime.now(),
        priority=pipeline_config["levels"][flow_type]["priority"]["initial"],
        call_data=json.dumps({"data_list": input_paths}),
    )


@task
def construct_f_corona_background_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
    """Plan the single Level 3 PF (F-corona background) output file.

    The output inherits date_obs from the first input file.
    """
    planned_output = File(
        level="3",
        file_type="PF",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=level2_files[0].date_obs,
        state="planned",
    )
    return [planned_output]

@flow
def construct_f_corona_background_scheduler_flow(pipeline_config_path=None, session=None):
    """Scheduler entry point: plan F-corona background construction runs.

    Delegates to the generic scheduler with the F-corona-specific query,
    file-info, and flow-info tasks.
    """
    generic_scheduler_flow_logic(
        f_corona_background_query_ready_files,
        construct_f_corona_background_file_info,
        construct_f_corona_background_flow_info,
        pipeline_config_path,
        session=session,
    )

@flow
def construct_f_corona_background_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Process entry point: run the punchbowl F-corona background builder for *flow_id*."""
    generic_process_flow_logic(flow_id, construct_f_corona_background, pipeline_config_path, session=session)
110 changes: 96 additions & 14 deletions punchpipe/flows/level3.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
from datetime import datetime, timedelta

from prefect import flow, get_run_logger, task
from punchbowl.level3.flow import level3_core_flow
from punchbowl.level3.flow import level3_core_flow, level3_PIM_flow
from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.control.db import File, Flow
from punchpipe.control.db import File, Flow, get_closest_file, get_closest_before_file, get_closest_after_file
from punchpipe.control.processor import generic_process_flow_logic
from punchpipe.control.scheduler import generic_scheduler_flow_logic
from punchpipe.control.util import get_database_session
Expand All @@ -30,15 +30,6 @@ def get_valid_fcorona_models(session, f: File, before_timedelta: timedelta, afte
f.date_obs <= valid_fcorona_end)).all())


def get_closest_file(f_target: File, f_others: list[File]) -> File:
return min(f_others, key=lambda o: abs((f_target.date_obs - o.date_obs).total_seconds()))

def get_closest_before_file(f_target: File, f_others: list[File]) -> File:
return get_closest_file(f_target, [o for o in f_others if f_target.date_obs >= o.date_obs])

def get_closest_after_file(f_target: File, f_others: list[File]) -> File:
return get_closest_file(f_target, [o for o in f_others if f_target.date_obs <= o.date_obs])

@task
def level3_PTM_query_ready_files(session, pipeline_config: dict):
logger = get_run_logger()
Expand Down Expand Up @@ -85,18 +76,18 @@ def level3_PTM_construct_flow_info(level2_files: list[File], level3_file: File,
],
# TODO put magic numbers in config
"before_f_corona_model_path": get_closest_before_file(level2_files[0],
get_valid_fcorona_models(session,
get_valid_fcorona_models(session,
level2_files[0],
before_timedelta=timedelta(days=3),
after_timedelta=timedelta(days=3))),
"after_f_corona_model_path": get_closest_after_file(level2_files[0],
get_valid_fcorona_models(session,
get_valid_fcorona_models(session,
level2_files[0],
before_timedelta=timedelta(days=3),
after_timedelta=timedelta(days=3))),
# TODO put magic numbers in config
"starfield_background_path": get_closest_file(level2_files[0],
get_valid_starfields(session,
get_valid_starfields(session,
level2_files[0],
timedelta_window=timedelta(days=14))),
}
Expand Down Expand Up @@ -138,3 +129,94 @@ def level3_PTM_scheduler_flow(pipeline_config_path=None, session=None):
@flow
def level3_PTM_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Process entry point: run the Level 3 core flow for the given *flow_id*."""
    generic_process_flow_logic(flow_id, level3_core_flow, pipeline_config_path, session=session)


@task
def level3_PIM_query_ready_files(session, pipeline_config: dict):
    """Select created Level 2 PTM files that are ready for PIM processing.

    A file is ready when at least one valid F-corona model exists both
    before and after its date_obs.  Returns one single-file batch per
    ready file (list of [file_id] lists).
    """
    logger = get_run_logger()
    # `and_` accepts any number of clauses; no need to nest it.
    all_ready_files = session.query(File).where(and_(File.state == "created",
                                                     File.level == "2",
                                                     File.file_type == "PT")).all()
    # Log message fixed: these are the Level 2 PTM inputs, not Level 3 products.
    logger.info(f"{len(all_ready_files)} Level 2 PTM files need to be processed.")

    actually_ready_files = []
    for f in all_ready_files:
        # TODO(review): put the 3-day windows in config (shared with PTM flow).
        valid_before_fcorona_models = get_valid_fcorona_models(session, f,
                                                               before_timedelta=timedelta(days=3),
                                                               after_timedelta=timedelta(days=0))
        valid_after_fcorona_models = get_valid_fcorona_models(session, f,
                                                              before_timedelta=timedelta(days=0),
                                                              after_timedelta=timedelta(days=3))

        if len(valid_before_fcorona_models) >= 1 and len(valid_after_fcorona_models) >= 1:
            actually_ready_files.append(f)
    logger.info(f"{len(actually_ready_files)} Level 2 PTM files have necessary calibration data.")

    return [[f.file_id] for f in actually_ready_files]


@task
def level3_PIM_construct_flow_info(level2_files: list[File], level3_file: File, pipeline_config: dict, session=None):
    """Build a planned Flow row for a Level 3 PIM processing run.

    Fix: honor a caller-supplied *session* (lets tests inject one, per the
    original TODO) and only open a new database session as a fallback.
    """
    if session is None:
        session = get_database_session()

    flow_type = "level3_PIM_process_flow"
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
    reference_file = level2_files[0]
    # TODO(review): put the 3-day model windows in config.
    # NOTE(review): get_closest_*_file return File objects, which json.dumps
    # cannot serialize by default — confirm whether these should be file paths.
    before_model = get_closest_before_file(
        reference_file,
        get_valid_fcorona_models(session,
                                 reference_file,
                                 before_timedelta=timedelta(days=3),
                                 after_timedelta=timedelta(days=3)))
    after_model = get_closest_after_file(
        reference_file,
        get_valid_fcorona_models(session,
                                 reference_file,
                                 before_timedelta=timedelta(days=3),
                                 after_timedelta=timedelta(days=3)))
    call_data = json.dumps(
        {
            "data_list": [
                os.path.join(level2_file.directory(pipeline_config["root"]), level2_file.filename())
                for level2_file in level2_files
            ],
            "before_f_corona_model_path": before_model,
            "after_f_corona_model_path": after_model,
        }
    )
    return Flow(
        flow_type=flow_type,
        state=state,
        flow_level="3",
        creation_time=creation_time,
        priority=priority,
        call_data=call_data,
    )


@task
def level3_PIM_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
    """Plan the single Level 3 PI output file for a PIM run.

    The output inherits date_obs from the first input file.
    """
    planned_output = File(
        level="3",
        file_type="PI",
        observatory="M",
        file_version=pipeline_config["file_version"],
        software_version=__version__,
        date_obs=level2_files[0].date_obs,
        state="planned",
    )
    return [planned_output]


@flow
def level3_PIM_scheduler_flow(pipeline_config_path=None, session=None):
    """Scheduler entry point: plan Level 3 PIM processing runs.

    Delegates to the generic scheduler with the PIM-specific query,
    file-info, and flow-info tasks.
    """
    generic_scheduler_flow_logic(
        level3_PIM_query_ready_files,
        level3_PIM_construct_file_info,
        level3_PIM_construct_flow_info,
        pipeline_config_path,
        session=session,
    )


@flow
def level3_PIM_process_flow(flow_id: int, pipeline_config_path=None, session=None):
    """Process entry point: run the punchbowl Level 3 PIM flow for *flow_id*."""
    generic_process_flow_logic(flow_id, level3_PIM_flow, pipeline_config_path, session=session)

Loading

0 comments on commit f397e0a

Please sign in to comment.