From b18b9b086331ea2c886b9e63f6cf98ad08fe5310 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Mon, 11 Nov 2024 08:53:34 -0700
Subject: [PATCH 01/28] adds health monitoring

---
 punchpipe/cli.py               |  11 ++-
 punchpipe/controlsegment/db.py |  13 +++-
 punchpipe/monitor/app.py       | 128 +++++++++++++++++----------------
 pyproject.toml                 |   3 +-
 4 files changed, 89 insertions(+), 66 deletions(-)

diff --git a/punchpipe/cli.py b/punchpipe/cli.py
index ab39bc1..8dc4770 100644
--- a/punchpipe/cli.py
+++ b/punchpipe/cli.py
@@ -2,11 +2,18 @@
 from waitress import serve
 import subprocess
 
-from .monitor.app import server
+from .monitor.app import create_app
 
 @click.command
 def run():
     print("Launching punchpipe monitor on http://localhost:8050/.")
     subprocess.Popen(["prefect", "server", "start"])
-    serve(server, host='0.0.0.0', port=8050)
+    # serve(server, host='0.0.0.0', port=8050)
     print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
+
+    app = create_app()
+    app.run_server(debug=False, port=8051)
+
+    # if not math.isinf(menu.duration):
+    # time.sleep(menu.duration)
+    # sys.exit(0)
\ No newline at end of file
diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py
index b460a1b..026ad5f 100644
--- a/punchpipe/controlsegment/db.py
+++ b/punchpipe/controlsegment/db.py
@@ -1,6 +1,6 @@
 import os
 
-from sqlalchemy import TEXT, Column, DateTime, Integer, String, Boolean
+from sqlalchemy import TEXT, Column, DateTime, Integer, String, Boolean, Float
 from sqlalchemy.orm import declarative_base
 
 Base = declarative_base()
@@ -103,6 +103,17 @@ class EngPacket(Base):
     l0_version = Column(Integer)
 
 
+class Health(Base):
+    __tablename__ = "health"
+    health_id = Column(Integer, primary_key=True)
+    datetime = Column(DateTime, nullable=False)
+    cpu_usage = Column(Float, nullable=False)
+    memory_usage = Column(Float, nullable=False)
+    memory_percentage = Column(Float, nullable=False)
+    disk_usage = Column(Float, nullable=False)
+    disk_percentage = Column(Float, nullable=False)
+    num_pids = Column(Integer, nullable=False)
+
 # def json_numpy_obj_hook(dct):
 #     """Decodes a previously encoded numpy ndarray with proper shape and dtype.
 #
diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py
index 6cc3d14..3740d67 100644
--- a/punchpipe/monitor/app.py
+++ b/punchpipe/monitor/app.py
@@ -1,75 +1,79 @@
-import json
-from datetime import date, timedelta
+from datetime import datetime, timedelta
 
+import dash_bootstrap_components as dbc
 import pandas as pd
+from dash import Dash, Input, Output, callback, dash_table, dcc, html
+import psutil
 import plotly.express as px
-from dash import Dash, Input, Output, dash_table, dcc, html
-from dash.exceptions import PreventUpdate
 
-app = Dash(__name__)
+from punchpipe.controlsegment.db import Health
+from punchpipe.controlsegment.util import get_database_session
 
-df = pd.read_csv(
-    "/Users/jhughes/Desktop/repos/punchpipe/punchpipe/monitor/sample.csv",
-    parse_dates=["creation_time", "start_time", "end_time"],
-)
-df.set_index("flow_id")
-df["duration"] = (df["end_time"] - df["start_time"]).map(timedelta.total_seconds)
+REFRESH_RATE = 60  # seconds
 
-fig = px.histogram(df, x="duration")
-
-app.layout = html.Div(
-    [
-        dcc.DatePickerRange(
-            id="date_picker_range",
-            min_date_allowed=date(2022, 1, 1),
-            max_date_allowed=date.today(),
-            initial_visible_month=date(2022, 1, 1),
-            start_date=date.today() - timedelta(days=1),
-            end_date=date.today(),
-        ),
-        dcc.Graph(id="duration", figure=fig),
-        dash_table.DataTable(
-            id="flow_table",
-            data=df.to_dict("records"),
-            columns=[{"name": i, "id": i} for i in df.columns],
-            page_action="none",
-            style_table={"height": "300px", "overflowY": "auto"},
-            sort_action="native",
-        ),
-        html.Pre(id="relayout-data"),
-    ]
-)
+column_names = ["call_data", "creation_time", "end_time", "flow_id", "flow_level", "flow_run_id",
+                "flow_run_name", "flow_type", "priority", "start_time", "state"]
+schedule_columns =[{'name': v, 'id': v} for v in column_names]
 
-@app.callback(
-    Output("duration", "figure"), Input("date_picker_range", "start_date"), Input("date_picker_range", "end_date")
-)
-def update_histogram(start_date, end_date):
-    filtered_df = df[(df["start_time"] > start_date) * (df["end_time"] < end_date)]
-    fig = px.histogram(filtered_df, x="duration")
-    fig.update_layout(transition_duration=500)
-
-    return fig
-
+def create_app():
+    app = Dash(external_stylesheets=[dbc.themes.BOOTSTRAP])
+    app.layout = html.Div([
+        dcc.Graph(id='machine-graph'),
+        dcc.Dropdown(
+            id="machine-stat",
+            options=["cpu_usage", "memory_usage", "memory_percentage", "disk_usage", "disk_percentage", "num_pids"],
+            value="cpu_usage",
+            clearable=False,
+        ),
+        dash_table.DataTable(id='flows',
+                             data=pd.DataFrame({name: [] for name in column_names}).to_dict('records'),
+                             columns=schedule_columns),
+        dcc.Interval(
+            id='interval-component',
+            interval=REFRESH_RATE * 1000,  # in milliseconds
+            n_intervals=0)
+    ])
 
-@app.callback(
-    Output("flow_table", "data"), Input("date_picker_range", "start_date"), Input("date_picker_range", "end_date")
-)
-def update_table(start_date, end_date):
-    return df[(df["start_time"] > start_date) * (df["end_time"] < end_date)].to_dict("records")
+    @callback(
+        Output('files', 'data'),
+        Input('interval-component', 'n_intervals'),
+    )
+    def update_flows(n):
+        query = "SELECT * FROM flows;"
+        with get_database_session() as session:
+            df = pd.read_sql_query(query, session.connection())
+        return df.to_dict('records')
 
+    @callback(
+        Output('machine-graph', 'figure'),
+        Input('interval-component', 'n_intervals'),
+        Input('machine-stat', 'value'),
+    )
+    def update_machine_stats(n, machine_stat):
+        now = datetime.now()
+        cpu_usage = psutil.cpu_percent(interval=None)
+        memory_usage = psutil.virtual_memory().used
+        memory_percentage = psutil.virtual_memory().percent
+        disk_usage = psutil.disk_usage('/').used
+        disk_percentage = psutil.disk_usage('/').percent
+        num_pids = len(psutil.pids())
 
-@app.callback(Output("relayout-data", "children"), Input("duration", "relayoutData"))
-def display_relayout_data(relayoutData):
-    if relayoutData is None:
-        raise PreventUpdate
-    elif "xaxis.range[0]" not in relayoutData.keys():
-        raise PreventUpdate
-    else:
-        # get the relevant axis ranges, you can use to drop columns from the datatable
-        print(relayoutData, type(relayoutData))
-        return json.dumps(relayoutData, indent=2)
+        with get_database_session() as session:
+            new_health_entry = Health(datetime=now,
+                                      cpu_usage=cpu_usage,
+                                      memory_usage=memory_usage,
+                                      memory_percentage=memory_percentage,
+                                      disk_usage=disk_usage,
+                                      disk_percentage=disk_percentage,
+                                      num_pids=num_pids)
+            session.add(new_health_entry)
+            session.commit()
+            reference_time = now - timedelta(hours=24)
+            query = f"SELECT datetime, {machine_stat} FROM health WHERE datetime > '{reference_time}';"
+            df = pd.read_sql_query(query, session.connection())
+            fig = px.line(df, x='datetime', y=machine_stat)
 
-if __name__ == "__main__":
-    app.run_server(debug=False)
+        return fig
+    return app
diff --git a/pyproject.toml b/pyproject.toml
index 78180e1..59e5e82 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,7 +24,8 @@ dependencies = [
     "pyyaml",
     "click",
     "waitress",
-    "pylibjpeg[libjpeg]"
+    "pylibjpeg[libjpeg]",
+    "psutil"
 ]
 requires-python = ">=3.11"
 authors = [

From f923b3817db2e7c73e5de82bf12efce99c6ddd62 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Mon, 11 Nov 2024 08:55:06 -0700
Subject: [PATCH 02/28] make run command, remove waitress

---
 punchpipe/cli.py | 12 +++++-------
 pyproject.toml   |  1 -
 2 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/punchpipe/cli.py b/punchpipe/cli.py
index 8dc4770..08afd32 100644
--- a/punchpipe/cli.py
+++ b/punchpipe/cli.py
@@ -1,19 +1,17 @@
 import click
-from waitress import serve
 import subprocess
 
 from .monitor.app import create_app
 
-@click.command
+@click.group
+def main():
+    """Run the PUNCH automated pipeline"""
+
+@main.command
 def run():
     print("Launching punchpipe monitor on http://localhost:8050/.")
     subprocess.Popen(["prefect", "server", "start"])
-    # serve(server, host='0.0.0.0', port=8050)
     print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
 
     app = create_app()
     app.run_server(debug=False, port=8051)
-
-    # if not math.isinf(menu.duration):
-    # time.sleep(menu.duration)
-    # sys.exit(0)
\ No newline at end of file
diff --git a/pyproject.toml b/pyproject.toml
index 59e5e82..b6f43bd 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,7 +23,6 @@ dependencies = [
     "plotly",
     "pyyaml",
     "click",
-    "waitress",
     "pylibjpeg[libjpeg]",
     "psutil"
 ]

From 7b8290bb0d935ebbcd2ddac49226479a6ab81284 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 11 Nov 2024 15:57:34 +0000
Subject: [PATCH 03/28] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 README.md                                          |  5 ++---
 config.yaml                                        |  2 +-
 deploy.py                                          |  3 ++-
 docs/source/conf.py                                |  1 +
 punchpipe/cli.py                                   |  4 +++-
 punchpipe/controlsegment/db.py                     |  4 ++--
 punchpipe/controlsegment/launcher.py               |  2 +-
 punchpipe/controlsegment/tests/__init__.py         |  2 +-
 punchpipe/controlsegment/tests/test_launcher.py    | 10 ++++++----
 punchpipe/controlsegment/tests/test_processor.py   | 
4 ++-- punchpipe/controlsegment/util.py | 10 +++++----- punchpipe/flows/level1.py | 2 +- punchpipe/flows/level2.py | 4 ++-- punchpipe/flows/level3.py | 4 ++-- punchpipe/flows/levelq.py | 4 ++-- punchpipe/flows/tests/test_level1.py | 12 +++++++----- punchpipe/flows/tests/test_level2.py | 12 +++++++----- punchpipe/level0/ccsds.py | 3 +-- punchpipe/level0/decode_sqrt.py | 3 +-- punchpipe/level0/flow.py | 12 ++++++------ punchpipe/level0/tests/test_decode_sqrt.py | 4 ++-- punchpipe/monitor/app.py | 4 ++-- scripts/create_db.py | 3 +-- scripts/prepare_packet_definition_csv_files.py | 3 ++- scripts/test_create_ready_level0.py | 7 +++---- 25 files changed, 65 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index 311b14b..aff838a 100644 --- a/README.md +++ b/README.md @@ -14,11 +14,11 @@ e.g. `level1` means the Level 0 to Level 1 segment. ## Accessing the data -Coming soon. +Coming soon. ## First-time setup -Coming soon. +Coming soon. ## Running @@ -33,4 +33,3 @@ Please open an issue or discussion on this repo. We encourage all contributions. If you have a problem with the code or would like to see a new feature, please open an issue. Or you can submit a pull request. - diff --git a/config.yaml b/config.yaml index cdf68ec..947660f 100644 --- a/config.yaml +++ b/config.yaml @@ -48,4 +48,4 @@ levels: seconds: [1] escalation: [10000] schedule: - options: \ No newline at end of file + options: diff --git a/deploy.py b/deploy.py index ecebeda..64e2df8 100644 --- a/deploy.py +++ b/deploy.py @@ -1,10 +1,11 @@ from prefect import serve + from punchpipe.controlsegment.launcher import launcher_flow +from punchpipe.deliver import create_noaa_delivery from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow -from punchpipe.deliver import create_noaa_delivery launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", description="Launch a pipeline segment.", diff --git a/docs/source/conf.py b/docs/source/conf.py index dc86360..143532d 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -1,4 +1,5 @@ from importlib.metadata import version as get_version + from packaging.version import Version # Configuration file for the Sphinx documentation builder. 
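[PATCH 03/28] above is generated by pre-commit.ci from the project's pre-commit hooks: import sorting plus trailing-whitespace and end-of-file cleanup, which is why the changes throughout this commit only reorder imports and trim whitespace. The repository's actual .pre-commit-config.yaml is not included in this series; the following is only a hypothetical minimal sketch of such a configuration, with hook repositories and versions assumed, shown for orientation:

# Hypothetical .pre-commit-config.yaml -- not the project's actual file; repos and revs are assumed
repos:
  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.6.0
    hooks:
      - id: trailing-whitespace   # strips trailing spaces (e.g. the README "Coming soon. " lines)
      - id: end-of-file-fixer     # ensures each file ends with exactly one newline
  - repo: https://github.com/PyCQA/isort
    rev: 5.13.2
    hooks:
      - id: isort                 # groups and orders imports, as seen throughout this commit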
diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 08afd32..aeadfa4 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -1,8 +1,10 @@ -import click import subprocess +import click + from .monitor.app import create_app + @click.group def main(): """Run the PUNCH automated pipeline""" diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index 026ad5f..28c4120 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -1,6 +1,6 @@ import os -from sqlalchemy import TEXT, Column, DateTime, Integer, String, Boolean, Float +from sqlalchemy import TEXT, Boolean, Column, DateTime, Float, Integer, String from sqlalchemy.orm import declarative_base Base = declarative_base() @@ -123,4 +123,4 @@ class Health(Base): # if isinstance(dct, dict) and '__ndarray__' in dct: # data = base64.b64decode(dct['__ndarray__']) # return np.frombuffer(data, dct['dtype']).reshape(dct['shape']) -# return dct \ No newline at end of file +# return dct diff --git a/punchpipe/controlsegment/launcher.py b/punchpipe/controlsegment/launcher.py index c3f7af5..3be06a1 100644 --- a/punchpipe/controlsegment/launcher.py +++ b/punchpipe/controlsegment/launcher.py @@ -1,5 +1,5 @@ -from datetime import datetime, timedelta from typing import List +from datetime import datetime, timedelta from prefect import flow, get_run_logger, task from prefect.client import get_client diff --git a/punchpipe/controlsegment/tests/__init__.py b/punchpipe/controlsegment/tests/__init__.py index 3026fb1..bf1f44d 100644 --- a/punchpipe/controlsegment/tests/__init__.py +++ b/punchpipe/controlsegment/tests/__init__.py @@ -4,8 +4,8 @@ import pytest from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS -from punchbowl.data import NormalizedMetadata from ndcube import NDCube +from punchbowl.data import NormalizedMetadata from punchpipe.controlsegment.db import File from punchpipe.controlsegment.util import match_data_with_file_db_entry diff --git a/punchpipe/controlsegment/tests/test_launcher.py b/punchpipe/controlsegment/tests/test_launcher.py index 6ff6bd5..6ab6f2b 100644 --- a/punchpipe/controlsegment/tests/test_launcher.py +++ b/punchpipe/controlsegment/tests/test_launcher.py @@ -7,10 +7,12 @@ from pytest_mock_resources import create_mysql_fixture from punchpipe.controlsegment.db import Base, File, Flow -from punchpipe.controlsegment.launcher import (count_running_flows, - escalate_long_waiting_flows, - filter_for_launchable_flows, - gather_planned_flows) +from punchpipe.controlsegment.launcher import ( + count_running_flows, + escalate_long_waiting_flows, + filter_for_launchable_flows, + gather_planned_flows, +) from punchpipe.controlsegment.util import load_pipeline_configuration TEST_DIR = os.path.dirname(__file__) diff --git a/punchpipe/controlsegment/tests/test_processor.py b/punchpipe/controlsegment/tests/test_processor.py index 6cbdfa2..fb1ec9e 100644 --- a/punchpipe/controlsegment/tests/test_processor.py +++ b/punchpipe/controlsegment/tests/test_processor.py @@ -1,5 +1,5 @@ -import json import os +import json import shutil from datetime import datetime @@ -7,10 +7,10 @@ import pytest from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS +from ndcube import NDCube from prefect import flow from prefect.testing.utilities import prefect_test_harness from punchbowl.data import NormalizedMetadata -from ndcube import NDCube from pytest_mock_resources import create_mysql_fixture from punchpipe.controlsegment.db import Base, File, Flow diff --git 
a/punchpipe/controlsegment/util.py b/punchpipe/controlsegment/util.py index 7fc00b1..e37bf55 100644 --- a/punchpipe/controlsegment/util.py +++ b/punchpipe/controlsegment/util.py @@ -2,13 +2,13 @@ from datetime import datetime import yaml +from ndcube import NDCube from prefect import task -from sqlalchemy.orm import Session -from yaml.loader import FullLoader from prefect_sqlalchemy import SqlAlchemyConnector -from ndcube import NDCube -from punchbowl.data import write_ndcube_to_fits, get_base_file_name +from punchbowl.data import get_base_file_name, write_ndcube_to_fits from sqlalchemy import or_ +from sqlalchemy.orm import Session +from yaml.loader import FullLoader from punchpipe.controlsegment.db import File @@ -75,4 +75,4 @@ def get_files_in_time_window(level: str, .filter(File.file_type == file_type)) .filter(File.observatory == obs_code)) .filter(File.date_obs > start_time)) - .filter(File.date_obs <= end_time).all()) \ No newline at end of file + .filter(File.date_obs <= end_time).all()) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 7b4b27d..b4e2e0f 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -1,5 +1,5 @@ -import json import os +import json import typing as t from datetime import datetime diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index 2c4da64..40159a8 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -1,9 +1,9 @@ -import json import os +import json import typing as t from datetime import datetime -from prefect import flow, task, get_run_logger +from prefect import flow, get_run_logger, task from punchbowl.level2.flow import level2_core_flow from punchpipe import __version__ diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index fff041f..af5cf39 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -1,9 +1,9 @@ -import json import os +import json import typing as t from datetime import datetime, timedelta -from prefect import flow, task, get_run_logger +from prefect import flow, get_run_logger, task from punchbowl.level3.flow import level3_core_flow from sqlalchemy import and_ diff --git a/punchpipe/flows/levelq.py b/punchpipe/flows/levelq.py index 67a5854..4700d17 100644 --- a/punchpipe/flows/levelq.py +++ b/punchpipe/flows/levelq.py @@ -1,9 +1,9 @@ -import json import os +import json import typing as t from datetime import datetime -from prefect import flow, task, get_run_logger +from prefect import flow, get_run_logger, task from punchbowl.level2.flow import levelq_core_flow from punchpipe import __version__ diff --git a/punchpipe/flows/tests/test_level1.py b/punchpipe/flows/tests/test_level1.py index 7e5c8a1..d634487 100644 --- a/punchpipe/flows/tests/test_level1.py +++ b/punchpipe/flows/tests/test_level1.py @@ -1,17 +1,19 @@ import os from datetime import datetime +import pytest from prefect.testing.utilities import prefect_test_harness from pytest_mock_resources import create_mysql_fixture -import pytest from punchpipe import __version__ from punchpipe.controlsegment.db import Base, File, Flow from punchpipe.controlsegment.util import load_pipeline_configuration -from punchpipe.flows.level1 import (level1_construct_file_info, - level1_construct_flow_info, - level1_query_ready_files, - level1_scheduler_flow) +from punchpipe.flows.level1 import ( + level1_construct_file_info, + level1_construct_flow_info, + level1_query_ready_files, + level1_scheduler_flow, +) TEST_DIR = os.path.dirname(__file__) diff --git 
a/punchpipe/flows/tests/test_level2.py b/punchpipe/flows/tests/test_level2.py index a0a2a6c..61f7b80 100644 --- a/punchpipe/flows/tests/test_level2.py +++ b/punchpipe/flows/tests/test_level2.py @@ -2,17 +2,19 @@ from datetime import datetime from freezegun import freeze_time +from prefect.logging import disable_run_logger from prefect.testing.utilities import prefect_test_harness from pytest_mock_resources import create_mysql_fixture -from prefect.logging import disable_run_logger from punchpipe import __version__ from punchpipe.controlsegment.db import Base, File, Flow from punchpipe.controlsegment.util import load_pipeline_configuration -from punchpipe.flows.level2 import (level2_construct_file_info, - level2_construct_flow_info, - level2_query_ready_files, - level2_scheduler_flow) +from punchpipe.flows.level2 import ( + level2_construct_file_info, + level2_construct_flow_info, + level2_query_ready_files, + level2_scheduler_flow, +) TEST_DIR = os.path.dirname(__file__) diff --git a/punchpipe/level0/ccsds.py b/punchpipe/level0/ccsds.py index cd2913a..7079fd8 100644 --- a/punchpipe/level0/ccsds.py +++ b/punchpipe/level0/ccsds.py @@ -3,9 +3,9 @@ import ccsdspy import numpy as np +import pylibjpeg from ccsdspy.utils import split_by_apid from matplotlib import pyplot as plt -import pylibjpeg PACKET_NAME2APID = { "ENG_LZ": 0x60, @@ -128,4 +128,3 @@ def unpack_acquisition_settings(acq_set_val: "bytes|int"): img = np.concatenate(parsed[0x20]['SCI_XFI_IMG_DATA'][22:44]) img = pylibjpeg.decode(img.tobytes()) - diff --git a/punchpipe/level0/decode_sqrt.py b/punchpipe/level0/decode_sqrt.py index a4f52f2..8bd9376 100644 --- a/punchpipe/level0/decode_sqrt.py +++ b/punchpipe/level0/decode_sqrt.py @@ -2,9 +2,8 @@ from typing import Tuple, Union import numpy as np -from prefect import get_run_logger, task from ndcube import NDCube - +from prefect import get_run_logger, task TABLE_PATH = os.path.dirname(__file__) + "/decoding_tables/" diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py index 75bc2eb..fd3d0c0 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/flow.py @@ -1,19 +1,19 @@ import json import base64 -from datetime import datetime, timedelta import warnings +from datetime import datetime, timedelta import numpy as np +import pylibjpeg +import pymysql import sqlalchemy.exc from prefect import flow, task from sqlalchemy import and_ -import pymysql -import pylibjpeg -from punchpipe.level0.ccsds import process_telemetry_file, PACKET_APID2NAME, unpack_compression_settings -from punchpipe.controlsegment.db import SciPacket, EngPacket -from punchpipe.controlsegment.util import (get_database_session) +from punchpipe.controlsegment.db import EngPacket, SciPacket +from punchpipe.controlsegment.util import get_database_session from punchpipe.error import CCSDSPacketConstructionWarning, CCSDSPacketDatabaseUpdateWarning +from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file, unpack_compression_settings class PacketEncoder(json.JSONEncoder): diff --git a/punchpipe/level0/tests/test_decode_sqrt.py b/punchpipe/level0/tests/test_decode_sqrt.py index 78040a8..178e7a7 100644 --- a/punchpipe/level0/tests/test_decode_sqrt.py +++ b/punchpipe/level0/tests/test_decode_sqrt.py @@ -4,11 +4,11 @@ import pytest from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS +from ndcube import NDCube from prefect.logging import disable_run_logger +from punchbowl.data import NormalizedMetadata from pytest import fixture -from ndcube import NDCube -from punchbowl.data 
import NormalizedMetadata from punchpipe.level0.decode_sqrt import decode_sqrt, decode_sqrt_data, decode_sqrt_simple, encode_sqrt diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py index 3740d67..58a08c1 100644 --- a/punchpipe/monitor/app.py +++ b/punchpipe/monitor/app.py @@ -2,9 +2,9 @@ import dash_bootstrap_components as dbc import pandas as pd -from dash import Dash, Input, Output, callback, dash_table, dcc, html -import psutil import plotly.express as px +import psutil +from dash import Dash, Input, Output, callback, dash_table, dcc, html from punchpipe.controlsegment.db import Health from punchpipe.controlsegment.util import get_database_session diff --git a/scripts/create_db.py b/scripts/create_db.py index d84cf73..9045f00 100644 --- a/scripts/create_db.py +++ b/scripts/create_db.py @@ -1,8 +1,7 @@ +from prefect_sqlalchemy import SqlAlchemyConnector from sqlalchemy import create_engine, text from punchpipe.controlsegment.db import Base -from prefect_sqlalchemy import SqlAlchemyConnector - if __name__ == "__main__": credentials = SqlAlchemyConnector.load("mariadb-creds") diff --git a/scripts/prepare_packet_definition_csv_files.py b/scripts/prepare_packet_definition_csv_files.py index ded6e6b..71b3f17 100644 --- a/scripts/prepare_packet_definition_csv_files.py +++ b/scripts/prepare_packet_definition_csv_files.py @@ -1,4 +1,5 @@ import os + import click import pandas as pd @@ -41,4 +42,4 @@ def convert_tlm_to_csv(tlm_path: str, output_dir: str) -> None: if __name__ == "__main__": - convert_tlm_to_csv() \ No newline at end of file + convert_tlm_to_csv() diff --git a/scripts/test_create_ready_level0.py b/scripts/test_create_ready_level0.py index 289e6f6..eef8163 100644 --- a/scripts/test_create_ready_level0.py +++ b/scripts/test_create_ready_level0.py @@ -1,20 +1,19 @@ -import json import os +import json from datetime import datetime import numpy as np from astropy.nddata import StdDevUncertainty from astropy.wcs import WCS +from ndcube import NDCube from prefect import flow, task +from prefect_sqlalchemy import SqlAlchemyConnector from punchbowl.data import NormalizedMetadata -from ndcube import NDCube from sqlalchemy.orm import Session -from prefect_sqlalchemy import SqlAlchemyConnector from punchpipe.controlsegment.db import File, Flow - @task def construct_fake_entries(): fake_file = File(level=0, From 547a7c9451cf1f0d296ce13926886e8c9e2ce331 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 21:44:35 -0700 Subject: [PATCH 04/28] change to dynamic version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b6f43bd..5fcc582 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] name = "punchpipe" -version = "0.0.1" +dynamic = ["version"] dependencies = [ "click", "ccsdspy", From 7723ef4e043231fd91a136e3c1205426353fe81d Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 21:49:42 -0700 Subject: [PATCH 05/28] add database drop function --- scripts/delete_db.py | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 scripts/delete_db.py diff --git a/scripts/delete_db.py b/scripts/delete_db.py new file mode 100644 index 0000000..036f71a --- /dev/null +++ b/scripts/delete_db.py @@ -0,0 +1,9 @@ +from prefect_sqlalchemy import SqlAlchemyConnector +from sqlalchemy import create_engine, text + +from punchpipe.controlsegment.db import Base + +if __name__ == "__main__": + credentials = 
SqlAlchemyConnector.load("mariadb-creds") + engine = credentials.get_engine() + Base.metadata.drop_all(bind=engine) From ae3fc6f14f747237aac562c4157473d570e09482 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 21:53:13 -0700 Subject: [PATCH 06/28] remove old files, fix run command --- old_config.yaml | 28 --------- pyproject.toml | 2 +- scripts/deploy.sh | 10 ---- scripts/erase_db.sql | 11 ---- scripts/setup_db.sql | 51 ----------------- scripts/test_create_ready_level0.py | 89 ----------------------------- 6 files changed, 1 insertion(+), 190 deletions(-) delete mode 100644 old_config.yaml delete mode 100755 scripts/deploy.sh delete mode 100644 scripts/erase_db.sql delete mode 100644 scripts/setup_db.sql delete mode 100644 scripts/test_create_ready_level0.py diff --git a/old_config.yaml b/old_config.yaml deleted file mode 100644 index 4aa6b4f..0000000 --- a/old_config.yaml +++ /dev/null @@ -1,28 +0,0 @@ -root: "/home/marcus.hughes/running_test/" - -launcher: - max_seconds_waiting: 100 - max_flows_running: 30 - -priority: - level0_process_flow: - initial: 5 - seconds: [30, 120, 600] - escalation: [10, 20, 30] - level1_process_flow: - initial: 6 - seconds: [ 30, 120, 600 ] - escalation: [ 11, 21, 31] - level2_process_flow: - initial: 7 - seconds: [30, 120, 600 ] - escalation: [ 12, 22, 32 ] - -scheduler: - level2_process_flow: - latency: 3 - window_duration: 3 - -process_options: - pointing: - num_quads: 100 diff --git a/pyproject.toml b/pyproject.toml index 5fcc582..51cec46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,7 +61,7 @@ docs = ["packaging", dev = ["punchpipe[test,docs]", "pre-commit"] [project.scripts] -punchpipe = "punchpipe.cli:run" +punchpipe = "punchpipe.cli:main" [project.urls] #Homepage = "https://example.com" diff --git a/scripts/deploy.sh b/scripts/deploy.sh deleted file mode 100755 index c709de8..0000000 --- a/scripts/deploy.sh +++ /dev/null @@ -1,10 +0,0 @@ -root_path="/Users/jhughes/Desktop/repos/punchpipe" - -rm *-deployment.yaml - -prefect deployment build $root_path/punchpipe/flows/level1.py:level1_process_flow -n level1_process_flow -prefect deployment build $root_path/punchpipe/flows/level1.py:level1_scheduler_flow -n level1_scheduler_flow -prefect deployment build $root_path/punchpipe/controlsegment/launcher.py:launcher_flow -n launcher_flow -prefect deployment apply level1_process_flow-deployment.yaml -prefect deployment apply level1_scheduler_flow-deployment.yaml -prefect deployment apply launcher_flow-deployment.yaml diff --git a/scripts/erase_db.sql b/scripts/erase_db.sql deleted file mode 100644 index cb7d625..0000000 --- a/scripts/erase_db.sql +++ /dev/null @@ -1,11 +0,0 @@ -/* - These commands can be run to completely delete the Science Reduction Database tables. - */ -USE punchpipe; - -DROP TABLE IF EXISTS relationships; -DROP TABLE IF EXISTS files; -DROP TABLE IF EXISTS flows; -DROP TABLE IF EXISTS packets; -DROP TABLE IF EXISTS sci_packets; -DROP TABLE IF EXISTS eng_packets; diff --git a/scripts/setup_db.sql b/scripts/setup_db.sql deleted file mode 100644 index 69fa040..0000000 --- a/scripts/setup_db.sql +++ /dev/null @@ -1,51 +0,0 @@ -/* - These commands can be run to set up the Science Reduction Database for punchpipe. 
- */ - -CREATE DATABASE IF NOT EXISTS punchpipe; - -USE punchpipe; - -CREATE TABLE flows ( - flow_id CHAR(44) UNIQUE NOT NULL, - flow_type VARCHAR(64) NOT NULL, - state VARCHAR(64) NOT NULL, - creation_time DATETIME NOT NULL, - start_time DATETIME, - end_time DATETIME, - priority INT NOT NULL, - call_data LONGTEXT, - PRIMARY KEY ( flow_id ) -); - -CREATE TABLE files ( - file_id INT UNSIGNED UNIQUE NOT NULL AUTO_INCREMENT, - level INT NOT NULL, - file_type CHAR(2) NOT NULL, - observatory CHAR(1) NOT NULL, - file_version INT NOT NULL, - software_version INT NOT NULL, - date_acquired DATETIME NOT NULL, - date_obs DATETIME NOT NULL, - date_end DATETIME NOT NULL, - polarization CHAR(2), - state VARCHAR(64) NOT NULL, - processing_flow CHAR(44) NOT NULL, - file_name char(35) GENERATED ALWAYS AS - (concat("PUNCH_L", level ,"_", file_type, observatory, "_", - DATE_FORMAT(date_acquired, '%Y%m%d%H%i%s'), - "_", 'v', file_version, '.fits' )), - PRIMARY KEY ( file_id ), - FOREIGN KEY ( processing_flow ) - REFERENCES flows(flow_id) -); - -CREATE TABLE relationships ( - id INT UNSIGNED UNIQUE NOT NULL AUTO_INCREMENT, - parent INT UNSIGNED NOT NULL, - child INT UNSIGNED NOT NULL, - FOREIGN KEY (parent) - REFERENCES files(file_id), - FOREIGN KEY (child) - REFERENCES files(file_id) -); diff --git a/scripts/test_create_ready_level0.py b/scripts/test_create_ready_level0.py deleted file mode 100644 index eef8163..0000000 --- a/scripts/test_create_ready_level0.py +++ /dev/null @@ -1,89 +0,0 @@ -import os -import json -from datetime import datetime - -import numpy as np -from astropy.nddata import StdDevUncertainty -from astropy.wcs import WCS -from ndcube import NDCube -from prefect import flow, task -from prefect_sqlalchemy import SqlAlchemyConnector -from punchbowl.data import NormalizedMetadata -from sqlalchemy.orm import Session - -from punchpipe.controlsegment.db import File, Flow - - -@task -def construct_fake_entries(): - fake_file = File(level=0, - file_type="XX", - observatory="Y", - file_version="0", - software_version="0", - date_created=datetime.now(), - date_beg=datetime.now(), - date_obs=datetime.now(), - date_end=datetime.now(), - polarization="XX", - state="created", - processing_flow=1) - - fake_flow = Flow(flow_type="Level 0", - flow_level=0, - state="completed", - creation_time=datetime.now(), - start_time=datetime.now(), - end_time=datetime.now(), - priority=1, - call_data=json.dumps({"input_filename": "input_test", "output_filename": "output_test"})) - - return fake_flow, fake_file - - -@task -def insert_into_table(fake_flow, fake_file): - credentials = SqlAlchemyConnector.load("mariadb-creds") - engine = credentials.get_engine() - session = Session(engine) - - session.add(fake_flow) - session.commit() - - session.add(fake_file) - session.commit() - - fake_file.processing_flow = fake_flow.flow_id - session.commit() - - -def generate_fake_level0_data(date_obs): - shape = (2048, 2048) - data = np.random.random(shape) - uncertainty = StdDevUncertainty(np.sqrt(np.abs(data))) - wcs = WCS(naxis=2) - wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC" - wcs.wcs.cunit = "deg", "deg" - wcs.wcs.cdelt = 0.1, 0.1 - wcs.wcs.crpix = 0, 0 - wcs.wcs.crval = 1, 1 - wcs.wcs.cname = "HPC lon", "HPC lat" - - meta = NormalizedMetadata.load_template("PM1", "0") - return NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta) - - -@flow -def create_fake_level0(): - fake_flow, fake_file = construct_fake_entries() - fake_data = generate_fake_level0_data(fake_file.date_obs) - output_directory = 
fake_file.directory("/home/marcus.hughes/running_test/") - if not os.path.isdir(output_directory): - os.makedirs(output_directory) - output_path = os.path.join(output_directory, fake_file.filename()) - fake_data.write(output_path) - insert_into_table(fake_flow, fake_file) - - -if __name__ == "__main__": - create_fake_level0() From 98de9fd8db12962dc2c515191872f8fb683a024e Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 21:58:35 -0700 Subject: [PATCH 07/28] add deploy to cli --- punchpipe/cli.py | 47 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index aeadfa4..0867227 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -1,7 +1,13 @@ import subprocess import click +from prefect import serve +from punchpipe.controlsegment.launcher import launcher_flow +from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow +from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow +from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow +from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow from .monitor.app import create_app @@ -17,3 +23,44 @@ def run(): app = create_app() app.run_server(debug=False, port=8051) + + launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", + description="Launch a pipeline segment.", + cron="* * * * *", + ) + + level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", + description="Schedule a Level 1 flow.", + cron="* * * * *", + ) + level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", + description="Process a file from Level 0 to Level 1.") + + level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", + description="Schedule a Level 2 flow.", + cron="* * * * *", + ) + level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", + description="Process files from Level 1 to Level 2.") + + levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", + description="Schedule a Level Q flow.", + cron="* * * * *", + ) + levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", + description="Process files from Level 1 to Level Q.") + + level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", + description="Schedule a Level 3 flow to make PTM.", + cron="* * * * *", + ) + level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", + description="Process PTM files from Level 2 to Level 3.") + + serve(launcher_deployment, + level1_scheduler_deployment, level1_process_deployment, + level2_scheduler_deployment, level2_process_deployment, + levelq_scheduler_deployment, levelq_process_deployment, + level3_PTM_scheduler_deployment, level3_PTM_process_deployment, + limit=1000 + ) From fac5d449f06f597bef00810f6857587acafa4695 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 22:26:13 -0700 Subject: [PATCH 08/28] allows deploy after monitor --- punchpipe/cli.py | 50 +++++++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 0867227..2cfe187 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -1,4 +1,5 @@ import subprocess +import 
multiprocessing as mp import click from prefect import serve @@ -15,47 +16,43 @@ def main(): """Run the PUNCH automated pipeline""" -@main.command -def run(): - print("Launching punchpipe monitor on http://localhost:8050/.") - subprocess.Popen(["prefect", "server", "start"]) - print("\npunchpipe Prefect flows must be stopped manually in Prefect.") - +def launch_monitor(): app = create_app() app.run_server(debug=False, port=8051) +def deploy(): launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", - description="Launch a pipeline segment.", - cron="* * * * *", - ) + description="Launch a pipeline segment.", + cron="* * * * *", + ) level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", - description="Schedule a Level 1 flow.", - cron="* * * * *", - ) + description="Schedule a Level 1 flow.", + cron="* * * * *", + ) level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", description="Process a file from Level 0 to Level 1.") level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", - description="Schedule a Level 2 flow.", - cron="* * * * *", - ) + description="Schedule a Level 2 flow.", + cron="* * * * *", + ) level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", description="Process files from Level 1 to Level 2.") levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", - description="Schedule a Level Q flow.", - cron="* * * * *", - ) + description="Schedule a Level Q flow.", + cron="* * * * *", + ) levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", description="Process files from Level 1 to Level Q.") level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", - description="Schedule a Level 3 flow to make PTM.", - cron="* * * * *", - ) + description="Schedule a Level 3 flow to make PTM.", + cron="* * * * *", + ) level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", - description="Process PTM files from Level 2 to Level 3.") + description="Process PTM files from Level 2 to Level 3.") serve(launcher_deployment, level1_scheduler_deployment, level1_process_deployment, @@ -64,3 +61,12 @@ def run(): level3_PTM_scheduler_deployment, level3_PTM_process_deployment, limit=1000 ) + + +@main.command +def run(): + print("Launching punchpipe monitor on http://localhost:8051/.") + subprocess.Popen(["prefect", "server", "start"]) + print("\npunchpipe Prefect flows must be stopped manually in Prefect.") + mp.Process(target=launch_monitor, args=()).start() + deploy() \ No newline at end of file From 4378c292c2eae716ec06ee0c0223b137c1f818bf Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 22:39:30 -0700 Subject: [PATCH 09/28] expand deploy --- deploy.py | 103 ++++++++++++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 45 deletions(-) diff --git a/deploy.py b/deploy.py index 64e2df8..8794076 100644 --- a/deploy.py +++ b/deploy.py @@ -1,53 +1,66 @@ +import subprocess + +from joblib._multiprocessing_helpers import mp from prefect import serve from punchpipe.controlsegment.launcher import launcher_flow -from punchpipe.deliver import create_noaa_delivery from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow from punchpipe.flows.level2 import level2_process_flow, 
level2_scheduler_flow from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow +from punchpipe.monitor.app import create_app + +def launch_monitor(): + app = create_app() + app.run_server(debug=False, port=8051) + +if __name__ == "__main__": + launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", + description="Launch a pipeline segment.", + cron="* * * * *", + ) + + level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", + description="Schedule a Level 1 flow.", + cron="* * * * *", + ) + level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", + description="Process a file from Level 0 to Level 1.") + + level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", + description="Schedule a Level 2 flow.", + cron="* * * * *", + ) + level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", + description="Process files from Level 1 to Level 2.") + + levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", + description="Schedule a Level Q flow.", + cron="* * * * *", + ) + levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", + description="Process files from Level 1 to Level Q.") + + level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", + description="Schedule a Level 3 flow to make PTM.", + cron="* * * * *", + ) + level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", + description="Process PTM files from Level 2 to Level 3.") + + print("Launching punchpipe monitor on http://localhost:8051/.") + subprocess.Popen(["prefect", "server", "start"]) + print("\npunchpipe Prefect flows must be stopped manually in Prefect.") + monitor_process = mp.Process(target=launch_monitor, args=()) + monitor_process.start() + + serve(launcher_deployment, + level1_scheduler_deployment, level1_process_deployment, + level2_scheduler_deployment, level2_process_deployment, + levelq_scheduler_deployment, levelq_process_deployment, + level3_PTM_scheduler_deployment, level3_PTM_process_deployment, + limit=1000 + ) + + monitor_process.join() -launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", - description="Launch a pipeline segment.", - cron="* * * * *", - ) - -level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", - description="Schedule a Level 1 flow.", - cron="* * * * *", - ) -level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", - description="Process a file from Level 0 to Level 1.") - -level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", - description="Schedule a Level 2 flow.", - cron="* * * * *", - ) -level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", - description="Process files from Level 1 to Level 2.") - -levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", - description="Schedule a Level Q flow.", - cron="* * * * *", - ) -levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", - description="Process files from Level 1 to Level Q.") - -level3_PTM_scheduler_deployment = 
level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", - description="Schedule a Level 3 flow to make PTM.", - cron="* * * * *", - ) -level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", - description="Process PTM files from Level 2 to Level 3.") - -noaa_deployment = create_noaa_delivery.to_deployment(name="noaa-deployment", - description="Create a Noaa delivery.") - -serve(launcher_deployment, - level1_scheduler_deployment, level1_process_deployment, - level2_scheduler_deployment, level2_process_deployment, - levelq_scheduler_deployment, levelq_process_deployment, - level3_PTM_scheduler_deployment, level3_PTM_process_deployment, - noaa_deployment, - limit=100 # TODO: remove arbitrary limit - ) From 7e1a2b81996a22946d7adad0d53992efede37dfb Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Mon, 11 Nov 2024 22:42:46 -0700 Subject: [PATCH 10/28] change up cli --- deploy.py | 15 --------------- punchpipe/cli.py | 44 -------------------------------------------- 2 files changed, 59 deletions(-) diff --git a/deploy.py b/deploy.py index 8794076..df609b3 100644 --- a/deploy.py +++ b/deploy.py @@ -1,6 +1,3 @@ -import subprocess - -from joblib._multiprocessing_helpers import mp from prefect import serve from punchpipe.controlsegment.launcher import launcher_flow @@ -8,11 +5,7 @@ from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow -from punchpipe.monitor.app import create_app -def launch_monitor(): - app = create_app() - app.run_server(debug=False, port=8051) if __name__ == "__main__": launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", @@ -48,12 +41,6 @@ def launch_monitor(): level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", description="Process PTM files from Level 2 to Level 3.") - print("Launching punchpipe monitor on http://localhost:8051/.") - subprocess.Popen(["prefect", "server", "start"]) - print("\npunchpipe Prefect flows must be stopped manually in Prefect.") - monitor_process = mp.Process(target=launch_monitor, args=()) - monitor_process.start() - serve(launcher_deployment, level1_scheduler_deployment, level1_process_deployment, level2_scheduler_deployment, level2_process_deployment, @@ -62,5 +49,3 @@ def launch_monitor(): limit=1000 ) - monitor_process.join() - diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 2cfe187..0f70047 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -20,53 +20,9 @@ def launch_monitor(): app = create_app() app.run_server(debug=False, port=8051) -def deploy(): - launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", - description="Launch a pipeline segment.", - cron="* * * * *", - ) - - level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment", - description="Schedule a Level 1 flow.", - cron="* * * * *", - ) - level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow", - description="Process a file from Level 0 to Level 1.") - - level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment", - description="Schedule a Level 2 flow.", - cron="* * * * *", - ) - level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow", - description="Process files from 
Level 1 to Level 2.") - - levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment", - description="Schedule a Level Q flow.", - cron="* * * * *", - ) - levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow", - description="Process files from Level 1 to Level Q.") - - level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment", - description="Schedule a Level 3 flow to make PTM.", - cron="* * * * *", - ) - level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow", - description="Process PTM files from Level 2 to Level 3.") - - serve(launcher_deployment, - level1_scheduler_deployment, level1_process_deployment, - level2_scheduler_deployment, level2_process_deployment, - levelq_scheduler_deployment, levelq_process_deployment, - level3_PTM_scheduler_deployment, level3_PTM_process_deployment, - limit=1000 - ) - - @main.command def run(): print("Launching punchpipe monitor on http://localhost:8051/.") subprocess.Popen(["prefect", "server", "start"]) print("\npunchpipe Prefect flows must be stopped manually in Prefect.") mp.Process(target=launch_monitor, args=()).start() - deploy() \ No newline at end of file From 32761f9b84c1832beeab240dfb60a4aca851775f Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 04:44:35 -0700 Subject: [PATCH 11/28] delete old files --- punchpipe/level0/decode_sqrt.py | 393 --------------------- punchpipe/level0/tests/test_decode_sqrt.py | 97 ----- 2 files changed, 490 deletions(-) delete mode 100644 punchpipe/level0/decode_sqrt.py delete mode 100644 punchpipe/level0/tests/test_decode_sqrt.py diff --git a/punchpipe/level0/decode_sqrt.py b/punchpipe/level0/decode_sqrt.py deleted file mode 100644 index 8bd9376..0000000 --- a/punchpipe/level0/decode_sqrt.py +++ /dev/null @@ -1,393 +0,0 @@ -import os.path -from typing import Tuple, Union - -import numpy as np -from ndcube import NDCube -from prefect import get_run_logger, task - -TABLE_PATH = os.path.dirname(__file__) + "/decoding_tables/" - - -def decode_sqrt( - data: Union[np.ndarray, float], - from_bits: int = 16, - to_bits: int = 12, - ccd_gain: float = 1 / 4.3, - ccd_offset: float = 100, - ccd_read_noise: float = 17, - overwrite_table: bool = False, -) -> np.ndarray: - """ - Square root decode between specified bitrate values - - Parameters - ---------- - data - Input encoded data array - from_bits - Specified bitrate of encoded image to unpack - to_bits - Specified bitrate of output data (decoded) - ccd_gain - CCD gain [photons / DN] - ccd_offset - CCD bias level [DN] - ccd_read_noise - CCD read noise level [DN] - overwrite_table - Toggle to regenerate and overwrite existing decoding table - - Returns - ------- - np.ndarray - Square root decoded version of the input image - - """ - - table_name = ( - TABLE_PATH - + "tab_fb" - + str(from_bits) - + "_tb" - + str(to_bits) - + "_g" - + str(1 / ccd_gain) - + "_b" - + str(ccd_offset) - + "_r" - + str(ccd_read_noise) - + ".npy" - ) - - # Check for an existing table, otherwise generate one - if not os.path.isfile(table_name) or overwrite_table: - table = generate_decode_sqrt_table(from_bits, to_bits, ccd_gain, ccd_offset, ccd_read_noise) - - # Make the directory if it doesn't exist - if not os.path.isdir(TABLE_PATH): - os.makedirs(TABLE_PATH, exist_ok=True) - - np.save(table_name, table) - else: - table = np.load(table_name) - - return decode_sqrt_by_table(data, table) - - -def 
encode_sqrt(data: Union[np.ndarray, float], from_bits: int = 16, to_bits: int = 12) -> np.ndarray: - """ - Square root encode between specified bitrate values - - Parameters - ---------- - data - Input data array - from_bits - Specified bitrate of original input image - to_bits - Specified bitrate of output encoded image - - Returns - ------- - np.ndarray - Encoded version of input data - - """ - - data = np.round(data).astype(np.int32).clip(0, None) - factor = np.array(2 ** (2 * to_bits - from_bits)) - data_scaled_by_factor = np.round(data * factor).astype(np.int32) - - return np.floor(np.sqrt(data_scaled_by_factor)).astype(np.int32) - - -def decode_sqrt_simple(data: Union[np.ndarray, float], from_bits: int = 16, to_bits: int = 12) -> np.ndarray: - """ - Performs a simple decoding using the naive squaring strategy - - Parameters - ---------- - data - Input data array - from_bits - Specified bitrate of original input image - to_bits - Specified bitrate of output encoded image - - Returns - ------- - np.ndarray - Decoded version of input data - - """ - - data = np.round(data).astype(np.int32).clip(0, None) - factor = 2.0 ** (2 * to_bits - from_bits) - - return np.round(np.square(data) / factor).astype(np.int32) - - -def noise_pdf( - data_value: Union[np.ndarray, float], - ccd_gain: float = 1 / 4.3, - ccd_offset: float = 100, - ccd_read_noise: float = 17, - n_sigma: int = 5, - n_steps: int = 10000, -) -> Tuple: - """ - Generates a probability distribution function (pdf) from an input data value - - Parameters - ---------- - data_value - Input data value - ccd_gain - CCD gain [DN / electron] - ccd_offset - CCD bias level [DN] - ccd_read_noise - CCD read noise level [DN] - n_sigma - Number of sigma steps - n_steps - Number of data steps - - - Returns - ------- - np.ndarray - Data step distribution - normal - Data normal distribution - - """ - - # Use camera calibration to get an e-count - electrons = np.clip((data_value - ccd_offset) / ccd_gain, 1, None) - - # Shot noise, converted back to DN - poisson_sigma = np.sqrt(electrons) * ccd_gain - - # Total sigma is quadrature sum of fixed & shot - sigma = np.sqrt(poisson_sigma**2 + ccd_read_noise**2) - - dn_steps = np.arange(-n_sigma * sigma, n_sigma * sigma, sigma * n_sigma * 2 / n_steps) - - # Explicitly calculate the Gaussian/normal PDF at each step - normal = np.exp(-dn_steps * dn_steps / sigma / sigma / 2) - - # Easier to normalize numerically than to account for missing tails - normal = normal / np.sum(normal) - - return data_value + dn_steps, normal - - -def mean_b_offset( - data_value: float, - from_bits: int = 16, - to_bits: int = 12, - ccd_gain: float = 1 / 4.3, - ccd_offset: float = 100, - ccd_read_noise: float = 17, -) -> float: - """ - Compute an offset from the naive and robust decoding processes - - Parameters - ---------- - data_value - Input data value [DN] - from_bits - Specified bitrate of encoded image to unpack - to_bits - Specified bitrate of output data (decoded) - ccd_gain - CCD gain [DN / electron] - ccd_offset - CCD bias level [DN] - ccd_read_noise - CCD read noise level [DN] - - Returns - ------- - float - Generated decoding value for use in constructing a decoding table - - """ - naive_decoded_value = decode_sqrt_simple(data_value, from_bits, to_bits) - - # Generate distribution around naive value - (values, weights) = noise_pdf(naive_decoded_value, ccd_gain, ccd_offset, ccd_read_noise) - - # Ignore values below the offset -- which break the noise model - weights = weights * (values >= ccd_offset) - - if np.sum(weights) 
< 0.95: - return 0 - - weights = weights / np.sum(weights) - - # Encode the entire value distribution - data_values = encode_sqrt(values, from_bits, to_bits) - - # Decode the entire value distribution to find the net offset - net_offset = decode_sqrt_simple(data_values, from_bits, to_bits) - - # Expected value of the entire distribution - expected_value = np.sum(net_offset * weights) - - # Return ΔB. - return expected_value - naive_decoded_value - - -def decode_sqrt_corrected( - data_value: float, - from_bits: int = 16, - to_bits: int = 12, - ccd_gain: float = 1 / 4.3, - ccd_offset: float = 100, - ccd_read_noise: float = 17, -) -> float: - """ - Compute an individual decoding value for an input data value - - Parameters - ---------- - data_value - Input data value [DN] - from_bits - Specified bitrate of encoded image to unpack - to_bits - Specified bitrate of output data (decoded) - ccd_gain - CCD gain [DN / electron] - ccd_offset - CCD bias level [DN] - ccd_read_noise - CCD read noise level [DN] - - Returns - ------- - float - Generated decoding value for use in constructing a decoding table - - """ - - s1p = decode_sqrt_simple(data_value + 1, from_bits, to_bits) - s1n = decode_sqrt_simple(data_value - 1, from_bits, to_bits) - - width = (s1p - s1n) / 4 - - fixed_sigma = np.sqrt(ccd_read_noise**2 + width**2) - - of = mean_b_offset(data_value, from_bits, to_bits, ccd_gain, ccd_offset, fixed_sigma) - - return decode_sqrt_simple(data_value, from_bits, to_bits) - of - - -def generate_decode_sqrt_table( - from_bits: int = 16, - to_bits: int = 12, - ccd_gain: float = 1 / 4.3, - ccd_offset: float = 100, - ccd_read_noise: float = 17, -) -> np.ndarray: - """ - Generates a square root decode table between specified bitrate values and CCD parameters - - Parameters - ---------- - from_bits - Specified bitrate of encoded image to unpack - to_bits - Specified bitrate of output data (decoded) - ccd_gain - CCD gain [DN / electron] - ccd_offset - CCD bias level [DN] - ccd_read_noise - CCD read noise level [DN] - - Returns - ------- - table - Generated square root decoding table - - """ - - table = np.zeros(2**to_bits) - - for i in range(2**to_bits): - table[i] = decode_sqrt_corrected(i, from_bits, to_bits, ccd_gain, ccd_offset, ccd_read_noise) - - return table - - -def decode_sqrt_by_table(data: Union[np.ndarray, float], table: np.ndarray) -> np.ndarray: - """ - Generates a square root decode table between specified bitrate values and CCD parameters - - Parameters - ---------- - data - Input encoded data array - table - Square root decoding table - - Returns - ------- - np.ndarray - Decoded version of input data - - """ - - data = np.round(data).astype(np.int32).clip(0, table.shape[0]) - - return table[data] - - -@task -def decode_sqrt_data(data_object: NDCube, overwrite_table: bool = False) -> NDCube: - """Prefect task in the pipeline to decode square root encoded data - - Parameters - ---------- - data_object : NDCube - the object you wish to decode - overwrite_table - Toggle to regenerate and overwrite existing decoding table - - Returns - ------- - NDCube - a modified version of the input with the data square root decoded - """ - - logger = get_run_logger() - logger.info("square root decoding started") - - data = data_object.data - - from_bits = data_object.meta["RAWBITS"].value - to_bits = data_object.meta["COMPBITS"].value - - ccd_gain = data_object.meta["GAINCMD"].value - ccd_offset = data_object.meta["OFFSET"].value - ccd_read_noise = 17 # DN - - decoded_data = decode_sqrt( - data, - 
from_bits=from_bits, - to_bits=to_bits, - ccd_gain=ccd_gain, - ccd_offset=ccd_offset, - ccd_read_noise=ccd_read_noise, - overwrite_table=overwrite_table, - ) - data_object.data[...] = decoded_data[...] - - logger.info("square root decoding finished") - data_object.meta.history.add_now("LEVEL0-decode-sqrt", "image square root decoded") - - return data_object diff --git a/punchpipe/level0/tests/test_decode_sqrt.py b/punchpipe/level0/tests/test_decode_sqrt.py deleted file mode 100644 index 178e7a7..0000000 --- a/punchpipe/level0/tests/test_decode_sqrt.py +++ /dev/null @@ -1,97 +0,0 @@ -from datetime import datetime - -import numpy as np -import pytest -from astropy.nddata import StdDevUncertainty -from astropy.wcs import WCS -from ndcube import NDCube -from prefect.logging import disable_run_logger -from punchbowl.data import NormalizedMetadata -from pytest import fixture - -from punchpipe.level0.decode_sqrt import decode_sqrt, decode_sqrt_data, decode_sqrt_simple, encode_sqrt - - -# Some test inputs -@fixture -def sample_punchdata(): - """ - Generate a sample PUNCHData object for testing - """ - - data = np.random.random([2048, 2048]) - uncertainty = StdDevUncertainty(np.sqrt(np.abs(data))) - wcs = WCS(naxis=2) - wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC" - wcs.wcs.cunit = "deg", "deg" - wcs.wcs.cdelt = 0.01, 0.01 - wcs.wcs.crpix = 1024, 1024 - wcs.wcs.crval = 0, 0 - wcs.wcs.cname = "HPC lon", "HPC lat" - meta = NormalizedMetadata.load_template("PM1", "0") - meta['DATE-OBS'] = str(datetime(2023, 1, 1, 0, 0, 1)) - - punchdata_obj = NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta) - - punchdata_obj.meta['RAWBITS'] = 16 - punchdata_obj.meta['COMPBITS'] = 10 - punchdata_obj.meta['GAINCMD'] = 1.0/4.3 - punchdata_obj.meta['OFFSET'] = 100 - - return punchdata_obj - - -def test_encoding(): - arr_dim = 2048 - arr = np.random.random([arr_dim, arr_dim]) * (2 ** 16) - - encoded_arr = encode_sqrt(arr, from_bits=16, to_bits=10) - - assert encoded_arr.shape == arr.shape - assert np.max(encoded_arr) <= 2 ** 10 - - -def test_decoding(): - arr_dim = 2048 - arr = np.random.random([arr_dim, arr_dim]) * (2 ** 10) - - decoded_arr = decode_sqrt_simple(arr, from_bits=10, to_bits=16) - - assert decoded_arr.shape == arr.shape - assert np.max(decoded_arr) <= 2 ** 16 - - -@pytest.mark.parametrize('from_bits, to_bits', [(16, 10), (16, 11), (16, 12)]) -def test_encode_then_decode(from_bits, to_bits): - arr_dim = 2048 - ccd_gain = 1.0 / 4.3 # DN/electron - ccd_offset = 100 # DN - ccd_read_noise = 17 # DN - - original_arr = (np.random.random([arr_dim, arr_dim]) * (2 ** from_bits)).astype(int) - - encoded_arr = encode_sqrt(original_arr, from_bits, to_bits) - decoded_arr = decode_sqrt(encoded_arr, - from_bits=from_bits, - to_bits=to_bits, - ccd_gain=ccd_gain, - ccd_offset=ccd_offset, - ccd_read_noise=ccd_read_noise) - - noise_tolerance = np.sqrt(original_arr / ccd_gain) * ccd_gain - - test_coords = np.where(original_arr > 150) - - assert np.all(np.abs(original_arr[test_coords] - decoded_arr[test_coords]) <= noise_tolerance[test_coords]) - - -@pytest.mark.prefect_test -def test_decode_sqrt_data_task(sample_punchdata): - """ - Test the decode_sqrt_data prefect task using a test harness - """ - - with disable_run_logger(): - output_punchdata = decode_sqrt_data.fn(sample_punchdata, overwrite_table=True) - assert isinstance(output_punchdata, NDCube) - assert output_punchdata.data.shape == (2048, 2048) From a426d803aae6ce53d81a248e0c8a2596c0e9f651 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 
04:44:51 -0700 Subject: [PATCH 12/28] improve l0 creation --- config.yaml | 7 + punchpipe/controlsegment/db.py | 125 ++++++++++-- punchpipe/error.py | 17 ++ punchpipe/level0/flow.py | 353 +++++++++++++++++++++++++-------- punchpipe/level0/meta.py | 51 +++++ 5 files changed, 458 insertions(+), 95 deletions(-) create mode 100644 punchpipe/error.py create mode 100644 punchpipe/level0/meta.py diff --git a/config.yaml b/config.yaml index 947660f..86c59ef 100644 --- a/config.yaml +++ b/config.yaml @@ -1,7 +1,14 @@ root: "/d0/punchsoc/gamera_data/" input_drop: "dropzone/" +tlm_directory: "dropzone/" file_version: "1" +plate_scale: + 1: 88 / 3600 + 2: 88 / 3600 + 3: 88 / 3600 + 4: 30 / 3600 + scheduler: max_start: 10 diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index 28c4120..5f175f2 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -1,8 +1,11 @@ import os +import math from sqlalchemy import TEXT, Boolean, Column, DateTime, Float, Integer, String from sqlalchemy.orm import declarative_base +from punchpipe.error import MissingCCSDSDataError + Base = declarative_base() @@ -83,13 +86,13 @@ class SciPacket(Base): flash_block = Column(Integer, nullable=False) timestamp = Column(DateTime, nullable=False, index=True) packet_num = Column(Integer, nullable=False) - source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size + source_tlm_file = Column(Integer, nullable=False) is_used = Column(Boolean) l0_version = Column(Integer) compression_settings = Column(Integer) -class EngPacket(Base): - __tablename__ = "eng_packets" +class EngXACTPacket(Base): + __tablename__ = "eng_xact" packet_id = Column(Integer, primary_key=True) apid = Column(Integer, nullable=False, index=True) sequence_count = Column(Integer, nullable=False) @@ -98,10 +101,95 @@ class EngPacket(Base): flash_block = Column(Integer, nullable=False) timestamp = Column(DateTime, nullable=False, index=True) packet_num = Column(Integer, nullable=False) - source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size - is_used = Column(Boolean) - l0_version = Column(Integer) + source_tlm_file = Column(Integer, nullable=False) + + ATT_DET_Q_BODY_WRT_ECI1 = Column(Float, nullable=False) # Attitude Quaternion + ATT_DET_Q_BODY_WRT_ECI2 = Column(Float, nullable=False) # Attitude Quaternion + ATT_DET_Q_BODY_WRT_ECI3 = Column(Float, nullable=False) # Attitude Quaternion + ATT_DET_Q_BODY_WRT_ECI4 = Column(Float, nullable=False) # Attitude Quaternion + + ATT_DET_RESIDUAL1 = Column(Float, nullable=False) # Attitude Filter Residual + ATT_DET_RESIDUAL2 = Column(Float, nullable=False) # Attitude Filter Residual + ATT_DET_RESIDUAL3 = Column(Float, nullable=False) # Attitude Filter Residual + REFS_POSITION_WRT_ECI1 = Column(Float, nullable=False) # Orbit Position ECI + REFS_POSITION_WRT_ECI2 = Column(Float, nullable=False) # Orbit Position ECI + REFS_POSITION_WRT_ECI3 = Column(Float, nullable=False) # Orbit Position ECI + + REFS_VELOCITY_WRT_ECI1 = Column(Float, nullable=False) # Orbit Velocity ECI + REFS_VELOCITY_WRT_ECI2 = Column(Float, nullable=False) # Orbit Velocity ECI + REFS_VELOCITY_WRT_ECI3 = Column(Float, nullable=False) # Orbit Velocity ECI + + ATT_CMD_CMD_Q_BODY_WRT_ECI1 = Column(Float, nullable=False) # Commanded Att Quaternion + ATT_CMD_CMD_Q_BODY_WRT_ECI2 = Column(Float, nullable=False) # Commanded Att Quaternion + ATT_CMD_CMD_Q_BODY_WRT_ECI3 = Column(Float, nullable=False) # Commanded Att Quaternion + ATT_CMD_CMD_Q_BODY_WRT_ECI4 = Column(Float, 
nullable=False) # Commanded Att Quaternion + +class EngPWFPacket(Base): + __tablename__ = "eng_pfw" + packet_id = Column(Integer, primary_key=True) + apid = Column(Integer, nullable=False, index=True) + sequence_count = Column(Integer, nullable=False) + length = Column(Integer, nullable=False) + spacecraft_id = Column(Integer, nullable=False, index=True) + flash_block = Column(Integer, nullable=False) + timestamp = Column(DateTime, nullable=False, index=True) + packet_num = Column(Integer, nullable=False) + source_tlm_file = Column(Integer, nullable=False) + + PFW_STATUS =Column(Integer, nullable=False) # Current PFW Status (0 - no error, else error) + STEP_CALC = Column(Integer, nullable=False) # Calculated step (0-1199) + LAST_CMD_N_STEPS = Column(Integer, nullable=False) # Commanded number of steps (1-1199) + HOME_POSITION_OVRD = Column(Integer, nullable=False) # HOME Position OVERRIDE + POSITION_CURR = Column(Integer, nullable=False) # Current position (1-5, 0 - manual stepping) + POSITION_CMD = Column(Integer, nullable=False) # Commanded position (1-5, 0 - manual stepping) + RESOLVER_POS_RAW = Column(Integer, nullable=False) # Resolver position - raw resolver counts (0-65000) + RESOLVER_POS_CORR = Column(Integer, nullable=False) # Resolver position - error correction applied (0-65000) + RESOLVER_READ_CNT = Column(Integer, nullable=False) # Accumulative # of resolver reads (resets on boot) + LAST_MOVE_N_STEPS = Column(Integer, nullable=False)# Number of steps on last move (1-1199) + LAST_MOVE_EXECUTION_TIME = Column(Float, nullable=False) # Current move execution time + LIFETIME_STEPS_TAKEN = Column(Integer, nullable=False) # Lifetime accumulative number of steps taken + LIFETIME_EXECUTION_TIME = Column(Float, nullable=False) # Lifetime accumulative execution time + FSM_CTRL_STATE = Column(Integer, nullable=False) # Controller FSM State + READ_SUB_STATE = Column(Integer, nullable=False) # READ Sub-FSM State + MOVE_SUB_STATE = Column(Integer, nullable=False) # MOVE Sub-FSM State + HOME_SUB_STATE = Column(Integer, nullable=False) # HOME Sub-FSM State + HOME_POSITION = Column(Integer, nullable=False) # Home Position (1-5) + RESOLVER_SELECT = Column(Integer, nullable=False) # Resolver Select + RESOLVER_TOLERANCE_HOME = Column(Integer, nullable=False) # Resolver Tolerance + RESOLVER_TOLERANCE_CURR = Column(Integer, nullable=False) # Resolver Tolerance + STEPPER_SELECT= Column(Integer, nullable=False) # Stepper Motor Select + STEPPER_RATE_DELAY = Column(Integer, nullable=False) # Stepper Motor Rate Delay + STEPPER_RATE = Column(Float, nullable=False) # Stepper Motor Rate + SHORT_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False) # Short Move(1-4 steps) Settling time before reading resolver + LONG_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False) # Long Move(5-1199 steps) Setting time before reading resolver + PRIMARY_STEP_OFFSET_1 = Column(Integer, nullable=False) # Primary Step Offset 1 + PRIMARY_STEP_OFFSET_2 = Column(Integer, nullable=False) # Short Move(1-4 steps) Delay before reading resolver + PRIMARY_STEP_OFFSET_3 = Column(Integer, nullable=False) # Primary Step Offset 3 + PRIMARY_STEP_OFFSET_4 = Column(Integer, nullable=False) # Primary Step Offset 4 + PRIMARY_STEP_OFFSET_5 = Column(Integer, nullable=False) # Primary Step Offset 5 + REDUNDANT_STEP_OFFSET_1 = Column(Integer, nullable=False) # Redundant Step Offset 1 + REDUNDANT_STEP_OFFSET_2 = Column(Integer, nullable=False) # Redundant Step Offset 2 + REDUNDANT_STEP_OFFSET_3 = Column(Integer, nullable=False) # Redundant 
Step Offset 3 + REDUNDANT_STEP_OFFSET_4 = Column(Integer, nullable=False) # Redundant Step Offset 4 + REDUNDANT_STEP_OFFSET_5 = Column(Integer, nullable=False) # Redundant Step Offset 5 + PRIMARY_RESOLVER_POSITION_1 = Column(Integer, nullable=False) # Primary Resolver Position 1 + PRIMARY_RESOLVER_POSITION_2 = Column(Integer, nullable=False) # Primary Resolver Position 2 + PRIMARY_RESOLVER_POSITION_3 = Column(Integer, nullable=False) # Primary Resolver Position 3 + PRIMARY_RESOLVER_POSITION_4 = Column(Integer, nullable=False) # Primary Resolver Position 4 + PRIMARY_RESOLVER_POSITION_5 = Column(Integer, nullable=False) # Primary Resolver Position 5 + REDUNDANT_RESOLVER_POSITION_1 = Column(Integer, nullable=False) # Redundant Resolver Position 1 + REDUNDANT_RESOLVER_POSITION_2 = Column(Integer, nullable=False) # Redundant Resolver Position 2 + REDUNDANT_RESOLVER_POSITION_3 = Column(Integer, nullable=False) # Redundant Resolver Position 3 + REDUNDANT_RESOLVER_POSITION_4 = Column(Integer, nullable=False) # Redundant Resolver Position 4 + REDUNDANT_RESOLVER_POSITION_5 = Column(Integer, nullable=False) # Redundant Resolver Position 5 + + +class TLMFiles(Base): + __tablename__ = "tlm_files" + tlm_id = Column(Integer, primary_key=True) + path = Column(String(128), nullable=False) + is_processed = Column(Boolean, nullable=False) class Health(Base): __tablename__ = "health" @@ -114,13 +202,18 @@ class Health(Base): disk_percentage = Column(Float, nullable=False) num_pids = Column(Integer, nullable=False) -# def json_numpy_obj_hook(dct): -# """Decodes a previously encoded numpy ndarray with proper shape and dtype. -# -# :param dct: (dict) json encoded ndarray -# :return: (ndarray) if input was an encoded ndarray -# """ -# if isinstance(dct, dict) and '__ndarray__' in dct: -# data = base64.b64decode(dct['__ndarray__']) -# return np.frombuffer(data, dct['dtype']).reshape(dct['shape']) -# return dct + +def get_closest_eng_packets(table, timestamp, spacecraft_id): + # find the closest events which are greater/less than the timestamp + gt_event = table.query.filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first() + lt_event = table.query.filter(table.spacecraft_id == spacecraft_id).filter(table.timestamp < timestamp).order_by(table.timestamp.desc()).first() + + if gt_event is None and lt_event is None: + msg = "Could not find packet near that time." 
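# --- Illustrative aside (not part of the patch): the two bracketing packets returned by
# get_closest_eng_packets() are consumed downstream by linear time-interpolation (see
# interpolate_value()/get_fits_metadata() added in this same patch in punchpipe/level0/flow.py).
# A minimal, self-contained sketch of that idea, assuming datetime timestamps and
# float-valued telemetry fields; all names and numbers here are invented for illustration.
from datetime import datetime, timedelta

def interpolate_between(query_time: datetime,
                        before_time: datetime, before_value: float,
                        after_time: datetime, after_value: float) -> float:
    """Linearly interpolate a float telemetry field between two packet times."""
    span = (after_time - before_time).total_seconds()
    if span == 0:  # both packets share a timestamp (e.g. the same packet returned twice)
        return before_value
    frac = (query_time - before_time).total_seconds() / span
    return before_value + frac * (after_value - before_value)

# Example: query 15 s after a packet at t0, with the next packet 60 s later.
t0 = datetime(2024, 11, 12, 0, 0, 0)
assert interpolate_between(t0 + timedelta(seconds=15), t0, 1.0,
                           t0 + timedelta(seconds=60), 5.0) == 2.0
# --- end aside ---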
+ raise MissingCCSDSDataError(msg) + elif gt_event is not None and lt_event is None: + lt_event = gt_event + elif gt_event is None and lt_event is not None: + gt_event = lt_event + + return lt_event, gt_event diff --git a/punchpipe/error.py b/punchpipe/error.py new file mode 100644 index 0000000..e3d32ea --- /dev/null +++ b/punchpipe/error.py @@ -0,0 +1,17 @@ +class PunchPipeWarning(Warning): + pass + + +class CCSDSPacketConstructionWarning(PunchPipeWarning): + pass + + +class CCSDSPacketDatabaseUpdateWarning(PunchPipeWarning): + pass + +class PunchPipeError(Exception): + pass + + +class MissingCCSDSDataError(PunchPipeError): + pass diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py index fd3d0c0..3b15a48 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/flow.py @@ -1,20 +1,34 @@ import json import base64 +import os import warnings from datetime import datetime, timedelta +import importlib.metadata import numpy as np import pylibjpeg import pymysql import sqlalchemy.exc +from astropy.wcs import WCS +from ndcube import NDCube from prefect import flow, task +from punchbowl.data import get_base_file_name from sqlalchemy import and_ +from glob import glob +from sunpy.coordinates import sun -from punchpipe.controlsegment.db import EngPacket, SciPacket -from punchpipe.controlsegment.util import get_database_session + +from punchbowl.data.meta import NormalizedMetadata +from punchbowl.data.io import write_ndcube_to_fits +from punchbowl.data.wcs import calculate_pc_matrix, calculate_helio_wcs_from_celestial + +from punchpipe.controlsegment.db import EngXACTPacket, SciPacket, TLMFiles, File, get_closest_eng_packets, EngPWFPacket +from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration from punchpipe.error import CCSDSPacketConstructionWarning, CCSDSPacketDatabaseUpdateWarning from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file, unpack_compression_settings +from punchpipe.level0.meta import convert_pfw_position_to_polarizer, POSITIONS_TO_CODES, eci_quaternion_to_ra_dec +software_version = importlib.metadata.version("punchpipe") class PacketEncoder(json.JSONEncoder): def default(self, obj): @@ -31,9 +45,14 @@ def default(self, obj): @task -def detect_new_tlm_files() -> [str]: - # TODO: implement - return ["/Users/jhughes/Desktop/data/PUNCH_CCSDS/RAW_CCSDS_DATA/PUNCH_NFI00_RAW_2024_160_19_37_V01.tlm"] +def detect_new_tlm_files(session=None) -> [str]: + if session is None: + session = get_database_session() + + tlm_directory = load_pipeline_configuration()['tlm_directory'] + found_tlm_files = set(glob(tlm_directory + '/*.tlm')) + database_tlm_files = set(session.query(TLMFiles.path).distinct().all()) + return list(found_tlm_files - database_tlm_files) @task @@ -41,6 +60,137 @@ def parse_new_tlm_files(telemetry_file_path: str): return process_telemetry_file(telemetry_file_path) +def get_basic_packet_info(packet_name, packet): + try: + seconds = int(packet[packet_name + "_HDR_SEC"]) + microseconds = int(packet[packet_name + "_HDR_USEC"]) + except ValueError: + seconds = 0 + microseconds = 0 + warnings.warn("Time could not be properly extracted for packet.", + CCSDSPacketConstructionWarning) + timestamp = (datetime(2000, 1, 1) + + timedelta(seconds=seconds) + timedelta(microseconds=microseconds)) + + try: + spacecraft_id = int(packet[packet_name + "_HDR_SCID"]) + except ValueError: + spacecraft_id = -1 + warnings.warn("Spacecraft ID could not be extracted for packet.", + CCSDSPacketConstructionWarning) + + try: + 
flash_block_address = int(packet[packet_name + "_HDR_FLASH_BLOCK"]) + except ValueError: + flash_block_address = -1 + warnings.warn("Flash block address could not be extracted for packet.", + CCSDSPacketConstructionWarning) + + return timestamp, spacecraft_id, flash_block_address + + +def form_packet_entry(apid, packet, packet_num, source_tlm_file_id): + packet_name = PACKET_APID2NAME[apid] + + timestamp, spacecraft_id, flash_block_address = get_basic_packet_info(packet_name, packet) + + match packet_name.lower(): + case 'sci_xfi': + return SciPacket(apid=apid, + sequence_count=packet['CCSDS_SEQUENCE_COUNT'], + length=packet['CCSDS_PACKET_LENGTH'], + spacecraft_id=spacecraft_id, + flash_block=flash_block_address, + timestamp=timestamp, + packet_num=packet_num, + source_tlm_file=source_tlm_file_id, + is_used=False, + compression_settings=packet['SCI_XFI_COM_SET']) + case 'eng_xact': + return EngXACTPacket(apid=apid, + sequence_count=packet['CCSDS_SEQUENCE_COUNT'], + length=packet['CCSDS_PACKET_LENGTH'], + spacecraft_id=spacecraft_id, + flash_block=flash_block_address, + timestamp=timestamp, + packet_num=packet_num, + source_tlm_file=source_tlm_file_id, + ATT_DET_Q_BODY_WRT_ECI1=packet['ATT_DET_Q_BODY_WRT_ECI1'], + ATT_DET_Q_BODY_WRT_ECI2=packet['ATT_DET_Q_BODY_WRT_ECI2'], + ATT_DET_Q_BODY_WRT_ECI3=packet['ATT_DET_Q_BODY_WRT_ECI3'], + ATT_DET_Q_BODY_WRT_ECI4=packet['ATT_DET_Q_BODY_WRT_ECI4'], + ATT_DET_RESIDUAL1=packet['ATT_DET_RESIDUAL1'], + ATT_DET_RESIDUAL2=packet['ATT_DET_RESIDUAL2'], + ATT_DET_RESIDUAL3=packet['ATT_DET_RESIDUAL3'], + REFS_POSITION_WRT_ECI1=packet['REFS_POSITION_WRT_ECI1'], + REFS_POSITION_WRT_ECI2=packet['REFS_POSITION_WRT_ECI2'], + REFS_POSITION_WRT_ECI3=packet['REFS_POSITION_WRT_ECI3'], + REFS_VELOCITY_WRT_ECI1=packet['REFS_VELOCITY_WRT_ECI1'], + REFS_VELOCITY_WRT_ECI2=packet['REFS_VELOCITY_WRT_ECI2'], + REFS_VELOCITY_WRT_ECI3=packet['REFS_VELOCITY_WRT_ECI3'], + ATT_CMD_CMD_Q_BODY_WRT_ECI1=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI1'], + ATT_CMD_CMD_Q_BODY_WRT_ECI2=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI2'], + ATT_CMD_CMD_Q_BODY_WRT_ECI3=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI3'], + ATT_CMD_CMD_Q_BODY_WRT_ECI4=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI4'],) + case 'eng_pfw': + return EngPWFPacket( + apid=apid, + sequence_count=packet['CCSDS_SEQUENCE_COUNT'], + length=packet['CCSDS_PACKET_LENGTH'], + spacecraft_id=spacecraft_id, + flash_block=flash_block_address, + timestamp=timestamp, + packet_num=packet_num, + source_tlm_file=source_tlm_file_id, + PFW_STATUS=packet['PFW_STATUS'], + STEP_CALC=packet['STEP_CALC'], + LAST_CMD_N_STEPS=packet['LAST_CMD_N_STEPS'], + HOME_POSITION_OVRD=packet['HOME_POSITION_OVRD'], + POSITION_CURR=packet['POSITION_CURR'], + POSITION_CMD=packet['POSITION_CMD'], + RESOLVER_POS_RAW=packet['RESOLVER_POS_RAW'], + RESOLVER_POS_CORR=packet['RESOLVER_POS_CORR'], + RESOLVER_READ_CNT=packet['RESOLVER_READ_CNT'], + LAST_MOVE_N_STEPS=packet['LAST_MOVE_N_STEPS'], + LAST_MOVE_EXECUTION_TIME=packet['LAST_MOVE_EXECUTION_TIME'], + LIFETIME_STEPS_TAKEN=packet['LIFETIME_STEPS_TAKEN'], + LIFETIME_EXECUTION_TIME=packet['LIFETIME_EXECUTION_TIME'], + FSM_CTRL_STATE=packet['FSM_CTRL_STATE'], + READ_SUB_STATE=packet['READ_SUB_STATE'], + MOVE_SUB_STATE=packet['MOVE_SUB_STATE'], + HOME_SUB_STATE=packet['HOME_SUB_STATE'], + HOME_POSITION=packet['HOME_POSITION'], + RESOLVER_SELECT=packet['RESOLVER_SELECT'], + RESOLVER_TOLERANCE_HOME=packet['RESOLVER_TOLERANCE_HOME'], + RESOLVER_TOLERANCE_CURR=packet['RESOLVER_TOLERANCE_CURR'], + STEPPER_SELECT=packet['STEPPER_SELECT'], + 
STEPPER_RATE_DELAY=packet['STEPPER_RATE_DELAY'], + STEPPER_RATE=packet['STEPPER_RATE'], + SHORT_MOVE_SETTLING_TIME_MS=packet['SHORT_MOVE_SETTLING_TIME_MS'], + LONG_MOVE_SETTLING_TIME_MS=packet['LONG_MOVE_SETTLING_TIME_MS'], + PRIMARY_STEP_OFFSET_1=packet['PRIMARY_STEP_OFFSET_1'], + PRIMARY_STEP_OFFSET_2=packet['PRIMARY_STEP_OFFSET_2'], + PRIMARY_STEP_OFFSET_3=packet['PRIMARY_STEP_OFFSET_3'], + PRIMARY_STEP_OFFSET_4=packet['PRIMARY_STEP_OFFSET_4'], + PRIMARY_STEP_OFFSET_5=packet['PRIMARY_STEP_OFFSET_5'], + REDUNDANT_STEP_OFFSET_1=packet['REDUNDANT_STEP_OFFSET_1'], + REDUNDANT_STEP_OFFSET_2=packet['REDUNDANT_STEP_OFFSET_2'], + REDUNDANT_STEP_OFFSET_3=packet['REDUNDANT_STEP_OFFSET_3'], + REDUNDANT_STEP_OFFSET_4=packet['REDUNDANT_STEP_OFFSET_4'], + REDUNDANT_STEP_OFFSET_5=packet['REDUNDANT_STEP_OFFSET_5'], + PRIMARY_RESOLVER_POSITION_1=packet['PRIMARY_RESOLVER_POSITION_1'], + PRIMARY_RESOLVER_POSITION_2=packet['PRIMARY_RESOLVER_POSITION_2'], + PRIMARY_RESOLVER_POSITION_3=packet['PRIMARY_RESOLVER_POSITION_3'], + PRIMARY_RESOLVER_POSITION_4=packet['PRIMARY_RESOLVER_POSITION_4'], + PRIMARY_RESOLVER_POSITION_5=packet['PRIMARY_RESOLVER_POSITION_5'], + REDUNDANT_RESOLVER_POSITION_1=packet['REDUNDANT_RESOLVER_POSITION_1'], + REDUNDANT_RESOLVER_POSITION_2=packet['REDUNDANT_RESOLVER_POSITION_2'], + REDUNDANT_RESOLVER_POSITION_3=packet['REDUNDANT_RESOLVER_POSITION_3'], + REDUNDANT_RESOLVER_POSITION_4=packet['REDUNDANT_RESOLVER_POSITION_4'], + REDUNDANT_RESOLVER_POSITION_5=packet['REDUNDANT_RESOLVER_POSITION_5'], + ) + case _: + warnings.warn(f"Unable to add packet to database.", CCSDSPacketDatabaseUpdateWarning) @task def update_tlm_database(packets, telemetry_file_path: str, session=None): if session is None: @@ -49,54 +199,8 @@ def update_tlm_database(packets, telemetry_file_path: str, session=None): for apid, this_apid_packets in packets.items(): for i in range(len(this_apid_packets['CCSDS_APID'])): if apid in PACKET_APID2NAME: - packet_name = PACKET_APID2NAME[apid] - try: - seconds = int(this_apid_packets[packet_name + "_HDR_SEC"][i]) - microseconds = int(this_apid_packets[packet_name + "_HDR_USEC"][i]) - except ValueError: - seconds = 0 - microseconds = 0 - warnings.warn("Time could not be properly extracted for packet.", - CCSDSPacketConstructionWarning) - timestamp = (datetime(2000, 1, 1) - + timedelta(seconds=seconds) + timedelta(microseconds=microseconds)) - - try: - spacecraft_id = int(this_apid_packets[packet_name + "_HDR_SCID"][i]) - except ValueError: - spacecraft_id = -1 - warnings.warn("Spacecraft ID could not be extracted for packet.", - CCSDSPacketConstructionWarning) - - try: - flash_block_address = int(this_apid_packets[packet_name + "_HDR_FLASH_BLOCK"][i]) - except ValueError: - flash_block_address = -1 - warnings.warn("Flash block address could not be extracted for packet.", - CCSDSPacketConstructionWarning) - try: - if "sci" in packet_name.lower(): - this_packet = SciPacket(apid=apid, - sequence_count=this_apid_packets['CCSDS_SEQUENCE_COUNT'][i], - length=this_apid_packets['CCSDS_PACKET_LENGTH'][i], - spacecraft_id=spacecraft_id, - flash_block=flash_block_address, - timestamp=timestamp, - packet_num=i, - source_tlm_file=telemetry_file_path, - is_used=False, - compression_settings=this_apid_packets['SCI_XFI_COM_SET'][i]) - else: - this_packet = EngPacket(apid=apid, - sequence_count=this_apid_packets['CCSDS_SEQUENCE_COUNT'][i], - length=this_apid_packets['CCSDS_PACKET_LENGTH'][i], - spacecraft_id=spacecraft_id, - flash_block=flash_block_address, - timestamp=timestamp, - packet_num=i, 
- source_tlm_file=telemetry_file_path, - is_used=False) + this_packet = form_packet_entry(apid, this_apid_packets[i], i, telemetry_file_path) session.add(this_packet) except (sqlalchemy.exc.DataError, pymysql.err.DataError) as e: warnings.warn(f"Unable to add packet to database, {e}.", CCSDSPacketDatabaseUpdateWarning) @@ -104,45 +208,136 @@ def update_tlm_database(packets, telemetry_file_path: str, session=None): @flow -def ingest_raw_packets(): - paths = detect_new_tlm_files() +def ingest_raw_packets(session=None): + if session is None: + session = get_database_session() + + paths = detect_new_tlm_files(session=session) for path in paths: packets = parse_new_tlm_files(path) update_tlm_database(packets, path) + # update the database with this tlm file + new_tlm_file = TLMFiles(path=path, is_processed=True) + session.add(new_tlm_file) + session.commit() + +def interpolate_value(query_time, before_time, before_value, after_time, after_value): + if query_time == before_time: + return before_value + elif query_time == after_time: + return after_value + else: + return ((after_value - before_value) + * (query_time - before_time) / (after_time - before_value) + + before_value) + +def get_fits_metadata(observation_time, spacecraft_id): + before_xact, after_xact = get_closest_eng_packets(EngXACTPacket, observation_time) + ATT_DET_Q_BODY_WRT_ECI1 = interpolate_value(observation_time, + before_xact.timestamp, before_xact['ATT_DET_Q_BODY_WRT_ECI1'], + after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI1']) + ATT_DET_Q_BODY_WRT_ECI2 = interpolate_value(observation_time, + before_xact.timestamp, before_xact['ATT_DET_Q_BODY_WRT_ECI2'], + after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI2']) + ATT_DET_Q_BODY_WRT_ECI3 = interpolate_value(observation_time, + before_xact.timestamp, before_xact['ATT_DET_Q_BODY_WRT_ECI3'], + after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI3']) + ATT_DET_Q_BODY_WRT_ECI4 = interpolate_value(observation_time, + before_xact.timestamp, before_xact['ATT_DET_Q_BODY_WRT_ECI4'], + after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI4']) + return {'spacecraft_id': spacecraft_id, + 'datetime': observation_time, + 'ATT_DET_Q_BODY_WRT_ECI1': ATT_DET_Q_BODY_WRT_ECI1, + 'ATT_DET_Q_BODY_WRT_ECI2': ATT_DET_Q_BODY_WRT_ECI2, + 'ATT_DET_Q_BODY_WRT_ECI3': ATT_DET_Q_BODY_WRT_ECI3, + 'ATT_DET_Q_BODY_WRT_ECI4': ATT_DET_Q_BODY_WRT_ECI4} + + +def form_preliminary_wcs(metadata, plate_scale): + quaternion = np.array([metadata['ATT_DET_Q_BODY_WRT_ECI1'], + metadata['ATT_DET_Q_BODY_WRT_ECI2'], + metadata['ATT_DET_Q_BODY_WRT_ECI3'], + metadata['ATT_DET_Q_BODY_WRT_ECI4']]) + ra, dec, roll = eci_quaternion_to_ra_dec(quaternion) + projection = "ARC" if metadata['spacecraft_id'] == '4' else 'AZP' + celestial_wcs = WCS(naxis=2) + celestial_wcs.wcs.crpix = (1024.5, 1024.5) + celestial_wcs.wcs.crval = (ra, dec) + celestial_wcs.wcs.cdelt = plate_scale, plate_scale + celestial_wcs.wcs.pc = calculate_pc_matrix(roll, celestial_wcs.wcs.cdelt) + celestial_wcs.wcs.set_pv([(2, 1, (-sun.earth_distance(metadata['datetime']) / sun.constants.radius).decompose().value)]) + celestial_wcs.wcs.ctype = f"RA--{projection}", f"DEC-{projection}" + celestial_wcs.wcs.cunit = "deg", "deg" + + return calculate_helio_wcs_from_celestial(celestial_wcs, metadata['datetime'], (2048, 2048)) @flow -def form_level0_fits(session=None): +def form_level0_fits(session=None, pipeline_config_path="config.yaml"): if session is None: session = get_database_session() - distinct_times = 
session.query(SciPacket.timestamp).distinct().all() - distinct_spacecraft = session.query(SciPacket.spacecraft_id).distinct().all() - print("distinct times", len(distinct_times)) + config = load_pipeline_configuration(pipeline_config_path) + + distinct_times = session.query(SciPacket.timestamp).filter(~SciPacket.is_used).distinct().all() + distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all() for spacecraft in distinct_spacecraft: for t in distinct_times: - image_packets = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0], SciPacket.spacecraft_id == spacecraft[0])).all() - image_compression = [unpack_compression_settings(packet.compression_settings) for packet in image_packets] - # TODO: open the packets again from the TLM files and get the data... avoid parsing the same TLM file multiple times - # print("this image", len(image_packets), image_compression[0]) - if image_compression[0]['JPEG'] == 1: - form_from_jpeg_compressed(image_packets) - # TODO: do the square root decoding stuff - # TODO: get all the metadata and put it in the right NormalizedMetadata - # TODO: make a fits file - # TODO: write fits file to disk + image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0], + SciPacket.spacecraft_id == spacecraft[0])).all() + image_compression = [unpack_compression_settings(packet.compression_settings) + for packet in image_packets_entries] + + # Read all the relevant TLM files + needed_tlm_ids = set([image_packet.source_tlm_file for image_packet in image_packets_entries]) + tlm_id_to_tlm_path = {tlm_id: session.query(TLMFiles.path).where(TLMFiles.tlm_id == tlm_id) + for tlm_id in needed_tlm_ids} + needed_tlm_paths = list(session.query(TLMFiles.path).where(TLMFiles.tlm_id.in_(needed_tlm_ids)).all()) + tlm_contents = [process_telemetry_file(tlm_path) for tlm_path in needed_tlm_paths] + + # Form the image packet stream for decompression + ordered_image_content = [] + for packet_entry in image_packets_entries: + tlm_content_index = needed_tlm_paths.index(tlm_id_to_tlm_path[packet_entry.source_tlm_file]) + selected_tlm_contents = tlm_contents[tlm_content_index] + ordered_image_content.append(selected_tlm_contents[0x20]['SCI_XFI_IMG_DATA'][packet_entry.packet_num]) + ordered_image_content = np.concatenate(ordered_image_content) + + # Get the proper image + if image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image + image = form_from_jpeg_compressed(ordered_image_content) + else: + image = np.zeros((2048, 2048)) + + spacecraft_id = image_packets_entries[0].spacecraft_id + metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id) + file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])] + preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id]) + meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0") + for meta_key, meta_value in metadata_contents.items(): + meta[meta_key] = meta_value + cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs) + + l0_db_entry = File(level="0", + file_type=file_type, + observatory=str(spacecraft_id), + file_version="1", # TODO: increment the file version + software_version=software_version, + date_created=datetime.now(), + date_obs=t, + date_beg=t, + date_end=t, + state="created") + + write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']), + 
get_base_file_name(cube))) + # TODO: write a jp2 + for image_packets_entries in image_packets_entries: + image_packets_entries.is_used = True + session.add(l0_db_entry) + session.commit() def form_from_jpeg_compressed(packets): - # packets = sorted(packets, key=lambda packet: packet.timestamp) - # reference_files = [packet.source_tlm_file for packet in packets] - img = np.concatenate(packets[0x20]['SCI_XFI_IMG_DATA'][22:44]) - img = pylibjpeg.decode(img.tobytes()) + img = pylibjpeg.decode(packets.tobytes()) return img - -if __name__ == '__main__': - # ingest_raw_packets() - # session = get_database_session() - # results = session.query(SciPacket).where(SciPacket.is_used == False).all() - # unique_times = list(set(r.timestamp for r in results)) - form_level0_fits() diff --git a/punchpipe/level0/meta.py b/punchpipe/level0/meta.py new file mode 100644 index 0000000..2384661 --- /dev/null +++ b/punchpipe/level0/meta.py @@ -0,0 +1,51 @@ +import numpy as np +from astropy.coordinates import SkyCoord + +PFW_POSITIONS = {"M": 960, + "opaque": 720, + "Z": 480, + "P": 240, + "Clear": 0} + +POSITIONS_TO_CODES = {"Clear": "CR", "P": "PP", "M": "PM", "Z": "PZ"} + +def convert_pfw_position_to_polarizer(pfw_position): + differences = {key: abs(pfw_position - reference_position) for key, reference_position in PFW_POSITIONS.items()} + return min(differences, key=differences.get) + + +def eci_quaternion_to_ra_dec(q): + """ + Convert an ECI quaternion to RA and Dec. + + Args: + q: A numpy array representing the ECI quaternion (q0, q1, q2, q3). + + Returns: + ra: Right Ascension in degrees. + dec: Declination in degrees. + """ + + # Normalize the quaternion + q = q / np.linalg.norm(q) + + # Calculate the rotation matrix from the quaternion + R = np.array([ + [1 - 2 * (q[2]**2 + q[3]**2), 2 * (q[1] * q[2] - q[0] * q[3]), 2 * (q[1] * q[3] + q[0] * q[2])], + [2 * (q[1] * q[2] + q[0] * q[3]), 1 - 2 * (q[1]**2 + q[3]**2), 2 * (q[2] * q[3] - q[0] * q[1])], + [2 * (q[1] * q[3] - q[0] * q[2]), 2 * (q[2] * q[3] + q[0] * q[1]), 1 - 2 * (q[1]**2 + q[2]**2)] + ]) + + # Extract the unit vector pointing in the z-direction (ECI frame) + z_eci = np.array([0, 0, 1]) + + # Rotate the z-vector to the body frame + z_body = R @ z_eci + + # Calculate RA and Dec from the rotated z-vector + c = SkyCoord(z_body[0], z_body[1], z_body[2], representation_type='cartesian', unit='m') + ra = c.ra.deg + dec = c.dec.deg + roll = np.arctan2(q[1] * q[2] - q[0] * q[3], 1 / 2 - (q[2] ** 2 + q[3] ** 2)) + + return ra, dec, roll From 8664814e9ffd6d58bcb70757ea6912debaf83bb8 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 06:56:59 -0700 Subject: [PATCH 13/28] handle spacecraft id --- punchpipe/level0/flow.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py index 3b15a48..b46e458 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/flow.py @@ -16,7 +16,7 @@ from sqlalchemy import and_ from glob import glob from sunpy.coordinates import sun - +from prefect.blocks.system import Secret from punchbowl.data.meta import NormalizedMetadata from punchbowl.data.io import write_ndcube_to_fits @@ -310,7 +310,10 @@ def form_level0_fits(session=None, pipeline_config_path="config.yaml"): else: image = np.zeros((2048, 2048)) - spacecraft_id = image_packets_entries[0].spacecraft_id + spacecraft_secrets = Secret.load("spacecraft-ids") + spacecraft_id_mapper = spacecraft_secrets.get() + spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id] + 
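# --- Illustrative aside (not part of the patch): a hedged usage sketch of the helpers in
# punchpipe/level0/meta.py (added in the previous patch), showing how a PFW position maps
# to a polarizer file-type code and how an attitude quaternion yields pointing. It assumes
# punchpipe with these patches applied is importable; the sample values are made up.
import numpy as np
from punchpipe.level0.meta import (POSITIONS_TO_CODES,
                                   convert_pfw_position_to_polarizer,
                                   eci_quaternion_to_ra_dec)

pfw_position = 478                                            # hypothetical reading near the "Z" slot (480)
polarizer = convert_pfw_position_to_polarizer(pfw_position)   # nearest entry in PFW_POSITIONS -> "Z"
file_type = POSITIONS_TO_CODES[polarizer]                     # -> "PZ"

q_identity = np.array([1.0, 0.0, 0.0, 0.0])   # scalar-first identity quaternion: body frame == ECI
ra, dec, roll = eci_quaternion_to_ra_dec(q_identity)          # body +Z at the ECI pole, so dec is ~+90 deg
print(file_type, ra, dec, roll)
# --- end aside ---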
metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id) file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])] preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id]) From 796cf9cd6b66424e7eb0b53b53c9bdb52461fbd1 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 14:02:03 +0000 Subject: [PATCH 14/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- deploy.py | 3 --- punchpipe/cli.py | 6 ------ punchpipe/controlsegment/db.py | 1 - punchpipe/level0/flow.py | 21 ++++++++++----------- 4 files changed, 10 insertions(+), 21 deletions(-) diff --git a/deploy.py b/deploy.py index 2c08d06..7a743ea 100644 --- a/deploy.py +++ b/deploy.py @@ -1,13 +1,11 @@ from prefect import serve from punchpipe.controlsegment.launcher import launcher_flow -from punchpipe.deliver import create_noaa_delivery from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow - if __name__ == "__main__": launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment", description="Launch a pipeline segment.", @@ -49,4 +47,3 @@ level3_PTM_scheduler_deployment, level3_PTM_process_deployment, limit=1000 ) - diff --git a/punchpipe/cli.py b/punchpipe/cli.py index 0f70047..50dbbee 100644 --- a/punchpipe/cli.py +++ b/punchpipe/cli.py @@ -2,13 +2,7 @@ import multiprocessing as mp import click -from prefect import serve -from punchpipe.controlsegment.launcher import launcher_flow -from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow -from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow -from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow -from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow from .monitor.app import create_app diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index 5f175f2..c8d7d32 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -1,5 +1,4 @@ import os -import math from sqlalchemy import TEXT, Boolean, Column, DateTime, Float, Integer, String from sqlalchemy.orm import declarative_base diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py index b46e458..2115a40 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/flow.py @@ -1,9 +1,10 @@ +import os import json import base64 -import os import warnings -from datetime import datetime, timedelta import importlib.metadata +from glob import glob +from datetime import datetime, timedelta import numpy as np import pylibjpeg @@ -12,21 +13,19 @@ from astropy.wcs import WCS from ndcube import NDCube from prefect import flow, task +from prefect.blocks.system import Secret from punchbowl.data import get_base_file_name +from punchbowl.data.io import write_ndcube_to_fits +from punchbowl.data.meta import NormalizedMetadata +from punchbowl.data.wcs import calculate_helio_wcs_from_celestial, calculate_pc_matrix from sqlalchemy import and_ -from glob import glob from sunpy.coordinates import sun -from prefect.blocks.system import Secret - -from punchbowl.data.meta import 
NormalizedMetadata -from punchbowl.data.io import write_ndcube_to_fits -from punchbowl.data.wcs import calculate_pc_matrix, calculate_helio_wcs_from_celestial -from punchpipe.controlsegment.db import EngXACTPacket, SciPacket, TLMFiles, File, get_closest_eng_packets, EngPWFPacket +from punchpipe.controlsegment.db import EngPWFPacket, EngXACTPacket, File, SciPacket, TLMFiles, get_closest_eng_packets from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration from punchpipe.error import CCSDSPacketConstructionWarning, CCSDSPacketDatabaseUpdateWarning from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file, unpack_compression_settings -from punchpipe.level0.meta import convert_pfw_position_to_polarizer, POSITIONS_TO_CODES, eci_quaternion_to_ra_dec +from punchpipe.level0.meta import POSITIONS_TO_CODES, convert_pfw_position_to_polarizer, eci_quaternion_to_ra_dec software_version = importlib.metadata.version("punchpipe") @@ -190,7 +189,7 @@ def form_packet_entry(apid, packet, packet_num, source_tlm_file_id): REDUNDANT_RESOLVER_POSITION_5=packet['REDUNDANT_RESOLVER_POSITION_5'], ) case _: - warnings.warn(f"Unable to add packet to database.", CCSDSPacketDatabaseUpdateWarning) + warnings.warn("Unable to add packet to database.", CCSDSPacketDatabaseUpdateWarning) @task def update_tlm_database(packets, telemetry_file_path: str, session=None): if session is None: From 7124ab11d92baf9695a1ec756ece9b881e5ed7b5 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 07:07:24 -0700 Subject: [PATCH 15/28] increase software version len in db --- punchpipe/controlsegment/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index c8d7d32..1d6ae80 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -15,7 +15,7 @@ class File(Base): file_type = Column(String(2), nullable=False) observatory = Column(String(1), nullable=False) file_version = Column(String(16), nullable=False) - software_version = Column(String(16), nullable=False) + software_version = Column(String(20), nullable=False) date_created = Column(DateTime, nullable=True) date_obs = Column(DateTime, nullable=False) date_beg = Column(DateTime, nullable=True) From 9305c33ca0d824730b09d672bbbbddfa7dedff88 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 07:59:26 -0700 Subject: [PATCH 16/28] remove vignetting --- punchpipe/flows/level1.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index b4e2e0f..f7f039d 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -18,18 +18,6 @@ def level1_query_ready_files(session, pipeline_config: dict): return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == "0")).all()] -# TODO handle more robustly -@task -def get_vignetting_function(level0_file): - observatory = int(level0_file.observatory) - if observatory < 4: - vignetting_function_path = "/home/marcus.hughes/build4/simpunch/build_3_review_files/PUNCH_L1_GM1_20240817174727_v2.fits" - else: - vignetting_function_path = "/home/marcus.hughes/build4/simpunch/build_3_review_files/PUNCH_L1_GM4_20240819045110_v1.fits" - return vignetting_function_path - - -# TODO handle more robustly @task def get_psf_model_path(level0_file): return "/home/marcus.hughes/build4/simpunch/build_3_review_files/synthetic_forward_psf.h5" @@ -47,7 +35,6 @@ def 
level1_construct_flow_info(level0_files: list[File], level1_files: File, pip os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename()) for level0_file in level0_files ], - "vignetting_function_path": get_vignetting_function(level0_files[0]), "psf_model_path": get_psf_model_path(level0_files[0]), } ) From f03d2a617325a799a5a4ef5f5145fa9dd0f4f4c0 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 08:09:00 -0700 Subject: [PATCH 17/28] select best psf model --- punchpipe/flows/level1.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index f7f039d..bb8bfe8 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -5,6 +5,7 @@ from prefect import flow, task from punchbowl.level1.flow import level1_core_flow +from punchpipe.controlsegment.util import get_database_session from sqlalchemy import and_ from punchpipe import __version__ @@ -19,8 +20,19 @@ def level1_query_ready_files(session, pipeline_config: dict): @task -def get_psf_model_path(level0_file): - return "/home/marcus.hughes/build4/simpunch/build_3_review_files/synthetic_forward_psf.h5" +def get_psf_model_path(level0_file, pipeline_config: dict): + corresponding_psf_model_type = {"PM": "RM", + "PZ": "RZ", + "PP": "RP", + "CR": "RC"} + psf_model_type = corresponding_psf_model_type[level0_file.file_type] + with get_database_session() as session: + best_model = (session.query(File) + .filter(File.file_type == psf_model_type) + .filter(File.observatory == level0_file.observatory) + .where(File.date_obs < level0_file.date_obs) + .order_by(File.date_obs.desc()).first()) + return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @task @@ -40,7 +52,7 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, pip ) return Flow( flow_type=flow_type, - flow_level=1, + flow_level="1", state=state, creation_time=creation_time, priority=priority, From 669ddadbc386b5075542e1750ad4ca6924b3daae Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:11:11 +0000 Subject: [PATCH 18/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- punchpipe/flows/level1.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index bb8bfe8..18f64b4 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -5,13 +5,13 @@ from prefect import flow, task from punchbowl.level1.flow import level1_core_flow -from punchpipe.controlsegment.util import get_database_session from sqlalchemy import and_ from punchpipe import __version__ from punchpipe.controlsegment.db import File, Flow from punchpipe.controlsegment.processor import generic_process_flow_logic from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic +from punchpipe.controlsegment.util import get_database_session @task From bf16c836b354219d9e09edc84ca67c98868044ee Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 09:25:39 -0700 Subject: [PATCH 19/28] fix filters, improve l0 --- config.yaml | 4 + punchpipe/controlsegment/db.py | 2 +- punchpipe/controlsegment/scheduler.py | 3 +- punchpipe/flows/level1.py | 22 ++--- punchpipe/flows/level2.py | 7 +- punchpipe/flows/level3.py | 2 +- punchpipe/flows/levelq.py | 2 +- punchpipe/flows/tests/test_level1.py | 31 ++++--- 
punchpipe/level0/flow.py | 125 +++++++++++++++++--------- 9 files changed, 131 insertions(+), 67 deletions(-) diff --git a/config.yaml b/config.yaml index 86c59ef..3774873 100644 --- a/config.yaml +++ b/config.yaml @@ -9,6 +9,10 @@ plate_scale: 3: 88 / 3600 4: 30 / 3600 +quality_check: + mean_low: 0 + mean_high: 65000 + scheduler: max_start: 10 diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py index 1d6ae80..10b2ea9 100644 --- a/punchpipe/controlsegment/db.py +++ b/punchpipe/controlsegment/db.py @@ -124,7 +124,7 @@ class EngXACTPacket(Base): ATT_CMD_CMD_Q_BODY_WRT_ECI3 = Column(Float, nullable=False) # Commanded Att Quaternion ATT_CMD_CMD_Q_BODY_WRT_ECI4 = Column(Float, nullable=False) # Commanded Att Quaternion -class EngPWFPacket(Base): +class ENGPFWPacket(Base): __tablename__ = "eng_pfw" packet_id = Column(Integer, primary_key=True) apid = Column(Integer, nullable=False, index=True) diff --git a/punchpipe/controlsegment/scheduler.py b/punchpipe/controlsegment/scheduler.py index 8f5e5c3..46ed1dd 100644 --- a/punchpipe/controlsegment/scheduler.py +++ b/punchpipe/controlsegment/scheduler.py @@ -27,7 +27,8 @@ def generic_scheduler_flow_logic( # prepare the new level flow and file children_files = construct_child_file_info(parent_files, pipeline_config) - database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config) + database_flow_info = construct_child_flow_info(parent_files, children_files, + pipeline_config, session=session) for child_file in children_files: session.add(child_file) session.add(database_flow_info) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index bb8bfe8..31e0969 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -14,29 +14,31 @@ from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic +SCIENCE_LEVEL0_TYPE_CODES = ["PM", "PZ", "PP", "CR"] + @task def level1_query_ready_files(session, pipeline_config: dict): - return [[f.file_id] for f in session.query(File).where(and_(File.state == "created", File.level == "0")).all()] + return [[f.file_id] for f in session.query(File).filter(File.file_type.in_(SCIENCE_LEVEL0_TYPE_CODES)) + .where(and_(File.state == "created", File.level == "0")).all()] @task -def get_psf_model_path(level0_file, pipeline_config: dict): +def get_psf_model_path(level0_file, pipeline_config: dict, session=None): corresponding_psf_model_type = {"PM": "RM", "PZ": "RZ", "PP": "RP", "CR": "RC"} psf_model_type = corresponding_psf_model_type[level0_file.file_type] - with get_database_session() as session: - best_model = (session.query(File) - .filter(File.file_type == psf_model_type) - .filter(File.observatory == level0_file.observatory) - .where(File.date_obs < level0_file.date_obs) - .order_by(File.date_obs.desc()).first()) + best_model = (session.query(File) + .filter(File.file_type == psf_model_type) + .filter(File.observatory == level0_file.observatory) + .where(File.date_obs < level0_file.date_obs) + .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @task -def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict): +def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict, session=None): flow_type = "level1_process_flow" state = "planned" creation_time = datetime.now() @@ -47,7 +49,7 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, pip 
os.path.join(level0_file.directory(pipeline_config["root"]), level0_file.filename()) for level0_file in level0_files ], - "psf_model_path": get_psf_model_path(level0_files[0]), + "psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session), } ) return Flow( diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py index 40159a8..0cb35e2 100644 --- a/punchpipe/flows/level2.py +++ b/punchpipe/flows/level2.py @@ -12,12 +12,15 @@ from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic +SCIENCE_LEVEL1_TYPE_CODES = ["PM", "PZ", "PP", "CR"] + + @task def level2_query_ready_files(session, pipeline_config: dict): logger = get_run_logger() all_ready_files = (session.query(File).filter(File.state == "created") .filter(File.level == "1") - .filter(File.file_type.in_(['PP', 'PZ', 'PM'])).all()) + .filter(File.file_type.in_(SCIENCE_LEVEL1_TYPE_CODES)).all()) logger.info(f"{len(all_ready_files)} ready files") unique_times = set(f.date_obs for f in all_ready_files) logger.info(f"{len(unique_times)} unique times: {unique_times}") @@ -29,7 +32,7 @@ def level2_query_ready_files(session, pipeline_config: dict): @task -def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict): +def level2_construct_flow_info(level1_files: list[File], level2_file: File, pipeline_config: dict, session=None): flow_type = "level2_process_flow" state = "planned" creation_time = datetime.now() diff --git a/punchpipe/flows/level3.py b/punchpipe/flows/level3.py index af5cf39..73a5fdf 100644 --- a/punchpipe/flows/level3.py +++ b/punchpipe/flows/level3.py @@ -58,7 +58,7 @@ def level3_PTM_query_ready_files(session, pipeline_config: dict): @task -def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict): +def level3_PTM_construct_flow_info(level2_files: File, level3_file: File, pipeline_config: dict, session=None): session = get_database_session() # TODO: replace so this works in the tests by passing in a test flow_type = "level3_PTM_process_flow" diff --git a/punchpipe/flows/levelq.py b/punchpipe/flows/levelq.py index 4700d17..ade1aff 100644 --- a/punchpipe/flows/levelq.py +++ b/punchpipe/flows/levelq.py @@ -29,7 +29,7 @@ def levelq_query_ready_files(session, pipeline_config: dict): @task -def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict): +def levelq_construct_flow_info(level1_files: list[File], levelq_file: File, pipeline_config: dict, session=None): flow_type = "levelq_process_flow" state = "planned" creation_time = datetime.now() diff --git a/punchpipe/flows/tests/test_level1.py b/punchpipe/flows/tests/test_level1.py index d634487..6d9fedd 100644 --- a/punchpipe/flows/tests/test_level1.py +++ b/punchpipe/flows/tests/test_level1.py @@ -1,5 +1,5 @@ import os -from datetime import datetime +from datetime import datetime, timedelta import pytest from prefect.testing.utilities import prefect_test_harness @@ -23,24 +23,33 @@ def prefect_test_fixture(): yield def session_fn(session): - level0_file = File(level=0, - file_type='XX', + level0_file = File(level="0", + file_type='PM', observatory='0', state='created', file_version='none', software_version='none', date_obs=datetime.now()) - level1_file = File(level=1, - file_type="XX", + level1_file = File(level="1", + file_type="PM", observatory='0', state='created', file_version='none', software_version='none', date_obs=datetime.now()) - session.add(level0_file, level1_file) + psf_model = File(level="0", + 
file_type="RM", + observatory='0', + state='created', + file_version='none', + software_version='none', + date_obs=datetime.now()-timedelta(days=1)) + session.add(level0_file) + session.add(level1_file) + session.add(psf_model) db = create_mysql_fixture(Base, session_fn, session=True) @@ -55,7 +64,7 @@ def test_level1_construct_file_info(): pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) level0_file = [File(level="0", - file_type='XX', + file_type='PM', observatory='0', state='created', file_version='none', @@ -72,22 +81,22 @@ def test_level1_construct_file_info(): assert constructed_file_info.state == "planned" -def test_level1_construct_flow_info(): +def test_level1_construct_flow_info(db): pipeline_config_path = os.path.join(TEST_DIR, "config.yaml") pipeline_config = load_pipeline_configuration.fn(pipeline_config_path) level0_file = [File(level="0", - file_type='XX', + file_type='PM', observatory='0', state='created', file_version='none', software_version='none', date_obs=datetime.now())] level1_file = level1_construct_file_info.fn(level0_file, pipeline_config) - flow_info = level1_construct_flow_info.fn(level0_file, level1_file, pipeline_config) + flow_info = level1_construct_flow_info.fn(level0_file, level1_file, pipeline_config, session=db) assert flow_info.flow_type == 'level1_process_flow' assert flow_info.state == "planned" - assert flow_info.flow_level == 1 + assert flow_info.flow_level == "1" assert flow_info.priority == 6 diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py index 2115a40..ef69b46 100644 --- a/punchpipe/level0/flow.py +++ b/punchpipe/level0/flow.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta import numpy as np +import pandas as pd import pylibjpeg import pymysql import sqlalchemy.exc @@ -21,7 +22,7 @@ from sqlalchemy import and_ from sunpy.coordinates import sun -from punchpipe.controlsegment.db import EngPWFPacket, EngXACTPacket, File, SciPacket, TLMFiles, get_closest_eng_packets +from punchpipe.controlsegment.db import ENGPFWPacket, EngXACTPacket, File, SciPacket, TLMFiles, get_closest_eng_packets from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration from punchpipe.error import CCSDSPacketConstructionWarning, CCSDSPacketDatabaseUpdateWarning from punchpipe.level0.ccsds import PACKET_APID2NAME, process_telemetry_file, unpack_compression_settings @@ -132,7 +133,7 @@ def form_packet_entry(apid, packet, packet_num, source_tlm_file_id): ATT_CMD_CMD_Q_BODY_WRT_ECI3=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI3'], ATT_CMD_CMD_Q_BODY_WRT_ECI4=packet['ATT_CMD_CMD_Q_BODY_WRT_ECI4'],) case 'eng_pfw': - return EngPWFPacket( + return ENGPFWPacket( apid=apid, sequence_count=packet['CCSDS_SEQUENCE_COUNT'], length=packet['CCSDS_PACKET_LENGTH'], @@ -232,7 +233,7 @@ def interpolate_value(query_time, before_time, before_value, after_time, after_v + before_value) def get_fits_metadata(observation_time, spacecraft_id): - before_xact, after_xact = get_closest_eng_packets(EngXACTPacket, observation_time) + before_xact, after_xact = get_closest_eng_packets(EngXACTPacket, observation_time, spacecraft_id) ATT_DET_Q_BODY_WRT_ECI1 = interpolate_value(observation_time, before_xact.timestamp, before_xact['ATT_DET_Q_BODY_WRT_ECI1'], after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI1']) @@ -245,15 +246,19 @@ def get_fits_metadata(observation_time, spacecraft_id): ATT_DET_Q_BODY_WRT_ECI4 = interpolate_value(observation_time, before_xact.timestamp, 
before_xact['ATT_DET_Q_BODY_WRT_ECI4'], after_xact.timestam, after_xact['ATT_DET_Q_BODY_WRT_ECI4']) + + before_pfw, _ = get_closest_eng_packets(ENGPFWPacket, observation_time, spacecraft_id) return {'spacecraft_id': spacecraft_id, 'datetime': observation_time, 'ATT_DET_Q_BODY_WRT_ECI1': ATT_DET_Q_BODY_WRT_ECI1, 'ATT_DET_Q_BODY_WRT_ECI2': ATT_DET_Q_BODY_WRT_ECI2, 'ATT_DET_Q_BODY_WRT_ECI3': ATT_DET_Q_BODY_WRT_ECI3, - 'ATT_DET_Q_BODY_WRT_ECI4': ATT_DET_Q_BODY_WRT_ECI4} + 'ATT_DET_Q_BODY_WRT_ECI4': ATT_DET_Q_BODY_WRT_ECI4, + 'POSITION_CURR': before_pfw['POSITION_CURR']} def form_preliminary_wcs(metadata, plate_scale): + """Create the preliminary WCS for punchbowl""" quaternion = np.array([metadata['ATT_DET_Q_BODY_WRT_ECI1'], metadata['ATT_DET_Q_BODY_WRT_ECI2'], metadata['ATT_DET_Q_BODY_WRT_ECI3'], @@ -271,6 +276,15 @@ def form_preliminary_wcs(metadata, plate_scale): return calculate_helio_wcs_from_celestial(celestial_wcs, metadata['datetime'], (2048, 2048)) +def image_is_okay(image, pipeline_config): + """Check that the formed image conforms to image quality expectations""" + return pipeline_config['quality_check']['mean_low'] < np.mean(image) < pipeline_config['quality_check']['mean_high'] + +def form_from_jpeg_compressed(packets): + """Form a JPEG-LS image from packets""" + img = pylibjpeg.decode(packets.tobytes()) + return img + @flow def form_level0_fits(session=None, pipeline_config_path="config.yaml"): if session is None: @@ -281,7 +295,10 @@ def form_level0_fits(session=None, pipeline_config_path="config.yaml"): distinct_times = session.query(SciPacket.timestamp).filter(~SciPacket.is_used).distinct().all() distinct_spacecraft = session.query(SciPacket.spacecraft_id).filter(~SciPacket.is_used).distinct().all() + for spacecraft in distinct_spacecraft: + errors = [] + for t in distinct_times: image_packets_entries = session.query(SciPacket).where(and_(SciPacket.timestamp == t[0], SciPacket.spacecraft_id == spacecraft[0])).all() @@ -304,42 +321,70 @@ def form_level0_fits(session=None, pipeline_config_path="config.yaml"): ordered_image_content = np.concatenate(ordered_image_content) # Get the proper image + skip_image = False if image_compression[0]['JPEG'] == 1: # this assumes the image compression is static for an image - image = form_from_jpeg_compressed(ordered_image_content) + try: + image = form_from_jpeg_compressed(ordered_image_content) + except ValueError as e: + error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), + 'start_block': image_packets_entries[0].flash_block, + 'replay_length': image_packets_entries[-1].flash_block + - image_packets_entries[0].flash_block} + errors.append(error) + else: + skip_image = True else: - image = np.zeros((2048, 2048)) - - spacecraft_secrets = Secret.load("spacecraft-ids") - spacecraft_id_mapper = spacecraft_secrets.get() - spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id] - - metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id) - file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])] - preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id]) - meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0") - for meta_key, meta_value in metadata_contents.items(): - meta[meta_key] = meta_value - cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs) - - l0_db_entry = File(level="0", - file_type=file_type, - observatory=str(spacecraft_id), - 
file_version="1", # TODO: increment the file version - software_version=software_version, - date_created=datetime.now(), - date_obs=t, - date_beg=t, - date_end=t, - state="created") - - write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']), - get_base_file_name(cube))) - # TODO: write a jp2 - for image_packets_entries in image_packets_entries: - image_packets_entries.is_used = True - session.add(l0_db_entry) - session.commit() + skip_image = True + error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), + 'start_block': image_packets_entries[0].flash_block, + 'replay_length': image_packets_entries[-1].flash_block + - image_packets_entries[0].flash_block} + errors.append(error) + + # check the quality of the image + if not skip_image and not image_is_okay(image, config): + skip_image = True + error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"), + 'start_block': image_packets_entries[0].flash_block, + 'replay_length': image_packets_entries[-1].flash_block + - image_packets_entries[0].flash_block} + errors.append(error) + + if not skip_image: + spacecraft_secrets = Secret.load("spacecraft-ids") + spacecraft_id_mapper = spacecraft_secrets.get() + spacecraft_id = spacecraft_id_mapper[image_packets_entries[0].spacecraft_id] + + metadata_contents = get_fits_metadata(image_packets_entries[0].timestamp, spacecraft_id) + file_type = POSITIONS_TO_CODES[convert_pfw_position_to_polarizer(metadata_contents['POSITION_CURR'])] + preliminary_wcs = form_preliminary_wcs(metadata_contents, config['plate_scale'][spacecraft_id]) + meta = NormalizedMetadata.load_template(file_type + spacecraft_id, "0") + for meta_key, meta_value in metadata_contents.items(): + meta[meta_key] = meta_value + cube = NDCube(data=image, metadata=meta, wcs=preliminary_wcs) + + l0_db_entry = File(level="0", + file_type=file_type, + observatory=str(spacecraft_id), + file_version="1", # TODO: increment the file version + software_version=software_version, + date_created=datetime.now(), + date_obs=t, + date_beg=t, + date_end=t, + state="created") + + write_ndcube_to_fits(cube, os.path.join(l0_db_entry.directory(config['data_path']), + get_base_file_name(cube))) + # TODO: write a jp2 + for image_packets_entries in image_packets_entries: + image_packets_entries.is_used = True + session.add(l0_db_entry) + session.commit() + df_errors = pd.DataFrame(errors) + date_str = datetime.now().strftime("%Y_%j") + df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv') + os.makedirs(df_path, exist_ok=True) + df_errors.to_csv(df_path, index=False) + -def form_from_jpeg_compressed(packets): - img = pylibjpeg.decode(packets.tobytes()) - return img From 5ac305c9d963266d3e388297a38d12510eaf436b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:26:14 +0000 Subject: [PATCH 20/28] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- punchpipe/flows/level1.py | 2 -- punchpipe/flows/level2.py | 1 - punchpipe/level0/flow.py | 4 +--- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index a3c3f91..ac5f339 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -11,8 +11,6 @@ from punchpipe.controlsegment.db import File, Flow from punchpipe.controlsegment.processor import generic_process_flow_logic from 
From 5ac305c9d963266d3e388297a38d12510eaf436b Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 12 Nov 2024 16:26:14 +0000
Subject: [PATCH 20/28] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 punchpipe/flows/level1.py | 2 --
 punchpipe/flows/level2.py | 1 -
 punchpipe/level0/flow.py  | 4 +---
 3 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index a3c3f91..ac5f339 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -11,8 +11,6 @@ from punchpipe.controlsegment.db import File, Flow
 from punchpipe.controlsegment.processor import generic_process_flow_logic
 from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic
-from punchpipe.controlsegment.util import get_database_session
-

 SCIENCE_LEVEL0_TYPE_CODES = ["PM", "PZ", "PP", "CR"]

diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py
index 0cb35e2..9510a79 100644
--- a/punchpipe/flows/level2.py
+++ b/punchpipe/flows/level2.py
@@ -11,7 +11,6 @@
 from punchpipe.controlsegment.processor import generic_process_flow_logic
 from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic

-
 SCIENCE_LEVEL1_TYPE_CODES = ["PM", "PZ", "PP", "CR"]

diff --git a/punchpipe/level0/flow.py b/punchpipe/level0/flow.py
index ef69b46..a333ab7 100644
--- a/punchpipe/level0/flow.py
+++ b/punchpipe/level0/flow.py
@@ -325,7 +325,7 @@ def form_level0_fits(session=None, pipeline_config_path="config.yaml"):
             if image_compression[0]['JPEG'] == 1:  # this assumes the image compression is static for an image
                 try:
                     image = form_from_jpeg_compressed(ordered_image_content)
-                except ValueError as e:
+                except ValueError:
                     error = {'start_time': image_packets_entries[0].timestamp.strftime("%Y-%m-%d %h:%m:%s"),
                              'start_block': image_packets_entries[0].flash_block,
                              'replay_length': image_packets_entries[-1].flash_block
@@ -386,5 +386,3 @@ def form_level0_fits(session=None, pipeline_config_path="config.yaml"):
         df_path = os.path.join(config['root'], 'REPLAY', f'PUNCH_{spacecraft}_REPLAY_{date_str}.csv')
         os.makedirs(df_path, exist_ok=True)
         df_errors.to_csv(df_path, index=False)
-
-

From 6af10d3c992f318da677461e0351a3a6d7e45551 Mon Sep 17 00:00:00 2001
From: Marcus Hughes
Date: Tue, 12 Nov 2024 12:04:53 -0700
Subject: [PATCH 21/28] add quartic selection

---
 punchpipe/flows/level1.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index ac5f339..81ffdc8 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -34,6 +34,14 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None):
                   .order_by(File.date_obs.desc()).first())
     return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())

+@task
+def get_quartic_model_path(level0_file, pipeline_config: dict, session=None):
+    best_model = (session.query(File)
+                  .filter(File.file_type == 'FQ')
+                  .filter(File.observatory == level0_file.observatory)
+                  .where(File.date_obs < level0_file.date_obs)
+                  .order_by(File.date_obs.desc()).first())
+    return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())

 @task
 def level1_construct_flow_info(level0_files: list[File], level1_files: File, pipeline_config: dict, session=None):
@@ -48,6 +56,7 @@ def level1_construct_flow_info(level0_files: list[File], level1_files: File, pip
                 for level0_file in level0_files
             ],
             "psf_model_path": get_psf_model_path(level0_files[0], pipeline_config, session=session),
+            "quartic_coefficient_path": get_quartic_model_path(level0_files[0], pipeline_config, session=session),
         }
     )
     return Flow(
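The quartic selection mirrors the PSF model lookup: query the File table for the newest calibration product of the matching type and observatory, then build its on-disk path from the configured root. A rough standalone sketch of that pattern, assuming a SQLAlchemy session and the File model from punchpipe.controlsegment.db; the helper name and the None handling are illustrative additions, not part of the flow code.

import os

from punchpipe.controlsegment.db import File


def newest_calibration_path(session, level0_file, pipeline_config, file_type='FQ'):
    """Sketch: locate the most recent calibration file of one type for an observatory."""
    best_model = (session.query(File)
                  .filter(File.file_type == file_type)
                  .filter(File.observatory == level0_file.observatory)
                  .filter(File.date_obs <= level0_file.date_obs)
                  .order_by(File.date_obs.desc())
                  .first())
    if best_model is None:
        # No calibration product has been ingested yet for this observatory.
        raise RuntimeError(f"no {file_type} calibration file found")
    return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())

Ordering by date_obs descending and taking first() keeps the lookup to a single query per calibration type.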
"flow_run_name", "flow_type", "priority", "start_time", "state"] +column_names = ["flow_id", "flow_level", "flow_run_id", + "flow_run_name", "flow_type", "call_data", "creation_time", "end_time", + "priority", "start_time", "state"] schedule_columns =[{'name': v, 'id': v} for v in column_names] From 5527b7081d6b13a16997e11bb894371981c2ec96 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 13:42:22 -0700 Subject: [PATCH 23/28] allow model to be simultaneous when selected --- punchpipe/flows/level1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index 81ffdc8..fa1905d 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -30,7 +30,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == psf_model_type) .filter(File.observatory == level0_file.observatory) - .where(File.date_obs < level0_file.date_obs) + .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @@ -39,7 +39,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') .filter(File.observatory == level0_file.observatory) - .where(File.date_obs < level0_file.date_obs) + .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) From 86eaeb8a57516a1d3b31b78a75ae4dfc87a80f5f Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 14:19:02 -0700 Subject: [PATCH 24/28] drop req that models are before data --- punchpipe/flows/level1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index fa1905d..a87a853 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -30,7 +30,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == psf_model_type) .filter(File.observatory == level0_file.observatory) - .where(File.date_obs <= level0_file.date_obs) + # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @@ -39,7 +39,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') .filter(File.observatory == level0_file.observatory) - .where(File.date_obs <= level0_file.date_obs) + # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) From 55d5b6e6c2b040ccc697033132c3ed3ccb9e7002 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 14:19:45 -0700 Subject: [PATCH 25/28] revert --- punchpipe/flows/level1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index a87a853..fa1905d 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -30,7 +30,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == psf_model_type) 
.filter(File.observatory == level0_file.observatory) - # .where(File.date_obs <= level0_file.date_obs) + .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @@ -39,7 +39,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') .filter(File.observatory == level0_file.observatory) - # .where(File.date_obs <= level0_file.date_obs) + .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) From cdc291d4e4934d2fa2ddf5ae7ce63012f266750d Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 14:26:56 -0700 Subject: [PATCH 26/28] drop requirement --- punchpipe/flows/level1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index fa1905d..a87a853 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -30,7 +30,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == psf_model_type) .filter(File.observatory == level0_file.observatory) - .where(File.date_obs <= level0_file.date_obs) + # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @@ -39,7 +39,7 @@ def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') .filter(File.observatory == level0_file.observatory) - .where(File.date_obs <= level0_file.date_obs) + # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) From 916d5f4edda93694a4e41de5736ffc59cc8da8c3 Mon Sep 17 00:00:00 2001 From: Marcus Hughes Date: Tue, 12 Nov 2024 14:33:55 -0700 Subject: [PATCH 27/28] make sure models are l1 --- punchpipe/flows/level1.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py index a87a853..f944ac8 100644 --- a/punchpipe/flows/level1.py +++ b/punchpipe/flows/level1.py @@ -29,7 +29,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): psf_model_type = corresponding_psf_model_type[level0_file.file_type] best_model = (session.query(File) .filter(File.file_type == psf_model_type) - .filter(File.observatory == level0_file.observatory) + .filter(File.observatory == "1") # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) @@ -38,7 +38,7 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None): def get_quartic_model_path(level0_file, pipeline_config: dict, session=None): best_model = (session.query(File) .filter(File.file_type == 'FQ') - .filter(File.observatory == level0_file.observatory) + .filter(File.observatory == "1") # .where(File.date_obs <= level0_file.date_obs) .order_by(File.date_obs.desc()).first()) return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename()) From dccc3676c5277604a53ab132fa6062f7531ff5dd Mon Sep 17 00:00:00 2001 From: 
From: Marcus Hughes
Date: Tue, 12 Nov 2024 20:32:52 -0700
Subject: [PATCH 28/28] fix broken test

---
 punchpipe/flows/level1.py            |  8 ++++----
 punchpipe/flows/tests/test_level1.py | 11 ++++++++++-
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index f944ac8..fa1905d 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -29,8 +29,8 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None):
     psf_model_type = corresponding_psf_model_type[level0_file.file_type]
     best_model = (session.query(File)
                   .filter(File.file_type == psf_model_type)
-                  .filter(File.observatory == "1")
-                  # .where(File.date_obs <= level0_file.date_obs)
+                  .filter(File.observatory == level0_file.observatory)
+                  .where(File.date_obs <= level0_file.date_obs)
                   .order_by(File.date_obs.desc()).first())
     return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())
@@ -38,8 +38,8 @@ def get_psf_model_path(level0_file, pipeline_config: dict, session=None):
 def get_quartic_model_path(level0_file, pipeline_config: dict, session=None):
     best_model = (session.query(File)
                   .filter(File.file_type == 'FQ')
-                  .filter(File.observatory == "1")
-                  # .where(File.date_obs <= level0_file.date_obs)
+                  .filter(File.observatory == level0_file.observatory)
+                  .where(File.date_obs <= level0_file.date_obs)
                   .order_by(File.date_obs.desc()).first())
     return os.path.join(best_model.directory(pipeline_config['root']), best_model.filename())

diff --git a/punchpipe/flows/tests/test_level1.py b/punchpipe/flows/tests/test_level1.py
index 6d9fedd..006af0a 100644
--- a/punchpipe/flows/tests/test_level1.py
+++ b/punchpipe/flows/tests/test_level1.py
@@ -39,7 +39,7 @@ def session_fn(session):
                        software_version='none',
                        date_obs=datetime.now())

-    psf_model = File(level="0",
+    psf_model = File(level="1",
                      file_type="RM",
                      observatory='0',
                      state='created',
@@ -47,9 +47,18 @@ def session_fn(session):
                      software_version='none',
                      date_obs=datetime.now()-timedelta(days=1))

+    quartic_fit_coeffs = File(level="1",
+                              file_type="FQ",
+                              observatory='0',
+                              state='created',
+                              file_version='none',
+                              software_version='none',
+                              date_obs=datetime.now()-timedelta(days=1))
+
     session.add(level0_file)
     session.add(level1_file)
     session.add(psf_model)
+    session.add(quartic_fit_coeffs)


 db = create_mysql_fixture(Base, session_fn, session=True)
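With a level-1 RM file and a level-1 FQ file in the fixture database, both model-path lookups have a row to resolve. A rough sketch of exercising get_quartic_model_path directly against such a session, invoked through .fn as the tests above do; the pipeline_config contents and the final assertion are assumptions for illustration.

from datetime import datetime

from punchpipe.controlsegment.db import File
from punchpipe.flows.level1 import get_quartic_model_path


def check_quartic_selection(db, pipeline_config):
    # A level-0 PM file observed now; the FQ fixture above is dated a day earlier,
    # so it satisfies the date_obs <= filter and should be the row returned.
    level0_file = File(level="0", file_type="PM", observatory="0",
                       state="created", file_version="none",
                       software_version="none", date_obs=datetime.now())
    # .fn bypasses the Prefect task wrapper, matching how the tests call tasks.
    path = get_quartic_model_path.fn(level0_file, pipeline_config, session=db)
    # Assumed layout: returned paths are rooted at the configured data root.
    assert path.startswith(pipeline_config["root"])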