preparing for build 3 review
jmbhughes committed Aug 24, 2024
1 parent 5f06d12 commit 103d124
Showing 4 changed files with 53 additions and 42 deletions.
15 changes: 8 additions & 7 deletions deploy.py
@@ -23,15 +23,16 @@
 level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
                                                               description="Process files from Level 1 to Level 2.")
 
-level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment",
-                                                                   description="Schedule a Level 3 flow.",
-                                                                   cron="* * * * *",
-                                                                   )
-level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow",
-                                                              description="Process files from Level 2 to Level 3.")
+# level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment",
+#                                                                    description="Schedule a Level 3 flow.",
+#                                                                    cron="* * * * *",
+#                                                                    )
+# level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow",
+#                                                               description="Process files from Level 2 to Level 3.")
 
 
 serve(launcher_deployment,
       level1_scheduler_deployment, level1_process_deployment,
       level2_scheduler_deployment, level2_process_deployment,
-      level3_scheduler_deployment, level3_process_deployment)
+      # level3_scheduler_deployment, level3_process_deployment
+      )
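
Note for reviewers unfamiliar with the deployment layer: deploy.py follows Prefect's flow.to_deployment() plus serve() pattern, where every deployment passed to serve() is registered and run by that process, so dropping a deployment from the call (as done here for Level 3) disables it. A minimal self-contained sketch of the same pattern, assuming Prefect 2.x and using placeholder flow names rather than the real punchpipe flows:

from prefect import flow, serve


@flow
def example_scheduler_flow():
    print("deciding what work is ready")


@flow
def example_process_flow():
    print("processing one unit of work")


if __name__ == "__main__":
    scheduler_deployment = example_scheduler_flow.to_deployment(
        name="example-scheduler-deployment",
        description="Run the scheduler every minute.",
        cron="* * * * *",  # same five-field cron syntax used above
    )
    process_deployment = example_process_flow.to_deployment(
        name="example_process_flow",
        description="Run processing when the scheduler triggers it.",
    )
    # serve() blocks and executes runs for every deployment it is given,
    # which is why commenting a deployment out of this call disables it.
    serve(scheduler_deployment, process_deployment)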
12 changes: 6 additions & 6 deletions punchpipe/controlsegment/scheduler.py
@@ -15,14 +15,14 @@ def generic_scheduler_flow_logic(
     # find all files that are ready to run
     ready_file_ids = query_ready_files_func(session, pipeline_config)
     if ready_file_ids:
-        for file_id in ready_file_ids:
+        for group in ready_file_ids:
             parent_files = []
+            for file_id in group:
+                # mark the file as progressed so that there aren't duplicate processing flows
+                update_file_state(session, file_id, "progressed")
 
-            # mark the file as progressed so that there aren't duplicate processing flows
-            update_file_state(session, file_id, "progressed")
-
-            # get the prior level file's information
-            parent_files += session.query(File).where(File.file_id == file_id).all()
+                # get the prior level file's information
+                parent_files += session.query(File).where(File.file_id == file_id).all()
 
             # prepare the new level flow and file
             children_files = construct_child_file_info(parent_files, pipeline_config)
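
The substantive change in this hunk is that query_ready_files_func now returns groups of file IDs rather than individual IDs, and one child flow is launched per group. A toy illustration of that control flow, with a plain dictionary standing in for the database session and File table (all names here are hypothetical):

def query_ready_files(file_states):
    # stand-in query: one-element groups, like the new Level 1 query
    return [[file_id] for file_id, state in file_states.items() if state == "created"]


def schedule(file_states):
    for group in query_ready_files(file_states):
        parent_files = []
        for file_id in group:
            # mark progressed so the same file is not scheduled twice
            file_states[file_id] = "progressed"
            parent_files.append(file_id)
        print(f"launching one child flow for parents {parent_files}")


schedule({1: "created", 2: "created", 3: "progressed"})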
28 changes: 22 additions & 6 deletions punchpipe/flows/level1.py
@@ -2,6 +2,7 @@
 import os
 import typing as t
 from datetime import datetime
+from pathlib import Path
 
 from prefect import flow, task
 from punchbowl.level1.flow import level1_core_flow
@@ -15,11 +15,28 @@

 @task
 def level1_query_ready_files(session, pipeline_config: dict):
-    return [f.file_id for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()]
+    return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()]
 
 
+# TODO handle more robustly
 @task
-def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_config: dict):
+def get_vignetting_function(level0_file):
+    observatory = int(level0_file.observatory)
+    if observatory < 4:
+        vignetting_function_path = "/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM1_20240817174727_v2.fits"
+    else:
+        vignetting_function_path = "/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM4_20240819045110_v1.fits"
+    return vignetting_function_path
+
+
+# TODO handle more robustly
+@task
+def get_psf_model_path(level0_file):
+    return "/Users/jhughes/Desktop/repos/punchbowl/test_run/synthetic_forward_psf.h5"
+
+
+@task
+def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict):
     flow_type = "level1_process_flow"
     state = "planned"
     creation_time = datetime.now()
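
The two new helper tasks hard-code absolute calibration paths on the author's machine, which the TODO acknowledges. One possible direction for making this more robust, sketched purely as an illustration (the "calibration" config section and its keys are hypothetical and not part of the current pipeline config):

from types import SimpleNamespace


def get_vignetting_function_from_config(level0_file, pipeline_config: dict) -> str:
    # hypothetical config layout: a "calibration" section mapping observatory number to a path
    return pipeline_config["calibration"]["vignetting"][int(level0_file.observatory)]


example_config = {"calibration": {"vignetting": {
    1: "/calibration/PUNCH_L1_GM1_example.fits",
    4: "/calibration/PUNCH_L1_GM4_example.fits",
}}}
print(get_vignetting_function_from_config(SimpleNamespace(observatory="4"), example_config))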
@@ -30,10 +30,8 @@ def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_
                 os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
                 for level0_file in level0_files
             ],
-            # "output_filename": [
-            #     os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename())
-            #     for level1_file in level1_files
-            # ],
+            "vignetting_function_path": get_vignetting_function(level0_files[0]),
+            "psf_model_path": get_psf_model_path(level0_files[0]),
         }
     )
     return Flow(
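
The effect of this hunk is that the flow's call data now carries the two calibration paths alongside the input file list. A hedged sketch of the assembled payload, assuming the surrounding code serializes the dictionary with json.dumps as the context lines suggest (paths below are placeholders, not real pipeline paths):

import json
import os


def build_call_data(level0_paths, vignetting_function_path, psf_model_path):
    # mirrors the keys added in this hunk; values here are illustrative only
    return json.dumps({
        "input_data": [os.path.join("/data/punch", p) for p in level0_paths],
        "vignetting_function_path": vignetting_function_path,
        "psf_model_path": psf_model_path,
    })


print(build_call_data(["PUNCH_L0_example.fits"],
                      "/calibration/vignetting_example.fits",
                      "/calibration/psf_example.h5"))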
40 changes: 17 additions & 23 deletions punchpipe/flows/level2.py
@@ -3,7 +3,7 @@
 import typing as t
 from datetime import datetime, timedelta
 
-from prefect import flow, task
+from prefect import flow, task, get_run_logger
 from punchbowl.level2.flow import level2_core_flow
 from sqlalchemy import and_
 
@@ -15,20 +15,20 @@

 @task
 def level2_query_ready_files(session, pipeline_config: dict):
-    latency = pipeline_config["levels"]["level2_process_flow"]["schedule"]["latency"]
-    window_duration = pipeline_config["levels"]["level2_process_flow"]["schedule"]["window_duration_seconds"]
-    start_time = datetime.now() - timedelta(minutes=latency + window_duration)
-    end_time = datetime.now() - timedelta(minutes=latency)
-    return [
-        f.file_id
-        for f in session.query(File)
-        .where(and_(File.state == "created", File.level == 1, File.date_obs > start_time, File.date_obs < end_time))
-        .all()
-    ]
+    logger = get_run_logger()
+    all_ready_files = session.query(File).where(and_(File.state == "created", File.level == 1)).all()
+    logger.info(f"{len(all_ready_files)} ready files")
+    unique_times = set(f.date_obs for f in all_ready_files)
+    logger.info(f"{len(unique_times)} unique times: {unique_times}")
+    grouped_ready_files = [[f.file_id for f in all_ready_files if f.date_obs == time] for time in unique_times]
+    logger.info(f"{len(grouped_ready_files)} grouped ready files")
+    out = [g for g in grouped_ready_files if len(g) == 12]
+    logger.info(f"{len(out)} groups heading out")
+    return out
 
 
 @task
-def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_config: dict):
+def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict):
     flow_type = "level2_process_flow"
     state = "planned"
     creation_time = datetime.now()
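
The rewritten query drops the latency/window logic and instead groups ready Level 1 files by observation time, keeping only complete groups of 12 (presumably one image per polarizer state per spacecraft). A standalone sketch of the same grouping, with tuples standing in for File rows; a dictionary keyed on date_obs gives the same result as the set-plus-list-comprehension in the diff without rescanning the file list for every unique time:

from collections import defaultdict
from datetime import datetime

# (file_id, date_obs) pairs standing in for ready Level 1 File rows
ready_files = [(i, datetime(2024, 8, 24, 0, 0)) for i in range(12)]
ready_files += [(99, datetime(2024, 8, 24, 0, 4))]  # an incomplete group

groups = defaultdict(list)
for file_id, date_obs in ready_files:
    groups[date_obs].append(file_id)

# only groups with all 12 members are scheduled
complete_groups = [ids for ids in groups.values() if len(ids) == 12]
print(complete_groups)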
@@ -54,21 +54,15 @@ def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_c
 @task
 def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]:
     # TODO: make realistic to level 2 products
-    out_files = []
-    for level1_file in level1_files:
-        out_files.append(
-            File(
+    return [File(
                 level=2,
-                file_type=level1_file.file_type,
-                observatory=level1_file.observatory,
+                file_type="PT",
+                observatory="M",
                 file_version=pipeline_config["file_version"],
                 software_version=__version__,
-                date_obs=level1_file.date_obs,
-                polarization=level1_file.polarization,
+                date_obs=level1_files[0].date_obs,
                 state="planned",
-            )
-        )
-    return out_files
+            )]
 
 
 @flow
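
Where the old code produced one planned Level 2 record per Level 1 input, the new code collapses each group into a single mosaic product (file_type "PT", observatory "M") stamped with the first input's date_obs. A toy comparison of the two output shapes, with dictionaries standing in for File records:

level1_files = [
    {"file_type": "PM", "observatory": str(i % 4 + 1), "date_obs": "2024-08-24T00:00"}
    for i in range(12)
]

# old behaviour: one output record per input
old_outputs = [{"level": 2, **f} for f in level1_files]

# new behaviour: one mosaic record for the whole group
new_output = [{
    "level": 2,
    "file_type": "PT",
    "observatory": "M",
    "date_obs": level1_files[0]["date_obs"],
    "state": "planned",
}]

print(len(old_outputs), len(new_output))  # 12 vs 1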
