From 1b3be821c1f7710baa50e73a65d8ae73777597c6 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Wed, 11 Dec 2024 02:30:07 -0700
Subject: [PATCH] Improve l0 (#88)

* avoid concatenate empty error
* add packet history
* output quicklook always
* make sure l1 scheduler doesn't fail on no model
* provide a session to l1
* return the file_id
* add l0 schedules
* dynamically update cards
* limit the number of files launched at one time
* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 punchpipe/cli.py          | 12 ++++++------
 punchpipe/control/db.py   |  8 ++++++++
 punchpipe/control/util.py |  5 ++++-
 punchpipe/flows/level0.py | 24 +++++++++++++-----------
 punchpipe/flows/level1.py | 27 +++++++++++++++++++--------
 punchpipe/monitor/app.py  | 25 ++++++++++++++++++++-----
 6 files changed, 70 insertions(+), 31 deletions(-)

diff --git a/punchpipe/cli.py b/punchpipe/cli.py
index 84546e1..a8f9357 100644
--- a/punchpipe/cli.py
+++ b/punchpipe/cli.py
@@ -54,14 +54,14 @@ def serve_flows(configuration_path):
     level0_ingest_raw_packets_deployment = level0_ingest_raw_packets.to_deployment(name="level0_ingest_raw_packets",
                                                                                    description="Ingest raw packets.",
-                                                                                   #cron="* * * * *",
+                                                                                   cron="*/5 * * * *",
                                                                                    parameters={"pipeline_config_path": configuration_path},
                                                                                    tags=['L0'])

     level0_form_images_deployment = level0_form_images.to_deployment(name="level0_form_images",
                                                                      description="Form images from packets.",
-                                                                     #cron="* * * * *",
+                                                                     cron="*/5 * * * *",
                                                                      tags=['L0'],
                                                                      parameters={"pipeline_config_path": configuration_path})
@@ -140,8 +140,8 @@ def serve_flows(configuration_path):
         cron=config['levels']['construct_starfield_background_process_flow'].get("schedule", "* * * * *"),
         tags=["L3", "scheduler"],
         parameters={"pipeline_config_path": configuration_path}
-    )
+    )

     starfield_process_dep = starfield_process_flow.to_deployment(name="construct_starfield_background_process_flow",
                                                                  description="Create starfield background.",
                                                                  tags=["L3", "process"],
@@ -196,7 +196,7 @@ def run(configuration_path):
     try:
         prefect_process = subprocess.Popen(["prefect", "server", "start"],
                                            stdout=f, stderr=subprocess.STDOUT)
-        time.sleep(10)
+        time.sleep(5)
         monitor_process = subprocess.Popen(["gunicorn",
                                             "-b", "0.0.0.0:8050",
                                             "--chdir", THIS_DIR,
@@ -213,14 +213,14 @@ def run(configuration_path):
     except KeyboardInterrupt:
         print("Shutting down.")
         prefect_process.terminate()
-        time.sleep(10)
+        time.sleep(5)
         monitor_process.terminate()
         print()
         print("punchpipe safely shut down.")
     except Exception as e:
         print(f"Received error: {e}")
         prefect_process.terminate()
-        time.sleep(10)
+        time.sleep(5)
         monitor_process.terminate()
         print()
         print("punchpipe abruptly shut down.")
diff --git a/punchpipe/control/db.py b/punchpipe/control/db.py
index 3dcb7a6..fdf5b57 100644
--- a/punchpipe/control/db.py
+++ b/punchpipe/control/db.py
@@ -204,6 +204,14 @@ class Health(Base):
     num_pids = Column(Integer, nullable=False)


+class PacketHistory(Base):
+    __tablename__ = "packet_history"
+    id = Column(Integer, primary_key=True)
+    datetime = Column(DATETIME(fsp=6), nullable=False)
+    num_images_succeeded = Column(Integer, nullable=False)
+    num_images_failed = Column(Integer, nullable=False)
+
+
 def get_closest_eng_packets(table, timestamp, spacecraft_id, session):
     # find the closest events which are greater/less than the timestamp
     gt_event = session.query(table).filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
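
The new PacketHistory table above is the bookkeeping that the dashboard aggregates later in this patch. As a minimal standalone sketch of how such a model is exercised, assuming a throwaway SQLite engine and a local declarative base in place of punchpipe's real MySQL setup (the real column uses DATETIME(fsp=6); plain DateTime stands in here):

```python
from datetime import datetime

from sqlalchemy import Column, DateTime, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()  # stand-in; the real Base lives in punchpipe.control.db

class PacketHistory(Base):
    __tablename__ = "packet_history"
    id = Column(Integer, primary_key=True)
    datetime = Column(DateTime, nullable=False)  # DATETIME(fsp=6) in the real schema
    num_images_succeeded = Column(Integer, nullable=False)
    num_images_failed = Column(Integer, nullable=False)

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

# record one tally row, the way the level0 flow does at the end of a pass
with Session(engine) as session:
    session.add(PacketHistory(datetime=datetime.now(),
                              num_images_succeeded=10,
                              num_images_failed=2))
    session.commit()
```
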
diff --git a/punchpipe/control/util.py b/punchpipe/control/util.py
index eaeaf67..eac61c2 100644
--- a/punchpipe/control/util.py
+++ b/punchpipe/control/util.py
@@ -6,7 +6,7 @@
 from prefect import task
 from prefect.variables import Variable
 from prefect_sqlalchemy import SqlAlchemyConnector
-from punchbowl.data import get_base_file_name, write_ndcube_to_fits
+from punchbowl.data import get_base_file_name, write_ndcube_to_fits, write_ndcube_to_jp2
 from sqlalchemy import or_
 from sqlalchemy.orm import Session
 from yaml.loader import FullLoader
@@ -48,6 +48,9 @@ def write_file(data: NDCube, corresponding_file_db_entry, pipeline_config) -> No
     write_ndcube_to_fits(data, output_filename)
     corresponding_file_db_entry.state = "created"

+    layer = 0 if len(data.data.shape) > 2 else None
+    write_ndcube_to_jp2(data, output_filename.replace(".fits", ".jp2"), layer=layer)
+

 def match_data_with_file_db_entry(data: NDCube, file_db_entry_list):
     # figure out which file_db_entry this corresponds to
diff --git a/punchpipe/flows/level0.py b/punchpipe/flows/level0.py
index 60b4f91..526f5ab 100644
--- a/punchpipe/flows/level0.py
+++ b/punchpipe/flows/level0.py
@@ -13,7 +13,7 @@
 from sqlalchemy import and_

 from punchpipe import __version__ as software_version
-from punchpipe.control.db import File, SciPacket, TLMFiles
+from punchpipe.control.db import File, PacketHistory, SciPacket, TLMFiles
 from punchpipe.control.util import get_database_session, load_pipeline_configuration
 from punchpipe.level0.ccsds import unpack_compression_settings
 from punchpipe.level0.core import (
@@ -76,6 +76,7 @@ def level0_form_images(session=None, pipeline_config_path=None):

     already_parsed_tlms = {}  # tlm_path maps to the parsed contents

+    skip_count, success_count = 0, 0
     for spacecraft in distinct_spacecraft:
         errors = []

@@ -107,12 +108,12 @@ def level0_form_images(session=None, pipeline_config_path=None):
                     tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file])
                     selected_tlm_contents = tlm_contents[tlm_content_index]
                     ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num])
-                ordered_image_content = np.concatenate(ordered_image_content)

                 # Get the proper image
                 skip_image = False
                 if image_compression[0]['CMP_BYP'] == 0 and image_compression[0]['JPEG'] == 1:  # this assumes the image compression is static for an image
                     try:
+                        ordered_image_content = np.concatenate(ordered_image_content)
                         image = form_from_jpeg_compressed(ordered_image_content)
                     except (RuntimeError, ValueError):
                         skip_image = True
@@ -123,6 +124,7 @@ def level0_form_images(session=None, pipeline_config_path=None):
                         errors.append(error)
                 elif image_compression[0]['CMP_BYP'] == 1:
                     try:
+                        ordered_image_content = np.concatenate(ordered_image_content)
                         logger.info(f"Packet shape {ordered_image_content.shape[0]}", )
                         image = form_from_raw(ordered_image_content)
                     except (RuntimeError, ValueError):
@@ -136,15 +138,6 @@ def level0_form_images(session=None, pipeline_config_path=None):
                     skip_image = True
                     print("Not implemented")

-                # check the quality of the image
-                # if not skip_image and not image_is_okay(image, config):
-                #     skip_image = True
-                #     error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
-                #              'start_block': image_packets_entries[0].flash_block,
-                #              'replay_length': image_packets_entries[-1].flash_block
-                #                               - image_packets_entries[0].flash_block}
-                #     errors.append(error)
-
                 if not skip_image:
                     spacecraft_secrets = SpacecraftMapping.load("spacecraft-ids").mapping.get_secret_value()
                     moc_index = spacecraft_secrets["moc"].index(image_packets_entries[0].spacecraft_id)
@@ -181,6 +174,15 @@ def level0_form_images(session=None, pipeline_config_path=None):
                         image_packets_entries.is_used = True
                     session.add(l0_db_entry)
                     session.commit()
+                    success_count += 1
+                else:
+                    skip_count += 1
+        history = PacketHistory(datetime=datetime.now(),
+                                num_images_succeeded=success_count,
+                                num_images_failed=skip_count)
+        session.add(history)
+        session.commit()
+
         df_errors = pd.DataFrame(errors)
         date_str = datetime.now().strftime("%Y_%j")
         df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{str(spacecraft[0])}_REPLAY_{date_str}.csv')
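
Moving np.concatenate inside the try blocks is what the "avoid concatenate empty error" bullet refers to: when no packet payloads were collected for an image, NumPy raises rather than returning an empty array, and that exception previously escaped the per-image error handling. A quick illustration of the failure mode now being caught and counted as a skipped image:

```python
import numpy as np

try:
    np.concatenate([])  # no packet payloads collected for this image
except ValueError as e:
    print(f"caught: {e}")  # caught: need at least one array to concatenate
```
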
diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index b4d9e44..4399c08 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -5,7 +5,6 @@
 from prefect import flow, task
 from punchbowl.level1.flow import level1_core_flow
-from sqlalchemy import and_

 from punchpipe import __version__
 from punchpipe.control.db import File, Flow
@@ -16,9 +15,15 @@

 @task
 def level1_query_ready_files(session, pipeline_config: dict, reference_time=None):
-    return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
-            .where(and_(File.state == "created", File.level == "0")).all()]
-
+    max_start = pipeline_config['scheduler']['max_start']
+    ready = [f for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES))
+             .filter(File.state == "created").filter(File.level == "0").all()][:max_start*3]
+    actually_ready = []
+    for f in ready:
+        if (get_psf_model_path(f, pipeline_config, session=session) is not None
+                and get_quartic_model_path(f, pipeline_config, session=session) is not None):
+            actually_ready.append([f.file_id])
+    return actually_ready

 @task
 def get_psf_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -32,7 +37,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None, referen
             .filter(File.observatory == level0_file.observatory)
             .where(File.date_obs <= level0_file.date_obs)
             .order_by(File.date_obs.desc()).first())
-    return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+    return best_model

 @task
 def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, reference_time=None):
@@ -41,7 +46,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None, ref
             .filter(File.observatory == level0_file.observatory)
             .where(File.date_obs <= level0_file.date_obs)
             .order_by(File.date_obs.desc()).first())
-    return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
+    return best_model

 @task
 def level1_construct_flow_info(level0_files: list[File], level1_files: File,
@@ -50,14 +55,20 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File,
     state = "planned"
     creation_time = datetime.now()
     priority = pipeline_config["levels"][flow_type]["priority"]["initial"]
+
+    best_psf_model = get_psf_model_path(level0_files[0], pipeline_config, session=session)
+    best_quartic_model = get_quartic_model_path(level0_files[0], pipeline_config, session=session)
+
     call_data = json.dumps(
         {
             "input_data": [
                 os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename())
                 for level0_file in level0_files
             ],
-            "psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session),
-            "quartic_coefficient_path": get_quartic_model_path(level0_files[0], pipeline_config, session=session),
+            "psf_model_path": os.path.join(best_psf_model.directory(pipeline_config['root']),
+                                           best_psf_model.filename()),
+            "quartic_coefficient_path": os.path.join(best_quartic_model.directory(pipeline_config['root']),
+                                                     best_quartic_model.filename()),
         }
     )
     return Flow(
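
The level1 scheduler change has two parts: the query is capped at max_start*3 candidates per pass, and a Level 0 file is only queued once both calibration models resolve to a database row, so a missing model yields an empty schedule instead of an AttributeError when building paths. A standalone sketch of that guard, with query_ready_files, psf_lookup, and quartic_lookup as hypothetical stand-ins for the real tasks:

```python
def query_ready_files(files, psf_lookup, quartic_lookup, max_start):
    """Queue a file only when both model lookups return a row (not None)."""
    candidates = files[:max_start * 3]  # cap the work done per scheduler pass
    return [[f] for f in candidates
            if psf_lookup(f) is not None and quartic_lookup(f) is not None]

# with no quartic model available yet, nothing is scheduled instead of failing
print(query_ready_files(["L0_a", "L0_b"], lambda f: "psf_row", lambda f: None, max_start=10))  # []
```
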
diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py
index d32058e..7c079f2 100644
--- a/punchpipe/monitor/app.py
+++ b/punchpipe/monitor/app.py
@@ -126,10 +126,9 @@ def update_flows(n, page_current, page_size, sort_by, filter):

 def create_card_content(level: int, status: str):
     return [
-        # dbc.CardHeader(f"Level {level} Flow Pressure"),
         dbc.CardBody(
             [
-                html.H5(f"Level {level} Flow Pressure", className="card-title"),
+                html.H5(f"Level {level} Status", className="card-title"),
                 html.P(
                     status,
                     className="card-text",
@@ -143,14 +142,30 @@ def create_card_content(level: int, status: str):
     Input('interval-component', 'n_intervals'),
 )
 def update_cards(n):
+    now = datetime.now()
+    with get_database_session() as session:
+        reference_time = now - timedelta(hours=24)
+        query = (f"SELECT SUM(num_images_succeeded), SUM(num_images_failed) "
+                 f"FROM packet_history WHERE datetime > '{reference_time}';")
+        df = pd.read_sql_query(query, session.connection())
+    num_l0_success = df['SUM(num_images_succeeded)'].sum()
+    num_l0_fails = df['SUM(num_images_failed)'].sum()
+    l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails)  # add one to avoid div by 0 errors
+    if l0_fraction > 0.95:
+        l0_status = f"Good ({num_l0_success} : {num_l0_fails})"
+        l0_color = "success"
+    else:
+        l0_status = f"Bad ({num_l0_success} : {num_l0_fails})"
+        l0_color = "danger"
+
     cards = html.Div(
         [
             dbc.Row(
                 [
-                    dbc.Col(dbc.Card(create_card_content(0, "Good"), color="success", inverse=True)),
+                    dbc.Col(dbc.Card(create_card_content(0, l0_status), color=l0_color, inverse=True)),
                     dbc.Col(dbc.Card(create_card_content(1, "Good"), color="success", inverse=True)),
-                    dbc.Col(dbc.Card(create_card_content(2, "Warning"), color="warning", inverse=True)),
-                    dbc.Col(dbc.Card(create_card_content(3, "Bad"), color="danger", inverse=True)),
+                    dbc.Col(dbc.Card(create_card_content(2, "Good"), color="success", inverse=True)),
+                    dbc.Col(dbc.Card(create_card_content(3, "Good"), color="success", inverse=True)),
                 ],
                 className="mb-4",
            ),
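
The Level 0 card's health rule reduces to a success fraction over the last 24 hours of packet_history rows; the +1 in the denominator avoids a division by zero when the window is empty. Restated standalone, with made-up counts for illustration:

```python
num_l0_success, num_l0_fails = 97, 3  # illustrative 24-hour tallies
l0_fraction = num_l0_success / (1 + num_l0_success + num_l0_fails)  # +1 avoids div by zero
print(f"{l0_fraction:.3f} -> {'success' if l0_fraction > 0.95 else 'danger'}")  # 0.960 -> success
```
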