Skip to content

Commit

Permalink
Merge pull request #78 from punch-mission/monitoring
Browse files Browse the repository at this point in the history
Improve monitoring utility
  • Loading branch information
jmbhughes authored Nov 13, 2024
2 parents de0b62d + dccc367 commit 422f9ed
Show file tree
Hide file tree
Showing 23 changed files with 717 additions and 927 deletions.
11 changes: 11 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
# punchpipe configuration (post-change version from this commit).
root: "/d0/punchsoc/gamera_data/"  # base directory for all pipeline data
input_drop: "dropzone/"            # relative to root
tlm_directory: "dropzone/"         # relative to root
file_version: "1"

# Plate scale per spacecraft id (1-4).
# NOTE(review): YAML does not evaluate arithmetic — "88 / 3600" is loaded as a
# string, so the consumer presumably parses/evals it; confirm before reformatting.
plate_scale:
  1: 88 / 3600
  2: 88 / 3600
  3: 88 / 3600
  4: 30 / 3600

# Bounds on acceptable image mean for the quality check.
quality_check:
  mean_low: 0
  mean_high: 65000

scheduler:
  max_start: 10  # maximum number of flows the scheduler may start at once

Expand Down
86 changes: 41 additions & 45 deletions deploy.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,49 @@
from prefect import serve

from punchpipe.controlsegment.launcher import launcher_flow
from punchpipe.deliver import create_noaa_delivery
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow

launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
description="Launch a pipeline segment.",
cron="* * * * *",
)

level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment",
description="Schedule a Level 1 flow.",
cron="* * * * *",
)
level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow",
description="Process a file from Level 0 to Level 1.")

level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment",
description="Schedule a Level 2 flow.",
cron="* * * * *",
)
level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
description="Process files from Level 1 to Level 2.")

levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
description="Schedule a Level Q flow.",
cron="* * * * *",
)
levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
description="Process files from Level 1 to Level Q.")

level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
description="Schedule a Level 3 flow to make PTM.",
cron="* * * * *",
)
level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow",
description="Process PTM files from Level 2 to Level 3.")

noaa_deployment = create_noaa_delivery.to_deployment(name="noaa-deployment",
description="Create a Noaa delivery.")

serve(launcher_deployment,
level1_scheduler_deployment, level1_process_deployment,
level2_scheduler_deployment, level2_process_deployment,
levelq_scheduler_deployment, levelq_process_deployment,
level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
noaa_deployment,
limit=100 # TODO: remove arbitrary limit
)
if __name__ == "__main__":
    # One spec per deployment: (flow, deployment name, description, cron).
    # A cron of None means the flow is only triggered on demand (the process
    # flows are launched by their scheduler flows, not by the clock).
    deployment_specs = [
        (launcher_flow, "launcher-deployment",
         "Launch a pipeline segment.", "* * * * *"),
        (level1_scheduler_flow, "level1-scheduler-deployment",
         "Schedule a Level 1 flow.", "* * * * *"),
        (level1_process_flow, "level1_process_flow",
         "Process a file from Level 0 to Level 1.", None),
        (level2_scheduler_flow, "level2-scheduler-deployment",
         "Schedule a Level 2 flow.", "* * * * *"),
        (level2_process_flow, "level2_process_flow",
         "Process files from Level 1 to Level 2.", None),
        (levelq_scheduler_flow, "levelq-scheduler-deployment",
         "Schedule a Level Q flow.", "* * * * *"),
        (levelq_process_flow, "levelq_process_flow",
         "Process files from Level 1 to Level Q.", None),
        (level3_PTM_scheduler_flow, "level3-PTM-scheduler-deployment",
         "Schedule a Level 3 flow to make PTM.", "* * * * *"),
        (level3_PTM_process_flow, "level3_PTM_process_flow",
         "Process PTM files from Level 2 to Level 3.", None),
    ]

    deployments = []
    for flow, name, description, cron in deployment_specs:
        if cron is None:
            deployments.append(flow.to_deployment(name=name, description=description))
        else:
            deployments.append(flow.to_deployment(name=name, description=description, cron=cron))

    # Serve all deployments in one process; limit caps concurrent flow runs.
    serve(*deployments, limit=1000)
28 changes: 0 additions & 28 deletions old_config.yaml

This file was deleted.

18 changes: 13 additions & 5 deletions punchpipe/cli.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import subprocess
import multiprocessing as mp

import click
from waitress import serve

from .monitor.app import server
from .monitor.app import create_app


@click.command
@click.group
def main():
"""Run the PUNCH automated pipeline"""

def launch_monitor():
    """Build the monitoring dashboard app and serve it (blocking call)."""
    app = create_app()
    # NOTE(review): port 8051 is hard-coded here and in the message printed by
    # run(); keep the two in sync. debug=False so Dash's reloader is disabled.
    app.run_server(debug=False, port=8051)

@main.command
def run():
    """Start the Prefect server and the punchpipe monitor dashboard.

    The diff rendering interleaved removed lines (the old port-8050 message
    and the stale waitress ``serve`` call) with the added ones; this is the
    post-change function, reconstructed in the diff's line order.
    """
    print("Launching punchpipe monitor on http://localhost:8051/.")
    # Prefect runs as a detached child process; it is not stopped by this CLI.
    subprocess.Popen(["prefect", "server", "start"])
    print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
    # Run the dashboard in its own process so this command returns control.
    mp.Process(target=launch_monitor, args=()).start()
139 changes: 121 additions & 18 deletions punchpipe/controlsegment/db.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os

from sqlalchemy import TEXT, Boolean, Column, DateTime, Integer, String
from sqlalchemy import TEXT, Boolean, Column, DateTime, Float, Integer, String
from sqlalchemy.orm import declarative_base

from punchpipe.error import MissingCCSDSDataError

Base = declarative_base()


Expand All @@ -13,7 +15,7 @@ class File(Base):
file_type = Column(String(2), nullable=False)
observatory = Column(String(1), nullable=False)
file_version = Column(String(16), nullable=False)
software_version = Column(String(16), nullable=False)
software_version = Column(String(20), nullable=False)
date_created = Column(DateTime, nullable=True)
date_obs = Column(DateTime, nullable=False)
date_beg = Column(DateTime, nullable=True)
Expand Down Expand Up @@ -83,13 +85,13 @@ class SciPacket(Base):
flash_block = Column(Integer, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
packet_num = Column(Integer, nullable=False)
source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size
source_tlm_file = Column(Integer, nullable=False)
is_used = Column(Boolean)
l0_version = Column(Integer)
compression_settings = Column(Integer)

class EngPacket(Base):
__tablename__ = "eng_packets"
class EngXACTPacket(Base):
__tablename__ = "eng_xact"
packet_id = Column(Integer, primary_key=True)
apid = Column(Integer, nullable=False, index=True)
sequence_count = Column(Integer, nullable=False)
Expand All @@ -98,18 +100,119 @@ class EngPacket(Base):
flash_block = Column(Integer, nullable=False)
timestamp = Column(DateTime, nullable=False, index=True)
packet_num = Column(Integer, nullable=False)
source_tlm_file = Column(String(128), nullable=False) # TODO: make realistic size
is_used = Column(Boolean)
l0_version = Column(Integer)
source_tlm_file = Column(Integer, nullable=False)

ATT_DET_Q_BODY_WRT_ECI1 = Column(Float, nullable=False) # Attitude Quaternion
ATT_DET_Q_BODY_WRT_ECI2 = Column(Float, nullable=False) # Attitude Quaternion
ATT_DET_Q_BODY_WRT_ECI3 = Column(Float, nullable=False) # Attitude Quaternion
ATT_DET_Q_BODY_WRT_ECI4 = Column(Float, nullable=False) # Attitude Quaternion

ATT_DET_RESIDUAL1 = Column(Float, nullable=False) # Attitude Filter Residual
ATT_DET_RESIDUAL2 = Column(Float, nullable=False) # Attitude Filter Residual
ATT_DET_RESIDUAL3 = Column(Float, nullable=False) # Attitude Filter Residual

# def json_numpy_obj_hook(dct):
# """Decodes a previously encoded numpy ndarray with proper shape and dtype.
#
# :param dct: (dict) json encoded ndarray
# :return: (ndarray) if input was an encoded ndarray
# """
# if isinstance(dct, dict) and '__ndarray__' in dct:
# data = base64.b64decode(dct['__ndarray__'])
# return np.frombuffer(data, dct['dtype']).reshape(dct['shape'])
# return dct
REFS_POSITION_WRT_ECI1 = Column(Float, nullable=False) # Orbit Position ECI
REFS_POSITION_WRT_ECI2 = Column(Float, nullable=False) # Orbit Position ECI
REFS_POSITION_WRT_ECI3 = Column(Float, nullable=False) # Orbit Position ECI

REFS_VELOCITY_WRT_ECI1 = Column(Float, nullable=False) # Orbit Velocity ECI
REFS_VELOCITY_WRT_ECI2 = Column(Float, nullable=False) # Orbit Velocity ECI
REFS_VELOCITY_WRT_ECI3 = Column(Float, nullable=False) # Orbit Velocity ECI

ATT_CMD_CMD_Q_BODY_WRT_ECI1 = Column(Float, nullable=False) # Commanded Att Quaternion
ATT_CMD_CMD_Q_BODY_WRT_ECI2 = Column(Float, nullable=False) # Commanded Att Quaternion
ATT_CMD_CMD_Q_BODY_WRT_ECI3 = Column(Float, nullable=False) # Commanded Att Quaternion
ATT_CMD_CMD_Q_BODY_WRT_ECI4 = Column(Float, nullable=False) # Commanded Att Quaternion

class ENGPFWPacket(Base):
    """Engineering telemetry packet from the PFW (filter wheel) mechanism.

    Fixes in this revision: PEP8 spacing around ``=`` (``PFW_STATUS``,
    ``STEPPER_SELECT``) and before the inline comment on
    ``LAST_MOVE_N_STEPS``; no column definitions changed.
    """
    __tablename__ = "eng_pfw"
    # Common CCSDS bookkeeping columns, shared with the other packet tables.
    packet_id = Column(Integer, primary_key=True)
    apid = Column(Integer, nullable=False, index=True)
    sequence_count = Column(Integer, nullable=False)
    length = Column(Integer, nullable=False)
    spacecraft_id = Column(Integer, nullable=False, index=True)
    flash_block = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False, index=True)
    packet_num = Column(Integer, nullable=False)
    source_tlm_file = Column(Integer, nullable=False)

    # PFW-specific telemetry fields.
    PFW_STATUS = Column(Integer, nullable=False)  # Current PFW Status (0 - no error, else error)
    STEP_CALC = Column(Integer, nullable=False)  # Calculated step (0-1199)
    LAST_CMD_N_STEPS = Column(Integer, nullable=False)  # Commanded number of steps (1-1199)
    HOME_POSITION_OVRD = Column(Integer, nullable=False)  # HOME Position OVERRIDE
    POSITION_CURR = Column(Integer, nullable=False)  # Current position (1-5, 0 - manual stepping)
    POSITION_CMD = Column(Integer, nullable=False)  # Commanded position (1-5, 0 - manual stepping)
    RESOLVER_POS_RAW = Column(Integer, nullable=False)  # Resolver position - raw resolver counts (0-65000)
    RESOLVER_POS_CORR = Column(Integer, nullable=False)  # Resolver position - error correction applied (0-65000)
    RESOLVER_READ_CNT = Column(Integer, nullable=False)  # Accumulative # of resolver reads (resets on boot)
    LAST_MOVE_N_STEPS = Column(Integer, nullable=False)  # Number of steps on last move (1-1199)
    LAST_MOVE_EXECUTION_TIME = Column(Float, nullable=False)  # Current move execution time
    LIFETIME_STEPS_TAKEN = Column(Integer, nullable=False)  # Lifetime accumulative number of steps taken
    LIFETIME_EXECUTION_TIME = Column(Float, nullable=False)  # Lifetime accumulative execution time
    FSM_CTRL_STATE = Column(Integer, nullable=False)  # Controller FSM State
    READ_SUB_STATE = Column(Integer, nullable=False)  # READ Sub-FSM State
    MOVE_SUB_STATE = Column(Integer, nullable=False)  # MOVE Sub-FSM State
    HOME_SUB_STATE = Column(Integer, nullable=False)  # HOME Sub-FSM State
    HOME_POSITION = Column(Integer, nullable=False)  # Home Position (1-5)
    RESOLVER_SELECT = Column(Integer, nullable=False)  # Resolver Select
    RESOLVER_TOLERANCE_HOME = Column(Integer, nullable=False)  # Resolver Tolerance
    RESOLVER_TOLERANCE_CURR = Column(Integer, nullable=False)  # Resolver Tolerance
    STEPPER_SELECT = Column(Integer, nullable=False)  # Stepper Motor Select
    STEPPER_RATE_DELAY = Column(Integer, nullable=False)  # Stepper Motor Rate Delay
    STEPPER_RATE = Column(Float, nullable=False)  # Stepper Motor Rate
    SHORT_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False)  # Short Move(1-4 steps) Settling time before reading resolver
    LONG_MOVE_SETTLING_TIME_MS = Column(Integer, nullable=False)  # Long Move(5-1199 steps) Setting time before reading resolver
    PRIMARY_STEP_OFFSET_1 = Column(Integer, nullable=False)  # Primary Step Offset 1
    PRIMARY_STEP_OFFSET_2 = Column(Integer, nullable=False)  # Short Move(1-4 steps) Delay before reading resolver
    PRIMARY_STEP_OFFSET_3 = Column(Integer, nullable=False)  # Primary Step Offset 3
    PRIMARY_STEP_OFFSET_4 = Column(Integer, nullable=False)  # Primary Step Offset 4
    PRIMARY_STEP_OFFSET_5 = Column(Integer, nullable=False)  # Primary Step Offset 5
    REDUNDANT_STEP_OFFSET_1 = Column(Integer, nullable=False)  # Redundant Step Offset 1
    REDUNDANT_STEP_OFFSET_2 = Column(Integer, nullable=False)  # Redundant Step Offset 2
    REDUNDANT_STEP_OFFSET_3 = Column(Integer, nullable=False)  # Redundant Step Offset 3
    REDUNDANT_STEP_OFFSET_4 = Column(Integer, nullable=False)  # Redundant Step Offset 4
    REDUNDANT_STEP_OFFSET_5 = Column(Integer, nullable=False)  # Redundant Step Offset 5
    PRIMARY_RESOLVER_POSITION_1 = Column(Integer, nullable=False)  # Primary Resolver Position 1
    PRIMARY_RESOLVER_POSITION_2 = Column(Integer, nullable=False)  # Primary Resolver Position 2
    PRIMARY_RESOLVER_POSITION_3 = Column(Integer, nullable=False)  # Primary Resolver Position 3
    PRIMARY_RESOLVER_POSITION_4 = Column(Integer, nullable=False)  # Primary Resolver Position 4
    PRIMARY_RESOLVER_POSITION_5 = Column(Integer, nullable=False)  # Primary Resolver Position 5
    REDUNDANT_RESOLVER_POSITION_1 = Column(Integer, nullable=False)  # Redundant Resolver Position 1
    REDUNDANT_RESOLVER_POSITION_2 = Column(Integer, nullable=False)  # Redundant Resolver Position 2
    REDUNDANT_RESOLVER_POSITION_3 = Column(Integer, nullable=False)  # Redundant Resolver Position 3
    REDUNDANT_RESOLVER_POSITION_4 = Column(Integer, nullable=False)  # Redundant Resolver Position 4
    REDUNDANT_RESOLVER_POSITION_5 = Column(Integer, nullable=False)  # Redundant Resolver Position 5


class TLMFiles(Base):
    """A telemetry (TLM) file known to the pipeline."""
    __tablename__ = "tlm_files"
    tlm_id = Column(Integer, primary_key=True)
    # NOTE(review): paths longer than 128 characters will not fit — confirm limit.
    path = Column(String(128), nullable=False)
    is_processed = Column(Boolean, nullable=False)  # True once this file has been processed

class Health(Base):
    """One sampled snapshot of host-machine health metrics."""
    __tablename__ = "health"
    health_id = Column(Integer, primary_key=True)
    datetime = Column(DateTime, nullable=False)  # when the sample was taken
    # NOTE(review): units (percent vs. bytes) are set by whatever writes these
    # rows — confirm against the monitoring writer before relying on them.
    cpu_usage = Column(Float, nullable=False)
    memory_usage = Column(Float, nullable=False)
    memory_percentage = Column(Float, nullable=False)
    disk_usage = Column(Float, nullable=False)
    disk_percentage = Column(Float, nullable=False)
    num_pids = Column(Integer, nullable=False)  # count of process ids at sample time


def get_closest_eng_packets(table, timestamp, spacecraft_id):
    """Return the pair of packets in `table` that bracket `timestamp`.

    Returns (earlier, later). If packets exist on only one side of the
    timestamp, that packet is returned for both ends of the bracket.
    Raises MissingCCSDSDataError when no packet exists on either side.
    """
    for_spacecraft = table.query.filter(table.spacecraft_id == spacecraft_id)
    later = for_spacecraft.filter(table.timestamp > timestamp).order_by(table.timestamp.asc()).first()
    earlier = for_spacecraft.filter(table.timestamp < timestamp).order_by(table.timestamp.desc()).first()

    if earlier is None and later is None:
        raise MissingCCSDSDataError("Could not find packet near that time.")
    # One-sided case: reuse the single packet for both ends.
    if earlier is None:
        earlier = later
    if later is None:
        later = earlier

    return earlier, later
3 changes: 2 additions & 1 deletion punchpipe/controlsegment/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ def generic_scheduler_flow_logic(

# prepare the new level flow and file
children_files = construct_child_file_info(parent_files, pipeline_config)
database_flow_info = construct_child_flow_info(parent_files, children_files, pipeline_config)
database_flow_info = construct_child_flow_info(parent_files, children_files,
pipeline_config, session=session)
for child_file in children_files:
session.add(child_file)
session.add(database_flow_info)
Expand Down
17 changes: 17 additions & 0 deletions punchpipe/error.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
class PunchPipeWarning(Warning):
    """Base class for all warnings issued by punchpipe."""


class CCSDSPacketConstructionWarning(PunchPipeWarning):
    """Issued when a CCSDS packet cannot be constructed as expected."""


class CCSDSPacketDatabaseUpdateWarning(PunchPipeWarning):
    """Issued when updating the database from a CCSDS packet has a problem."""


class PunchPipeError(Exception):
    """Base class for all errors raised by punchpipe."""


class MissingCCSDSDataError(PunchPipeError):
    """Raised when expected CCSDS data cannot be found."""
Loading

0 comments on commit 422f9ed

Please sign in to comment.