preparing for build 3 review #69

Merged · 2 commits · Aug 24, 2024
38 changes: 38 additions & 0 deletions deploy.py
@@ -0,0 +1,38 @@
from prefect import serve
from punchpipe.controlsegment.launcher import launcher_flow
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_process_flow, level3_scheduler_flow

launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
description="Launch a pipeline segment.",
cron="* * * * *",
)

level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment",
description="Schedule a Level 1 flow.",
cron="* * * * *",
)
level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow",
description="Process a file from Level 0 to Level 1.")

level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment",
description="Schedule a Level 2 flow.",
cron="* * * * *",
)
level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
description="Process files from Level 1 to Level 2.")

# level3_scheduler_deployment = level3_scheduler_flow.to_deployment(name="level3-scheduler-deployment",
# description="Schedule a Level 3 flow.",
# cron="* * * * *",
# )
# level3_process_deployment = level3_process_flow.to_deployment(name="level3_process_flow",
# description="Process files from Level 2 to Level 3.")


serve(launcher_deployment,
level1_scheduler_deployment, level1_process_deployment,
level2_scheduler_deployment, level2_process_deployment,
# level3_scheduler_deployment, level3_process_deployment
)
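For context, a minimal sketch of the Prefect pattern deploy.py relies on (not part of this PR; the flow name and schedule are placeholders): flow.to_deployment() wraps a flow in a named deployment, optionally with a cron schedule, and serve() runs every passed deployment in one long-lived process.

from prefect import flow, serve

@flow
def example_flow():
    print("hello from a scheduled run")

if __name__ == "__main__":
    deployment = example_flow.to_deployment(name="example-deployment",
                                            description="Placeholder deployment.",
                                            cron="* * * * *")  # fires every minute
    serve(deployment)  # blocks and executes scheduled runs in this process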
2 changes: 1 addition & 1 deletion punchpipe/controlsegment/db.py
@@ -48,7 +48,7 @@ def directory(self, root: str):
str
the place to write the file
"""
return os.path.join(root, str(self.level), self.file_type, self.date_obs.strftime("%Y/%m/%d"))
return os.path.join(root, str(self.level), self.file_type + self.observatory, self.date_obs.strftime("%Y/%m/%d"))


class Flow(Base):
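To illustrate the new directory layout, the observatory id is now appended to the file type segment of the path. A small sketch with assumed values (not taken from the PR):

import os
from datetime import datetime

root, level, file_type, observatory = "/data", 1, "PM", "1"  # hypothetical values
date_obs = datetime(2024, 8, 24)
print(os.path.join(root, str(level), file_type + observatory, date_obs.strftime("%Y/%m/%d")))
# /data/1/PM1/2024/08/24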
5 changes: 5 additions & 0 deletions punchpipe/controlsegment/processor.py
@@ -1,6 +1,7 @@
import json
from datetime import datetime

from prefect import get_run_logger
from prefect.context import get_run_context

from punchpipe.controlsegment.db import File, Flow
@@ -15,12 +16,14 @@
def generic_process_flow_logic(flow_id: int, core_flow_to_launch, pipeline_config_path: str, session=None):
if session is None:
session = get_database_session()
logger = get_run_logger()

# load pipeline configuration
pipeline_config = load_pipeline_configuration(pipeline_config_path)

# fetch the appropriate flow db entry
flow_db_entry = session.query(Flow).where(Flow.flow_id == flow_id).one()
logger.info(f"Running on flow db entry with id={flow_db_entry.flow_id}.")

# update the processing flow name with the flow run name from Prefect
flow_run_context = get_run_context()
@@ -43,10 +46,12 @@ def generic_process_flow_logic(flow_id: int, core_flow_to_launch, pipeline_confi
flow_call_data = json.loads(flow_db_entry.call_data)
output_file_ids = set()
expected_file_ids = {entry.file_id for entry in file_db_entry_list}
logger.info(f"Expecting to output files with ids={expected_file_ids}.")
try:
results = core_flow_to_launch(**flow_call_data)
for result in results:
file_db_entry = match_data_with_file_db_entry(result, file_db_entry_list)
logger.info(f"Preparing to write {file_db_entry.file_id}.")
output_file_ids.add(file_db_entry.file_id)
write_file(result, file_db_entry, pipeline_config)

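For reference, a minimal sketch of the logging pattern introduced here, assuming Prefect 2.x: get_run_logger() only works inside a running flow or task, which is why the logger is created inside generic_process_flow_logic rather than at module import time.

from prefect import flow, get_run_logger

@flow
def demo_flow():
    logger = get_run_logger()  # only valid inside a flow or task run
    logger.info("messages are attached to the flow run's logs")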
49 changes: 25 additions & 24 deletions punchpipe/controlsegment/scheduler.py
@@ -13,30 +13,31 @@ def generic_scheduler_flow_logic(
session = get_database_session()

# find all files that are ready to run
parent_files = []
ready_file_ids = query_ready_files_func(session, pipeline_config)
if ready_file_ids:
for file_id in ready_file_ids:
# mark the file as progressed so that there aren't duplicate processing flows
update_file_state(session, file_id, "progressed")

# get the prior level file's information
parent_files += session.query(File).where(File.file_id == file_id).all()

# prepare the new level flow and file
children_files = construct_child_file_info(parent_files, pipeline_config)
database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config)
session.add(*children_files)
session.add(database_flow_info)
session.commit()

# set the processing flow now that we know the flow_id after committing the flow info
for child_file in children_files:
child_file.processing_flow = database_flow_info.flow_id
session.commit()

# create a file relationship between the prior and next levels
for parent_file in parent_files:
for group in ready_file_ids:
parent_files = []
for file_id in group:
# mark the file as progressed so that there aren't duplicate processing flows
update_file_state(session, file_id, "progressed")

# get the prior level file's information
parent_files += session.query(File).where(File.file_id == file_id).all()

# prepare the new level flow and file
children_files = construct_child_file_info(parent_files, pipeline_config)
database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config)
session.add(*children_files)
session.add(database_flow_info)
session.commit()

# set the processing flow now that we know the flow_id after committing the flow info
for child_file in children_files:
session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id))
session.commit()
child_file.processing_flow = database_flow_info.flow_id
session.commit()

# create a file relationship between the prior and next levels
for parent_file in parent_files:
for child_file in children_files:
session.add(FileRelationship(parent=parent_file.file_id, child=child_file.file_id))
session.commit()
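The refactor changes the contract of query_ready_files_func: it is now expected to return groups of file ids (a list of lists) rather than a flat list, and each group is scheduled as one child flow. A hypothetical sketch of that contract, with made-up ids:

def example_query_ready_files(session, pipeline_config):
    # Level 1 style: one single-file group per ready Level 0 file
    # return [[1], [2], [3]]
    # Level 2 style: one group per observation time, released only when complete
    return [[10, 11, 12], [20, 21, 22]]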
28 changes: 22 additions & 6 deletions punchpipe/flows/level1.py
@@ -2,6 +2,7 @@
import os
import typing as t
from datetime import datetime
from pathlib import Path

from prefect import flow, task
from punchbowl.level1.flow import level1_core_flow
@@ -15,11 +16,28 @@

@task
def level1_query_ready_files(session, pipeline_config: dict):
return [f.file_id for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()]
return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == 0)).all()]


# TODO handle more robustly
@task
def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_config: dict):
def get_vignetting_function(level0_file):
observatory = int(level0_file.observatory)
if observatory < 4:
vignetting_function_path = "/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM1_20240817174727_v2.fits"
else:
vignetting_function_path = "/Users/jhughes/Desktop/repos/simpunch/PUNCH_L1_GM4_20240819045110_v1.fits"
return vignetting_function_path


# TODO handle more robustly
@task
def get_psf_model_path(level0_file):
return "/Users/jhughes/Desktop/repos/punchbowl/test_run/synthetic_forward_psf.h5"


@task
def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict):
flow_type = "level1_process_flow"
state = "planned"
creation_time = datetime.now()
@@ -30,10 +48,8 @@ def level1_construct_flow_info(level0_files: File, level1_files: File, pipeline_
os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
for level0_file in level0_files
],
"output_filename": [
os.path.join(level1_file.directory(pipeline_config["root"]), level1_file.filename())
for level1_file in level1_files
],
"vignetting_function_path": get_vignetting_function(level0_files[0]),
"psf_model_path": get_psf_model_path(level0_files[0]),
}
)
return Flow(
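Taken together, the Level 1 call_data now carries the input file list plus per-observatory calibration paths instead of precomputed output filenames. An illustrative payload (all paths and filenames below are placeholders):

import json

call_data = json.dumps({
    "data_list": ["/data/0/PM1/2024/08/24/PUNCH_L0_PM1_20240824000000_v1.fits"],
    "vignetting_function_path": "/calibration/PUNCH_L1_GM1_20240817174727_v2.fits",
    "psf_model_path": "/calibration/synthetic_forward_psf.h5",
})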
40 changes: 17 additions & 23 deletions punchpipe/flows/level2.py
@@ -3,7 +3,7 @@
import typing as t
from datetime import datetime, timedelta

from prefect import flow, task
from prefect import flow, task, get_run_logger
from punchbowl.level2.flow import level2_core_flow
from sqlalchemy import and_

@@ -15,20 +15,20 @@

@task
def level2_query_ready_files(session, pipeline_config: dict):
latency = pipeline_config["levels"]["level2_process_flow"]["schedule"]["latency"]
window_duration = pipeline_config["levels"]["level2_process_flow"]["schedule"]["window_duration_seconds"]
start_time = datetime.now() - timedelta(minutes=latency + window_duration)
end_time = datetime.now() - timedelta(minutes=latency)
return [
f.file_id
for f in session.query(File)
.where(and_(File.state == "created", File.level == 1, File.date_obs > start_time, File.date_obs < end_time))
.all()
]
logger = get_run_logger()
all_ready_files = session.query(File).where(and_(File.state == "created", File.level == 1)).all()
logger.info(f"{len(all_ready_files)} ready files")
unique_times = set(f.date_obs for f in all_ready_files)
logger.info(f"{len(unique_times)} unique times: {unique_times}")
grouped_ready_files = [[f.file_id for f in all_ready_files if f.date_obs == time] for time in unique_times]
logger.info(f"{len(grouped_ready_files)} grouped ready files")
out = [g for g in grouped_ready_files if len(g) == 12]
logger.info(f"{len(out)} groups heading out")
return out


@task
def level2_construct_flow_info(level1_files: File, level2_file: File, pipeline_config: dict):
def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict):
flow_type = "level2_process_flow"
state = "planned"
creation_time = datetime.now()
@@ -54,21 +54,15 @@ def level2_construct_file_info(level1_files: t.List[File], pipeline_c
@task
def level2_construct_file_info(level1_files: t.List[File], pipeline_config: dict) -> t.List[File]:
# TODO: make realistic to level 2 products
out_files = []
for level1_file in level1_files:
out_files.append(
File(
return [File(
level=2,
file_type=level1_file.file_type,
observatory=level1_file.observatory,
file_type="PT",
observatory="M",
file_version=pipeline_config["file_version"],
software_version=__version__,
date_obs=level1_file.date_obs,
polarization=level1_file.polarization,
date_obs=level1_files[0].date_obs,
state="planned",
)
)
return out_files
)]


@flow
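The new level2_query_ready_files groups Level 1 files by observation time and only releases complete groups of 12, presumably one file per observatory and polarization combination. A standalone illustration of the grouping with stand-in data:

from collections import namedtuple
from datetime import datetime

FileStub = namedtuple("FileStub", ["file_id", "date_obs"])  # stand-in for the File model
t0, t1 = datetime(2024, 8, 24, 0, 0), datetime(2024, 8, 24, 0, 4)
files = [FileStub(i, t0) for i in range(12)] + [FileStub(99, t1)]  # the t1 group is incomplete

unique_times = set(f.date_obs for f in files)
grouped = [[f.file_id for f in files if f.date_obs == time] for time in unique_times]
complete_groups = [g for g in grouped if len(g) == 12]
print(complete_groups)  # only the 12-file group at t0 would be scheduled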
86 changes: 86 additions & 0 deletions punchpipe/flows/level3.py
@@ -0,0 +1,86 @@
import json
import os
import typing as t
from datetime import datetime, timedelta

from prefect import flow, task
from punchbowl.level3.flow import level3_core_flow
from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.controlsegment.db import File, Flow
from punchpipe.controlsegment.processor import generic_process_flow_logic
from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic


@task
def level3_query_ready_files(session, pipeline_config: dict):
latency = pipeline_config["levels"]["level3_process_flow"]["schedule"]["latency"]
window_duration = pipeline_config["levels"]["level3_process_flow"]["schedule"]["window_duration_seconds"]
start_time = datetime.now() - timedelta(minutes=latency + window_duration)
end_time = datetime.now() - timedelta(minutes=latency)
return [
f.file_id
for f in session.query(File)
.where(and_(File.state == "created", File.level == 2, File.date_obs > start_time, File.date_obs < end_time))
.all()
]


@task
def level3_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict):
flow_type = "level3_process_flow"
state = "planned"
creation_time = datetime.now()
priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
call_data = json.dumps(
{
"data_list": [
os.path.join(level2_file.directory(pipeline_config["root"]), level2_file.filename())
for level2_file in level2_files
]
}
)
return Flow(
flow_type=flow_type,
state=state,
flow_level=3,
creation_time=creation_time,
priority=priority,
call_data=call_data,
)


@task
def level3_construct_file_info(level2_files: t.List[File], pipeline_config: dict) -> t.List[File]:
out_files = []
for level2_file in level2_files:
out_files.append(
File(
level=3,
file_type=level2_file.file_type,
observatory=level2_file.observatory,
file_version=pipeline_config["file_version"],
software_version=__version__,
date_obs=level2_file.date_obs,
polarization=level2_file.polarization,
state="planned",
)
)
return out_files


@flow
def level3_scheduler_flow(pipeline_config_path="config.yaml", session=None):
generic_scheduler_flow_logic(
level3_query_ready_files,
level3_construct_file_info,
level3_construct_flow_info,
pipeline_config_path,
session=session,
)


@flow
def level3_process_flow(flow_id: int, pipeline_config_path="config.yaml", session=None):
generic_process_flow_logic(flow_id, level3_core_flow, pipeline_config_path, session=session)
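The Level 3 query keeps the latency/window approach that Level 2 previously used: only files whose date_obs falls inside a sliding window ending latency minutes in the past are considered ready. A small sketch with assumed config values:

from datetime import datetime, timedelta

latency = 10          # minutes; assumed value
window_duration = 60  # named window_duration_seconds in the config, but applied as minutes above
end_time = datetime.now() - timedelta(minutes=latency)
start_time = datetime.now() - timedelta(minutes=latency + window_duration)
# ready files satisfy: start_time < File.date_obs < end_time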
6 changes: 2 additions & 4 deletions scripts/create_db.py
@@ -5,8 +5,6 @@


if __name__ == "__main__":
# Base = declarative_base()
credentials = DatabaseCredentials.load("mysql-cred")
engine = create_engine(
f'mysql+pymysql://{credentials.user}:{credentials.password.get_secret_value()}@localhost/punchpipe')
credentials = DatabaseCredentials.load("mariadb-creds")
engine = credentials.get_engine()
Base.metadata.create_all(engine)
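The script now loads a credentials block named "mariadb-creds" and builds the engine from it. Assuming this is the prefect-sqlalchemy DatabaseCredentials block, a hedged sketch of how such a block might be registered ahead of time (driver, host, and credentials are placeholders):

from prefect_sqlalchemy import DatabaseCredentials

DatabaseCredentials(
    driver="mysql+pymysql",  # placeholder; match the actual MariaDB setup
    username="punchpipe",
    password="changeme",
    host="localhost",
    database="punchpipe",
).save("mariadb-creds", overwrite=True)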