Improve l0 (#88)
* avoid np.concatenate error on empty packet data

* add packet history

* always output a quicklook

* make sure the l1 scheduler doesn't fail when no model is available

* provide a session to l1

* return the file_id

* add l0 schedules

* dynamically update cards

* limit the number of files launched at one time

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
jmbhughes and pre-commit-ci[bot] authored Dec 11, 2024
1 parent db58298 commit 1b3be82
Showing 6 changed files with 70 additions and 31 deletions.
12 changes: 6 additions & 6 deletions punchpipe/cli.py
@@ -54,14 +54,14 @@ def serve_flows(configuration_path):

    level0_ingest_raw_packets_deployment = level0_ingest_raw_packets.to_deployment(name="level0_ingest_raw_packets",
                                               description="Ingest raw packets.",
-                                              #cron="* * * * *",
+                                              cron="*/5 * * * *",
                                               parameters={
                                                   "pipeline_config_path": configuration_path},
                                               tags=['L0'])

    level0_form_images_deployment = level0_form_images.to_deployment(name="level0_form_images",
                                               description="Form images from packets.",
-                                              #cron="* * * * *",
+                                              cron="*/5 * * * *",
                                               tags=['L0'],
                                               parameters={"pipeline_config_path": configuration_path})
@@ -140,8 +140,8 @@ def serve_flows(configuration_path):
        cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"),
        tags=["L3", "scheduler"],
        parameters={"pipeline_config_path": configuration_path}
-
    )
+
    starfield_process_dep = starfield_process_flow.to_deployment(name="construct_starfield_background_process_flow",
                                               description="Create starfield background.",
                                               tags=["L3", "process"],
@@ -196,7 +196,7 @@ def run(configuration_path):
    try:
        prefect_process = subprocess.Popen(["prefect", "server", "start"],
                                           stdout=f, stderr=subprocess.STDOUT)
-       time.sleep(10)
+       time.sleep(5)
        monitor_process = subprocess.Popen(["gunicorn",
                                            "-b", "0.0.0.0:8050",
                                            "--chdir", THIS_DIR,
@@ -213,14 +213,14 @@ def run(configuration_path):
    except KeyboardInterrupt:
        print("Shutting down.")
        prefect_process.terminate()
-       time.sleep(10)
+       time.sleep(5)
        monitor_process.terminate()
        print()
        print("punchpipe safely shut down.")
    except Exception as e:
        print(f"Received error: {e}")
        prefect_process.terminate()
-       time.sleep(10)
+       time.sleep(5)
        monitor_process.terminate()
        print()
        print("punchpipe abruptly shut down.")
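Note on the schedule change above: "*/5 * * * *" fires every five minutes and replaces a cron entry that had been commented out, matching the "add l0 schedules" item in the commit message. A minimal sketch of how deployments built with to_deployment() are typically run, assuming Prefect 2's serve() is used later in serve_flows (that call sits outside this diff):

    from prefect import serve

    # assumed serving pattern: each deployment's cron field controls how
    # often Prefect launches the corresponding flow
    serve(level0_ingest_raw_packets_deployment,   # every 5 minutes
          level0_form_images_deployment)          # every 5 minutes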
8 changes: 8 additions & 0 deletions punchpipe/control/db.py
@@ -204,6 +204,14 @@ class Health(Base):
    num_pids = Column(Integer, nullable=False)


+class PacketHistory(Base):
+    __tablename__ = "packet_history"
+    id = Column(Integer, primary_key=True)
+    datetime = Column(DATETIME(fsp=6), nullable=False)
+    num_images_succeeded = Column(Integer, nullable=False)
+    num_images_failed = Column(Integer, nullable=False)
+
+
def get_closest_eng_packets(table, timestamp, spacecraft_id, session):
# find the closest events which are greater/less than the timestamp
gt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
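The new PacketHistory table is what the dashboard queries for Level 0 health (see punchpipe/monitor/app.py below). A sketch of recording a row, mirroring the usage added in punchpipe/flows/level0.py; the counts are illustrative and `session` is assumed to be an open SQLAlchemy session:

    from datetime import datetime

    # one row per image-forming pass: how many images succeeded and failed
    history = PacketHistory(datetime=datetime.now(),
                            num_images_succeeded=42,
                            num_images_failed=3)
    session.add(history)
    session.commit()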
5 changes: 4 additions & 1 deletion punchpipe/control/util.py
@@ -6,7 +6,7 @@
from prefect import task
from prefect.variables import Variable
from prefect_sqlalchemy import SqlAlchemyConnector
-from punchbowl.data import get_base_file_name, write_ndcube_to_fits
+from punchbowl.data import get_base_file_name, write_ndcube_to_fits, write_ndcube_to_jp2
from sqlalchemy import or_
from sqlalchemy.orm import Session
from yaml.loader import FullLoader
@@ -48,6 +48,9 @@ def write_file(data: NDCube, corresponding_file_db_entry, pipeline_config) -> None:
    write_ndcube_to_fits(data, output_filename)
    corresponding_file_db_entry.state = "created"

+   layer = 0 if len(data.data.shape) > 2 else None
+   write_ndcube_to_jp2(data, output_filename.replace(".fits", ".jp2"), layer=layer)
+

def match_data_with_file_db_entry(data: NDCube, file_db_entry_list):
# figure out which file_db_entry this corresponds to
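With this change every write_file call also emits a JP2 quicklook alongside the FITS product ("always output a quicklook" in the commit message). The layer argument picks a single 2D plane when the cube has more than two axes; a small illustration of the rule, with hypothetical shapes:

    # hypothetical cube shapes illustrating the layer-selection rule
    for shape in [(3, 2048, 2048), (2048, 2048)]:
        layer = 0 if len(shape) > 2 else None
        print(shape, "->", layer)
    # (3, 2048, 2048) -> 0     first layer of a 3D cube becomes the quicklook
    # (2048, 2048)    -> None  a 2D image is written whole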
24 changes: 13 additions & 11 deletions punchpipe/flows/level0.py
@@ -13,7 +13,7 @@
from sqlalchemy import and_

from punchpipe import __version__ as software_version
-from punchpipe.control.db import File, SciPacket, TLMFiles
+from punchpipe.control.db import File, PacketHistory, SciPacket, TLMFiles
from punchpipe.control.util import get_database_session, load_pipeline_configuration
from punchpipe.level0.ccsds import unpack_compression_settings
from punchpipe.level0.core import (
@@ -76,6 +76,7 @@ def level0_form_images(session=None, pipeline_config_path=None):

    already_parsed_tlms = {}  # tlm_path maps to the parsed contents

+   skip_count, success_count = 0, 0
    for spacecraft in distinct_spacecraft:
        errors = []

@@ -107,12 +108,12 @@ def level0_form_images(session=None, pipeline_config_path=None):
                    tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file])
                    selected_tlm_contents = tlm_contents[tlm_content_index]
                    ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num])
-               ordered_image_content = np.concatenate(ordered_image_content)

                # Get the proper image
                skip_image = False
                if image_compression[0]['CMP_BYP'] == 0 and image_compression[0]['JPEG'] == 1:  # this assumes the image compression is static for an image
                    try:
+                       ordered_image_content = np.concatenate(ordered_image_content)
                        image = form_from_jpeg_compressed(ordered_image_content)
                    except (RuntimeError, ValueError):
                        skip_image = True
@@ -123,6 +124,7 @@ def level0_form_images(session=None, pipeline_config_path=None):
                        errors.append(error)
                elif image_compression[0]['CMP_BYP'] == 1:
                    try:
+                       ordered_image_content = np.concatenate(ordered_image_content)
                        logger.info(f"Packet shape {ordered_image_content.shape[0]}")
                        image = form_from_raw(ordered_image_content)
                    except (RuntimeError, ValueError):
@@ -136,15 +138,6 @@ def level0_form_images(session=None, pipeline_config_path=None):
                    skip_image = True
                    print("Not implemented")

-               # check the quality of the image
-               # if not skip_image and not image_is_okay(image, config):
-               #     skip_image = True
-               #     error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
-               #              'start_block': image_packets_entries[0].flash_block,
-               #              'replay_length': image_packets_entries[-1].flash_block
-               #                               - image_packets_entries[0].flash_block}
-               #     errors.append(error)
-
                if not skip_image:
                    spacecraft_secrets = SpacecraftMapping.load("spacecraft-ids").mapping.get_secret_value()
                    moc_index = spacecraft_secrets["moc"].index(image_packets_entries[0].spacecraft_id)
@@ -181,6 +174,15 @@ def level0_form_images(session=None, pipeline_config_path=None):
                    image_packets_entries.is_used = True
                    session.add(l0_db_entry)
                    session.commit()
+                   success_count += 1
+               else:
+                   skip_count += 1
+       history = PacketHistory(datetime=datetime.now(),
+                               num_images_succeeded=success_count,
+                               num_images_failed=skip_count)
+       session.add(history)
+       session.commit()
+
        df_errors = pd.DataFrame(errors)
        date_str = datetime.now().strftime("%Y_%j")
        df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{str(spacecraft[0])}_REPLAY_{date_str}.csv')
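Moving np.concatenate inside the try blocks is the "avoid concatenate empty" fix: an image whose packets yield no usable data produces an empty list, and concatenating it raises the ValueError now caught by the surrounding except, so the image is counted as skipped instead of crashing the flow. A short demonstration:

    import numpy as np

    # np.concatenate on an empty sequence raises ValueError, which the
    # try/except in level0_form_images now treats as a skipped image
    try:
        np.concatenate([])
    except ValueError as e:
        print(e)  # "need at least one array to concatenate"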
27 changes: 19 additions & 8 deletions punchpipe/flows/level1.py
@@ -5,7 +5,6 @@

from prefect import flow, task
from punchbowl.level1.flow import level1_core_flow
-from sqlalchemy import and_

from punchpipe import __version__
from punchpipe.control.db import File, Flow
@@ -16,9 +15,15 @@

@task
def level1_query_ready_files(session, pipeline_config: dict, reference_time=None):
-   return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
-           .where(and_(File.state == "created", File.level == "0")).all()]
-
+   max_start = pipeline_config['scheduler']['max_start']
+   ready = [f for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
+            .filter(File.state == "created").filter(File.level == "0").all()][:max_start*3]
+   actually_ready = []
+   for f in ready:
+       if (get_psf_model_path(f, pipeline_config, session=session) is not None
+               and get_quartic_model_path(f, pipeline_config, session=session) is not None):
+           actually_ready.append([f.file_id])
+   return actually_ready

@task
def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -32,7 +37,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
            .filter(File.observatory == level0_file.observatory)
            .where(File.date_obs <= level0_file.date_obs)
            .order_by(File.date_obs.desc()).first())
-   return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+   return best_model

@task
def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -41,7 +46,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
            .filter(File.observatory == level0_file.observatory)
            .where(File.date_obs <= level0_file.date_obs)
            .order_by(File.date_obs.desc()).first())
-   return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+   return best_model

@task
def level1_construct_flow_info(level0_files: list[File], level1_files: File,
@@ -50,14 +55,20 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File,
    state = "planned"
    creation_time = datetime.now()
    priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
+
+   best_psf_model = get_psf_model_path(level0_files[0], pipeline_config, session=session)
+   best_quartic_model = get_quartic_model_path(level0_files[0], pipeline_config, session=session)
+
    call_data = json.dumps(
        {
            "input_data": [
                os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
                for level0_file in level0_files
            ],
-           "psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session),
-           "quartic_coefficient_path": get_quartic_model_path(level0_files[0], pipeline_config, session=session),
+           "psf_model_path": os.path.join(best_psf_model.directory(pipeline_config['root']),
+                                          best_psf_model.filename()),
+           "quartic_coefficient_path": os.path.join(best_quartic_model.directory(pipeline_config['root']),
+                                                    best_quartic_model.filename()),
        }
    )
    return Flow(
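The reworked level1_query_ready_files both bounds each scheduling pass (at most max_start * 3 candidates are examined) and gates readiness on the calibration models actually resolving, so the scheduler no longer fails when no PSF or quartic model has been ingested yet. A condensed sketch of the pattern, where query_created_l0 is a hypothetical stand-in for the session.query(File) chain in the diff:

    # condensed sketch of the readiness gate; query_created_l0 is hypothetical
    def ready_file_ids(session, pipeline_config):
        max_start = pipeline_config['scheduler']['max_start']
        candidates = query_created_l0(session)[:max_start * 3]  # bounded scan
        return [[f.file_id] for f in candidates
                if get_psf_model_path(f, pipeline_config, session=session) is not None
                and get_quartic_model_path(f, pipeline_config, session=session) is not None]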
25 changes: 20 additions & 5 deletions punchpipe/monitor/app.py
@@ -126,10 +126,9 @@ def update_flows(n, page_current, page_size, sort_by, filter):

def create_card_content(level: int, status: str):
    return [
-       # dbc.CardHeader(f"Level {level} Flow Pressure"),
        dbc.CardBody(
            [
-               html.H5(f"Level {level} Flow Pressure", className="card-title"),
+               html.H5(f"Level {level} Status", className="card-title"),
                html.P(
                    status,
                    className="card-text",
@@ -143,14 +142,30 @@ def create_card_content(level: int, status: str):
    Input('interval-component', 'n_intervals'),
)
def update_cards(n):
+   now = datetime.now()
+   with get_database_session() as session:
+       reference_time = now - timedelta(hours=24)
+       query = (f"SELECT SUM(num_images_succeeded), SUM(num_images_failed) "
+                f"FROM packet_history WHERE datetime > '{reference_time}';")
+       df = pd.read_sql_query(query, session.connection())
+       num_l0_success = df['SUM(num_images_succeeded)'].sum()
+       num_l0_fails = df['SUM(num_images_failed)'].sum()
+       l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails)  # add one to avoid div by 0 errors
+       if l0_fraction > 0.95:
+           l0_status = f"Good ({num_l0_success} : {num_l0_fails})"
+           l0_color = "success"
+       else:
+           l0_status = f"Bad ({num_l0_success} : {num_l0_fails})"
+           l0_color = "danger"
+
    cards = html.Div(
        [
            dbc.Row(
                [
-                   dbc.Col(dbc.Card(create_card_content(0, "Good"), color="success", inverse=True)),
+                   dbc.Col(dbc.Card(create_card_content(0, l0_status), color=l0_color, inverse=True)),
                    dbc.Col(dbc.Card(create_card_content(1, "Good"), color="success", inverse=True)),
-                   dbc.Col(dbc.Card(create_card_content(2, "Warning"), color="warning", inverse=True)),
-                   dbc.Col(dbc.Card(create_card_content(3, "Bad"), color="danger", inverse=True)),
+                   dbc.Col(dbc.Card(create_card_content(2, "Good"), color="success", inverse=True)),
+                   dbc.Col(dbc.Card(create_card_content(3, "Good"), color="success", inverse=True)),
                ],
                className="mb-4",
            ),
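On the Level 0 health fraction: the extra 1 in the denominator avoids a ZeroDivisionError when packet_history has no rows in the past 24 hours, at the cost of slightly understating the true success rate. A worked example of the threshold rule:

    # worked example of the success-fraction rule in update_cards
    for ok, bad in [(0, 0), (100, 2), (19, 1)]:
        fraction = ok / (1 + ok + bad)  # +1 guards against division by zero
        print(ok, bad, round(fraction, 3), "Good" if fraction > 0.95 else "Bad")
    # 0 0 0.0 Bad        an idle pipeline reads as Bad rather than crashing
    # 100 2 0.971 Good
    # 19 1 0.905 Bad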
