diff --git a/.gitignore b/.gitignore
index b155c0d..0b344ec 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,4 +4,4 @@ scripts/credentials.py
 /venv-docs/
 build/
 punchpipe.egg-info
-.idea
\ No newline at end of file
+.idea
diff --git a/README.md b/README.md
index 87ba114..e5f75a9 100644
--- a/README.md
+++ b/README.md
@@ -1,26 +1,26 @@
 # punchpipe
-`punchpipe` is the data processing pipeline for [the PUNCH mission](https://punch.space.swri.edu/). 
+`punchpipe` is the data processing pipeline for [the PUNCH mission](https://punch.space.swri.edu/).
 All the science code and actual calibration functionality lives in `punchbowl`. This package
-only automates the control segment for the Science Operations Center. 
+only automates the control segment for the Science Operations Center.
 
 The `punchpipe` is organized into segments, i.e. levels of processing to produce specific
-data products. Segments are referred in code by their ending level, 
-e.g. `level1` means the Level 0 to Level 1 segment. 
+data products. Segments are referred to in code by their ending level,
+e.g. `level1` means the Level 0 to Level 1 segment.
 
 ## Accessing the data
 
 ## First-time setup
-1. Create a clean virtual environment. You can do this with conda using `conda env create --name ENVIRONMENT-NAME`
-2. Install `punchbowl` using `pip install .` in the `punchbowl` directory. 
-3. Install `punchpipe` using `pip install .` while in this directory
-4. Set up database credentials Prefect block by running `python scripts/credentials.py`. 
-   - If this file does not exist for you. You need to determine your mySQL credentials then create a block in Python: 
+1. Create a clean virtual environment. You can do this with conda using `conda create --name ENVIRONMENT-NAME`.
+2. Install `punchbowl` by running `pip install .` in the `punchbowl` directory.
+3. Install `punchpipe` by running `pip install .` in this directory.
+4. Set up the database credentials Prefect block by running `python scripts/credentials.py`.
+   - If this script does not exist for you, determine your MySQL credentials and then create a block in Python:
 ```py
 from punchpipe.controlsegment.db import MySQLCredentials
 cred = MySQLCredentials(user="username", password="password")
 cred.save('mysql-cred')
 ```
-5. Set up databases by running `scripts/create_db.py` directory. 
+5. Set up the databases by running `scripts/create_db.py` (see the sketch below).
 6. Build all the necessary deployments for Prefect by following [these instructions](https://docs.prefect.io/concepts/deployments/).
    - See below for an example:
 ```shell
@@ -35,9 +35,9 @@ e.g. `level1` means the Level 0 to Level 1 segment.
 3. Create agents for the work queues by following the instructions in the UI for the work queue
 
 ## Resetting
-1. Reset the Prefect Orion database using `prefect orion database reset`. 
+1. Reset the Prefect Orion database using `prefect orion database reset`.
 2. Remove all the `punchpipe` databases by running `erase_db.sql`
 
 ## Contributing
-
-## Licensing
\ No newline at end of file
+
+## Licensing
diff --git a/docs/source/control/configuration.rst b/docs/source/control/configuration.rst
index a7b844a..135f3fc 100644
--- a/docs/source/control/configuration.rst
+++ b/docs/source/control/configuration.rst
@@ -1,4 +1,4 @@
 Configuration
 ==============
 
-How to set up a configuration file.
\ No newline at end of file
+How to set up a configuration file.
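The bodies of `scripts/credentials.py` and `scripts/create_db.py` are not shown in this patch, so here is a rough sketch of what step 5 of the README amounts to, assuming only the `MySQLCredentials` block and the declarative `Base` that `scripts/create_db.py` imports further down in this diff; the driver, host, and database name in the connection URL are illustrative guesses, not taken from the repository:

```py
from sqlalchemy import create_engine

from punchpipe.controlsegment.db import Base, MySQLCredentials

# Load the Prefect block saved in step 4.
cred = MySQLCredentials.load("mysql-cred")

# Assumed connection details: a local MySQL server and a database named "punchpipe".
engine = create_engine(
    f"mysql+pymysql://{cred.user}:{cred.password.get_secret_value()}@localhost/punchpipe"
)

# Create every table declared on the shared declarative Base.
Base.metadata.create_all(engine)
```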
diff --git a/docs/source/control/index.rst b/docs/source/control/index.rst
index 6055fe2..5c6768c 100644
--- a/docs/source/control/index.rst
+++ b/docs/source/control/index.rst
@@ -6,4 +6,4 @@ Control Segment Design
    :caption: Contents:
 
    states
-   configuration
\ No newline at end of file
+   configuration
diff --git a/old_config.yaml b/old_config.yaml
index 2dac064..4aa6b4f 100644
--- a/old_config.yaml
+++ b/old_config.yaml
@@ -25,4 +25,4 @@ scheduler:
 
 process_options:
   pointing:
-    num_quads: 100
\ No newline at end of file
+    num_quads: 100
diff --git a/punchpipe/controlsegment/__init__.py b/punchpipe/controlsegment/__init__.py
index b563a36..dbedbb3 100644
--- a/punchpipe/controlsegment/__init__.py
+++ b/punchpipe/controlsegment/__init__.py
@@ -1,13 +1,12 @@
 from datetime import datetime
 
-import pytest
 import numpy as np
+import pytest
 from astropy.nddata import StdDevUncertainty
 from astropy.wcs import WCS
 from punchbowl.data import NormalizedMetadata, PUNCHData
 
-from punchpipe.controlsegment.db import Base, Flow, File
-from punchpipe.controlsegment.processor import generic_process_flow_logic
+from punchpipe.controlsegment.db import File
 from punchpipe.controlsegment.util import match_data_with_file_db_entry
diff --git a/punchpipe/controlsegment/db.py b/punchpipe/controlsegment/db.py
index a921275..64dbb5a 100644
--- a/punchpipe/controlsegment/db.py
+++ b/punchpipe/controlsegment/db.py
@@ -1,10 +1,10 @@
-from typing import Optional
 import os
+from typing import Optional
 
 from prefect.blocks.core import Block
 from pydantic import SecretStr
-from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, DateTime, Float, TEXT
-from sqlalchemy.orm import declarative_base, relationship
+from sqlalchemy import TEXT, Column, DateTime, Integer, String
+from sqlalchemy.orm import declarative_base
 
 
 class MySQLCredentials(Block):
@@ -84,4 +84,3 @@ class FileRelationship(Base):
     relationship_id = Column(Integer, primary_key=True)
     parent = Column(Integer, nullable=False)
     child = Column(Integer, nullable=False)
-
diff --git a/punchpipe/controlsegment/launcher.py b/punchpipe/controlsegment/launcher.py
index 343899a..14be0c9 100644
--- a/punchpipe/controlsegment/launcher.py
+++ b/punchpipe/controlsegment/launcher.py
@@ -1,13 +1,14 @@
-from typing import List
 from datetime import datetime, timedelta
+from typing import List
 
-from prefect import task, flow, get_run_logger
+from prefect import flow, get_run_logger, task
 from prefect.client import get_client
 from sqlalchemy import and_
 from sqlalchemy.orm import Session
 
 from punchpipe.controlsegment.db import Flow
-from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration
+from punchpipe.controlsegment.util import (get_database_session,
+                                           load_pipeline_configuration)
 
 
 @task
diff --git a/punchpipe/controlsegment/processor.py b/punchpipe/controlsegment/processor.py
index bf817ee..f82d135 100644
--- a/punchpipe/controlsegment/processor.py
+++ b/punchpipe/controlsegment/processor.py
@@ -1,14 +1,13 @@
-from datetime import datetime
 import json
-import os
+from datetime import datetime
 
 from prefect.context import get_run_context
 
-from punchpipe.controlsegment.db import Flow, File
+from punchpipe.controlsegment.db import File, Flow
 from punchpipe.controlsegment.util import (get_database_session,
                                            load_pipeline_configuration,
-                                           write_file,
-                                           match_data_with_file_db_entry)
+                                           match_data_with_file_db_entry,
+                                           write_file)
 
 
 def generic_process_flow_logic(flow_id: int, core_flow_to_launch,
diff --git a/punchpipe/controlsegment/scheduler.py b/punchpipe/controlsegment/scheduler.py
index 3f47497..4960690 100644
--- a/punchpipe/controlsegment/scheduler.py
+++ b/punchpipe/controlsegment/scheduler.py
@@ -1,7 +1,8 @@
-import itertools
 from punchpipe.controlsegment.db import File, FileRelationship
-from punchpipe.controlsegment.util import get_database_session, update_file_state, load_pipeline_configuration
+from punchpipe.controlsegment.util import (get_database_session,
+                                           load_pipeline_configuration,
+                                           update_file_state)
 
 
 def generic_scheduler_flow_logic(query_ready_files_func,
diff --git a/punchpipe/controlsegment/tests/conftest.py b/punchpipe/controlsegment/tests/conftest.py
index 5acfa6f..782eaf0 100644
--- a/punchpipe/controlsegment/tests/conftest.py
+++ b/punchpipe/controlsegment/tests/conftest.py
@@ -4,4 +4,4 @@
 
 @pytest.fixture(scope='session')
 def pmr_mysql_config():
-    return MysqlConfig(image='mariadb:latest')
\ No newline at end of file
+    return MysqlConfig(image='mariadb:latest')
diff --git a/punchpipe/controlsegment/tests/test_launcher.py b/punchpipe/controlsegment/tests/test_launcher.py
index 5209dbe..6ff6bd5 100644
--- a/punchpipe/controlsegment/tests/test_launcher.py
+++ b/punchpipe/controlsegment/tests/test_launcher.py
@@ -1,17 +1,16 @@
-from datetime import datetime
 import os
+from datetime import datetime
 
-from pytest_mock_resources import create_mysql_fixture
-from prefect.testing.utilities import prefect_test_harness
-from prefect.logging import disable_run_logger
 from freezegun import freeze_time
+from prefect.logging import disable_run_logger
+from prefect.testing.utilities import prefect_test_harness
+from pytest_mock_resources import create_mysql_fixture
 
-from punchpipe import __version__
-from punchpipe.controlsegment.db import Base, Flow, File
-from punchpipe.controlsegment.launcher import (gather_planned_flows,
-                                               count_running_flows,
+from punchpipe.controlsegment.db import Base, File, Flow
+from punchpipe.controlsegment.launcher import (count_running_flows,
+                                               escalate_long_waiting_flows,
                                                filter_for_launchable_flows,
-                                               escalate_long_waiting_flows)
+                                               gather_planned_flows)
 from punchpipe.controlsegment.util import load_pipeline_configuration
 
 TEST_DIR = os.path.dirname(__file__)
@@ -132,7 +131,3 @@ def test_filter_for_launchable_flows_with_empty_db(db_empty):
 
 def test_launch_ready_flows():
     pass
-
-
-
-
diff --git a/punchpipe/controlsegment/tests/test_processor.py b/punchpipe/controlsegment/tests/test_processor.py
index 6fccc21..a470b9f 100644
--- a/punchpipe/controlsegment/tests/test_processor.py
+++ b/punchpipe/controlsegment/tests/test_processor.py
@@ -1,20 +1,19 @@
-from datetime import datetime
 import json
 import os
 import shutil
+from datetime import datetime
 
-import pytest
-from prefect import flow, task
-from prefect.testing.utilities import prefect_test_harness
-from pytest_mock_resources import create_mysql_fixture
 import numpy as np
+import pytest
 from astropy.nddata import StdDevUncertainty
 from astropy.wcs import WCS
+from prefect import flow
+from prefect.testing.utilities import prefect_test_harness
 from punchbowl.data import NormalizedMetadata, PUNCHData
+from pytest_mock_resources import create_mysql_fixture
 
-from punchpipe.controlsegment.db import Base, Flow, File
+from punchpipe.controlsegment.db import Base, File, Flow
 from punchpipe.controlsegment.processor import generic_process_flow_logic
-from punchpipe.controlsegment.util import match_data_with_file_db_entry
 
 TESTDATA_DIR = os.path.dirname(__file__)
diff --git a/punchpipe/controlsegment/util.py b/punchpipe/controlsegment/util.py
index 4afe82e..6f70e01 100644
--- a/punchpipe/controlsegment/util.py
+++ b/punchpipe/controlsegment/util.py
@@ -1,14 +1,13 @@
 import os
 
+import yaml
+from prefect import task
+from punchbowl.data import PUNCHData
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session
-from prefect import task
-import yaml
 from yaml.loader import FullLoader
 
-from punchbowl.data import PUNCHData
-from punchpipe.controlsegment.db import MySQLCredentials
-from punchpipe.controlsegment.db import Flow, File, FileRelationship
+from punchpipe.controlsegment.db import File, MySQLCredentials
 
 
 def get_database_session():
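The `util.py` hunk above ends at the signature of `get_database_session()`. Its body is not shown, but given the imports the patch keeps (`create_engine`, `Session`, and `MySQLCredentials`), it plausibly opens a session along these lines; everything below the docstring is an assumption, not code from the repository:

```py
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from punchpipe.controlsegment.db import MySQLCredentials


def get_database_session():
    """Open a SQLAlchemy session using the saved 'mysql-cred' Prefect block."""
    cred = MySQLCredentials.load("mysql-cred")
    engine = create_engine(
        f"mysql+pymysql://{cred.user}:{cred.password.get_secret_value()}@localhost/punchpipe"
    )
    return Session(engine)
```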
diff --git a/punchpipe/flows/level1.py b/punchpipe/flows/level1.py
index 5755479..371f597 100644
--- a/punchpipe/flows/level1.py
+++ b/punchpipe/flows/level1.py
@@ -1,15 +1,14 @@
-from datetime import datetime
 import json
 import os
 import typing as t
+from datetime import datetime
 
-from sqlalchemy import and_
 from prefect import flow, task
-from prefect.context import get_run_context
 from punchbowl.level1.flow import level1_core_flow
+from sqlalchemy import and_
 
 from punchpipe import __version__
-from punchpipe.controlsegment.db import Flow, File
+from punchpipe.controlsegment.db import File, Flow
 from punchpipe.controlsegment.processor import generic_process_flow_logic
 from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic
diff --git a/punchpipe/flows/level2.py b/punchpipe/flows/level2.py
index a326100..7493c5f 100644
--- a/punchpipe/flows/level2.py
+++ b/punchpipe/flows/level2.py
@@ -1,14 +1,14 @@
-from datetime import datetime, timedelta
 import json
 import os
 import typing as t
+from datetime import datetime, timedelta
 
-from sqlalchemy import and_
 from prefect import flow, task
 from punchbowl.level2.flow import level2_core_flow
+from sqlalchemy import and_
 
 from punchpipe import __version__
-from punchpipe.controlsegment.db import Flow, File
+from punchpipe.controlsegment.db import File, Flow
 from punchpipe.controlsegment.processor import generic_process_flow_logic
 from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic
diff --git a/punchpipe/flows/tests/conftest.py b/punchpipe/flows/tests/conftest.py
index 5acfa6f..782eaf0 100644
--- a/punchpipe/flows/tests/conftest.py
+++ b/punchpipe/flows/tests/conftest.py
@@ -4,4 +4,4 @@
 
 @pytest.fixture(scope='session')
 def pmr_mysql_config():
-    return MysqlConfig(image='mariadb:latest')
\ No newline at end of file
+    return MysqlConfig(image='mariadb:latest')
diff --git a/punchpipe/flows/tests/test_level1.py b/punchpipe/flows/tests/test_level1.py
index 09ddb2b..06e7e26 100644
--- a/punchpipe/flows/tests/test_level1.py
+++ b/punchpipe/flows/tests/test_level1.py
@@ -1,13 +1,16 @@
-from datetime import datetime
 import os
+from datetime import datetime
 
-from pytest_mock_resources import create_mysql_fixture
 from prefect.testing.utilities import prefect_test_harness
+from pytest_mock_resources import create_mysql_fixture
 
 from punchpipe import __version__
-from punchpipe.controlsegment.db import Base, Flow, File
+from punchpipe.controlsegment.db import Base, File, Flow
 from punchpipe.controlsegment.util import load_pipeline_configuration
-from punchpipe.flows.level1 import level1_query_ready_files, level1_construct_file_info, level1_construct_flow_info, level1_scheduler_flow
+from punchpipe.flows.level1 import (level1_construct_file_info,
+                                    level1_construct_flow_info,
+                                    level1_query_ready_files,
+                                    level1_scheduler_flow)
 
 TEST_DIR = os.path.dirname(__file__)
@@ -84,7 +87,7 @@ def test_level1_construct_flow_info():
 def test_level1_scheduler_flow(db):
     pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
     with prefect_test_harness():
-        outcome = level1_scheduler_flow(pipeline_config_path, db)
+        level1_scheduler_flow(pipeline_config_path, db)
         results = db.query(Flow).where(Flow.state == 'planned').all()
         assert len(results) == 1
diff --git a/punchpipe/flows/tests/test_level2.py b/punchpipe/flows/tests/test_level2.py
index 8dabf6c..20f9c0e 100644
--- a/punchpipe/flows/tests/test_level2.py
+++ b/punchpipe/flows/tests/test_level2.py
@@ -1,14 +1,17 @@
-from datetime import datetime
 import os
+from datetime import datetime
 
-from pytest_mock_resources import create_mysql_fixture
-from prefect.testing.utilities import prefect_test_harness
 from freezegun import freeze_time
+from prefect.testing.utilities import prefect_test_harness
+from pytest_mock_resources import create_mysql_fixture
 
 from punchpipe import __version__
-from punchpipe.controlsegment.db import Base, Flow, File
+from punchpipe.controlsegment.db import Base, File, Flow
 from punchpipe.controlsegment.util import load_pipeline_configuration
-from punchpipe.flows.level2 import level2_query_ready_files, level2_construct_file_info, level2_construct_flow_info, level2_scheduler_flow
+from punchpipe.flows.level2 import (level2_construct_file_info,
+                                    level2_construct_flow_info,
+                                    level2_query_ready_files,
+                                    level2_scheduler_flow)
 
 TEST_DIR = os.path.dirname(__file__)
@@ -38,7 +41,7 @@ def session_fn(session):
 
 def test_level2_query_ready_files(db):
-    with freeze_time(datetime(2023, 1, 1, 0, 5, 0)) as frozen_datatime:
+    with freeze_time(datetime(2023, 1, 1, 0, 5, 0)) as frozen_datetime:  # noqa: F841
         pipeline_config = {'levels': {'level2_process_flow': {'schedule': {'latency': 3, 'window_duration_seconds': 3}}}}
         ready_file_ids = level2_query_ready_files.fn(db, pipeline_config)
         assert len(ready_file_ids) == 1
@@ -88,7 +91,7 @@ def test_level2_construct_flow_info():
 def test_level2_scheduler_flow(db):
     pipeline_config_path = os.path.join(TEST_DIR, "config.yaml")
     with prefect_test_harness():
-        outcome = level2_scheduler_flow(pipeline_config_path, db)
+        level2_scheduler_flow(pipeline_config_path, db)
         results = db.query(Flow).where(Flow.state == 'planned').all()
         assert len(results) == 0
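The level 1 and level 2 flow modules and their tests above all follow one pattern: each level supplies `*_query_ready_files`, `*_construct_file_info`, and `*_construct_flow_info` callables and delegates the bookkeeping to `generic_scheduler_flow_logic`. Only the first parameter of that function is visible in this patch, so the wiring below is a sketch of the composition's shape; the remaining parameters and the exact body of `level1_scheduler_flow` are assumptions:

```py
from prefect import flow

from punchpipe.controlsegment.scheduler import generic_scheduler_flow_logic
from punchpipe.flows.level1 import (level1_construct_file_info,
                                    level1_construct_flow_info,
                                    level1_query_ready_files)


@flow
def level1_scheduler_flow(pipeline_config_path, session=None):
    # Hypothetical wiring: hand the three level-specific callables, plus the
    # pipeline configuration and database session, to the generic logic.
    generic_scheduler_flow_logic(level1_query_ready_files,    # which inputs are ready
                                 level1_construct_file_info,  # File rows for planned outputs
                                 level1_construct_flow_info,  # the Flow row to mark as planned
                                 pipeline_config_path,
                                 session=session)
```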
diff --git a/punchpipe/monitor/app.py b/punchpipe/monitor/app.py
index 4ee0189..a646d57 100644
--- a/punchpipe/monitor/app.py
+++ b/punchpipe/monitor/app.py
@@ -1,13 +1,13 @@
 # Run this app with `python app.py` and
 # visit http://127.0.0.1:8050/ in your web browser.
 
-from dash import Dash, dcc, html, Input, Output, State, dash_table
-import plotly.express as px
+import json
+from datetime import date, timedelta
+
 import pandas as pd
-import numpy as np
-from datetime import timedelta, date
+import plotly.express as px
+from dash import Dash, Input, Output, dash_table, dcc, html
 from dash.exceptions import PreventUpdate
-import json
 
 app = Dash(__name__)
diff --git a/punchpipe/monitor/create_fake_data.py b/punchpipe/monitor/create_fake_data.py
index a466d11..7f222e1 100644
--- a/punchpipe/monitor/create_fake_data.py
+++ b/punchpipe/monitor/create_fake_data.py
@@ -1,10 +1,12 @@
 import random
-import numpy as np
+import uuid
+from datetime import datetime, timedelta
+from random import randrange
+
 import coolname
+import numpy as np
 import pandas as pd
-from random import randrange
-from datetime import datetime, timedelta
-import uuid
+
 
 def random_date(start, end):
     """
diff --git a/punchpipe/monitor/dp_test.py b/punchpipe/monitor/dp_test.py
index 5bd3ca8..d940835 100644
--- a/punchpipe/monitor/dp_test.py
+++ b/punchpipe/monitor/dp_test.py
@@ -1,75 +1,75 @@
-from datetime import timedelta, datetime
-
-import pandas as pd
-import datapane as dp
-import plotly.express as px
-import numpy as np
-
-
-def _create_overall_blocks(start_date, end_date, df):
-    return [f"## Overall: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')} "]
-
-
-def _process_level(start_date, end_date, level, df):
-    filtered_df = df[((df['start_time'] > start_date) * (df['end_time'] < end_date) * (df['flow_type'] == f"Level {level}") * (df['state'] == "completed")) | ((df['state'] == "running") * (df['start_time'] > start_date))]
-    filtered_df.set_index("flow_id")
-    if len(filtered_df):
-        completed = df[
-            (df['start_time'] > start_date) * (df['end_time'] < end_date) * (df['flow_type'] == f"Level {level}") * (
-                        df['state'] == "completed")]
-        completed['duration'] = (completed['end_time'] - completed['start_time']).map(timedelta.total_seconds)
-        average_duration = np.mean(completed['duration'])
-        stddev_duration = np.std(completed['duration'])
-        running_flow_count = len(df[(df['state'] == "running") * (df['start_time'] > start_date)])
-
-        plot = px.histogram(completed, x='duration')
-        blocks = [f"## Level {level}: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}",
-                  dp.Group(
-                      dp.BigNumber(heading="Average Duration [sec]", value=f"{average_duration:.1f}", change=3.01, is_upward_change=True, is_positive_intent=False),
-                      dp.BigNumber(heading='Stddev Duration [sec]', value=f"{stddev_duration: .1f}", prev_value=f"{stddev_duration-1: .1f}"),
-                      dp.BigNumber(heading="Number of files written", value=512, prev_value=510),
-                      dp.BigNumber(heading="Running flow count", value=running_flow_count),
-                      dp.BigNumber(heading="Queued flow count", value=0),
-                      columns=3
-                  ),
-                  dp.Plot(plot),
-                  dp.DataTable(filtered_df)
-                  ]
-        stats = []
-    else:
-        blocks = [f"## Level {level}: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}",
-                  "No data"]
-        stats = []
-    return blocks, stats
-
-
-def _create_alert_blocks(start_date, end_date):
-    return [f"## Alerts: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}"]
-
-
-def generate_monitoring_pages(start_date=datetime.now()-timedelta(days=3), end_date=datetime.now()):
-    # download data & group by manufacturer
-    df = pd.read_csv("/Users/jhughes/Desktop/repos/punchpipe/punchpipe/monitor/sample.csv",
-                     parse_dates=['creation_time', 'start_time', 'end_time'])
-
-
-    level0_blocks, level0_stats = _process_level(start_date, end_date, 0, df)
-    level1_blocks, level1_stats = _process_level(start_date, end_date, 1, df)
-    level2_blocks, level2_stats = _process_level(start_date, end_date, 2, df)
-    level3_blocks, level3_stats = _process_level(start_date, end_date, 3, df)
-
-    # embed into a Datapane app
-    app = dp.App(
-        dp.Page(title="Overall", blocks=_create_overall_blocks(start_date, end_date, df)),
-        dp.Page(title="Alerts", blocks=_create_alert_blocks(start_date, end_date)),
-        dp.Page(title="Level 0", blocks=level0_blocks),
-        dp.Page(title="Level 1", blocks=level1_blocks),
-        dp.Page(title="Level 2", blocks=level2_blocks),
-        dp.Page(title="Level 3", blocks=level3_blocks)
-    )
-
-    app.save("monitor.html")
-
-
-if __name__ == "__main__":
-    generate_monitoring_pages(start_date=datetime(2022, 12, 1), end_date=datetime.now())
\ No newline at end of file
+# from datetime import datetime, timedelta
+#
+# import datapane as dp
+# import numpy as np
+# import pandas as pd
+# import plotly.express as px
+#
+#
+# def _create_overall_blocks(start_date, end_date, df):
+#     return [f"## Overall: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')} "]
+#
+#
+# def _process_level(start_date, end_date, level, df):
+#     filtered_df = df[((df['start_time'] > start_date) * (df['end_time'] < end_date) * (df['flow_type'] == f"Level {level}") * (df['state'] == "completed")) | ((df['state'] == "running") * (df['start_time'] > start_date))]
+#     filtered_df.set_index("flow_id")
+#     if len(filtered_df):
+#         completed = df[
+#             (df['start_time'] > start_date) * (df['end_time'] < end_date) * (df['flow_type'] == f"Level {level}") * (
+#                         df['state'] == "completed")]
+#         completed['duration'] = (completed['end_time'] - completed['start_time']).map(timedelta.total_seconds)
+#         average_duration = np.mean(completed['duration'])
+#         stddev_duration = np.std(completed['duration'])
+#         running_flow_count = len(df[(df['state'] == "running") * (df['start_time'] > start_date)])
+#
+#         plot = px.histogram(completed, x='duration')
+#         blocks = [f"## Level {level}: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}",
+#                   dp.Group(
+#                       dp.BigNumber(heading="Average Duration [sec]", value=f"{average_duration:.1f}", change=3.01, is_upward_change=True, is_positive_intent=False),
+#                       dp.BigNumber(heading='Stddev Duration [sec]', value=f"{stddev_duration: .1f}", prev_value=f"{stddev_duration-1: .1f}"),
+#                       dp.BigNumber(heading="Number of files written", value=512, prev_value=510),
+#                       dp.BigNumber(heading="Running flow count", value=running_flow_count),
+#                       dp.BigNumber(heading="Queued flow count", value=0),
+#                       columns=3
+#                   ),
+#                   dp.Plot(plot),
+#                   dp.DataTable(filtered_df)
+#                   ]
+#         stats = []
+#     else:
+#         blocks = [f"## Level {level}: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}",
+#                   "No data"]
+#         stats = []
+#     return blocks, stats
+#
+#
+# def _create_alert_blocks(start_date, end_date):
+#     return [f"## Alerts: {start_date.strftime('%Y/%m/%d %H:%M')} to {end_date.strftime('%Y/%m/%d %H:%M')}"]
+#
+#
+# def generate_monitoring_pages(start_date=datetime.now()-timedelta(days=3), end_date=datetime.now()):
+#     # download data & group by manufacturer
+#     df = pd.read_csv("/Users/jhughes/Desktop/repos/punchpipe/punchpipe/monitor/sample.csv",
+#                      parse_dates=['creation_time', 'start_time', 'end_time'])
+#
+#
+#     level0_blocks, level0_stats = _process_level(start_date, end_date, 0, df)
+#     level1_blocks, level1_stats = _process_level(start_date, end_date, 1, df)
+#     level2_blocks, level2_stats = _process_level(start_date, end_date, 2, df)
+#     level3_blocks, level3_stats = _process_level(start_date, end_date, 3, df)
+#
+#     # embed into a Datapane app
+#     app = dp.App(
+#         dp.Page(title="Overall", blocks=_create_overall_blocks(start_date, end_date, df)),
+#         dp.Page(title="Alerts", blocks=_create_alert_blocks(start_date, end_date)),
+#         dp.Page(title="Level 0", blocks=level0_blocks),
+#         dp.Page(title="Level 1", blocks=level1_blocks),
+#         dp.Page(title="Level 2", blocks=level2_blocks),
+#         dp.Page(title="Level 3", blocks=level3_blocks)
+#     )
+#
+#     app.save("monitor.html")
+#
+#
+# if __name__ == "__main__":
+#     generate_monitoring_pages(start_date=datetime(2022, 12, 1), end_date=datetime.now())
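Several hunks in `punchpipe/monitor/monitor.py` below drop unused `as e` bindings from `except` clauses. For context, `NoResultFound` and `MultipleResultsFound` are what SQLAlchemy's `Query.one()` raises for zero and multiple matching rows respectively; a small illustrative sketch (the `fetch_file` helper is hypothetical, not part of the codebase):

```py
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

from punchpipe.controlsegment.db import File


def fetch_file(session, file_id):
    # .one() demands exactly one matching row and raises otherwise.
    try:
        return session.query(File).where(File.file_id == file_id).one()
    except NoResultFound:
        return None  # no such file
    except MultipleResultsFound:
        raise ValueError(f"file_id={file_id} is not unique")
```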
diff --git a/punchpipe/monitor/flow.py b/punchpipe/monitor/flow.py
index 218e4a7..ac6c2fb 100644
--- a/punchpipe/monitor/flow.py
+++ b/punchpipe/monitor/flow.py
@@ -1,10 +1,10 @@
 from typing import List
 
-from sqlalchemy import and_, create_engine
-from sqlalchemy.orm import Session
 from prefect import flow, task
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session
 
-from punchpipe.controlsegment.db import Flow, MySQLCredentials, File, FileRelationship
+from punchpipe.controlsegment.db import Flow, MySQLCredentials
 
 
 @task
diff --git a/punchpipe/monitor/monitor.py b/punchpipe/monitor/monitor.py
index 3fe899c..20ec8ff 100644
--- a/punchpipe/monitor/monitor.py
+++ b/punchpipe/monitor/monitor.py
@@ -1,21 +1,18 @@
-from datetime import timedelta, datetime
 import os
+from datetime import datetime, timedelta
 
-import pandas as pd
 import datapane as dp
-import plotly.express as px
-import numpy as np
-from sqlalchemy import and_, or_
-from sqlalchemy.orm.exc import NoResultFound, MultipleResultsFound
-from sqlalchemy import create_engine
-from sqlalchemy.orm import Session
-from astropy.io import fits
 import matplotlib.pyplot as plt
+import numpy as np
+import pandas as pd
+import plotly.express as px
 from punchbowl.data import PUNCHData
+from sqlalchemy import and_, create_engine, or_
+from sqlalchemy.orm import Session
+from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
 
-from punchpipe.controlsegment.db import MySQLCredentials
-from punchpipe.controlsegment.util import get_database_session
-from punchpipe.controlsegment.db import Flow, File, FileRelationship
+from punchpipe.controlsegment.db import (File, FileRelationship, Flow,
+                                         MySQLCredentials)
 
 
 def _create_overall_blocks(start_date, end_date):
@@ -38,8 +35,8 @@ def _process_level(start_date, end_date, level):
         or_(and_(Flow.start_time > previous_interval_start, Flow.end_time < previous_interval_end,
                  Flow.flow_level == level, Flow.state == 'completed'),
             and_(Flow.flow_level == level, Flow.state == 'running', Flow.start_time > start_date))).statement
-    previous_file_query = session.query(File).where(
-        and_(File.date_obs > previous_interval_start, File.date_obs < previous_interval_end, File.level == level)).statement
+    # previous_file_query = session.query(File).where(
+    #     and_(File.date_obs > previous_interval_start, File.date_obs < previous_interval_end, File.level == level)).statement
 
     flow_df = pd.read_sql_query(sql=flow_query, con=engine)
     flow_df['duration'] = (flow_df['end_time'] - flow_df['start_time']).dt.total_seconds()
@@ -49,7 +46,7 @@ def _process_level(start_date, end_date, level):
     previous_flow_df = pd.read_sql_query(sql=previous_flow_query, con=engine)
     previous_flow_df['duration'] = (previous_flow_df['end_time'] - previous_flow_df['start_time']).dt.total_seconds()
     previous_flow_df['delay'] = (previous_flow_df['start_time'] - previous_flow_df['creation_time']).dt.total_seconds()
-    previous_file_df = pd.read_sql_query(sql=previous_file_query, con=engine)
+    # previous_file_df = pd.read_sql_query(sql=previous_file_query, con=engine)
 
     if len(flow_df):
         completed = flow_df[flow_df['state'] == "completed"]
@@ -66,7 +63,7 @@ def _process_level(start_date, end_date, level):
         previous_average_duration = np.nanmean(previous_completed['duration'])
         previous_stddev_duration = np.nanstd(previous_completed['duration'])
 
-        planned_count = len(flow_df[flow_df['state'] == 'planned'])
+        # planned_count = len(flow_df[flow_df['state'] == 'planned'])
         running_flow_count = len(flow_df[(flow_df['state'] == "running") * (flow_df['start_time'] > start_date)])
         written_count = len(file_df[file_df['state'] == 'created']) + len(file_df[file_df['state'] == 'progressed'])
@@ -142,9 +139,9 @@ def _file_inquiry(file_id):
         provenance = f"""**Children FileIDs**: {children_ids} \n **Parent FileIDs**: {parent_ids}"""
 
         return dp.View(f"# FileID={file_id}", dp.Text(info_markdown), dp.Plot(fig), dp.Text(provenance))
-    except MultipleResultsFound as e:
+    except MultipleResultsFound:
         return dp.View(f"Multiple files with file_id={file_id} found.")
-    except NoResultFound as e:
+    except NoResultFound:
         return dp.View(f"No file with file_id={file_id} found.")
@@ -162,9 +159,9 @@ def _flow_inquiry(flow_id):
         # Make table
         info_markdown = row2table(flow_entry)
 
         return dp.View(f"# FlowID={flow_id}", dp.Text(info_markdown))
-    except MultipleResultsFound as e:
+    except MultipleResultsFound:
         return dp.View(f"Multiple files with file_id={flow_id} found.")
-    except NoResultFound as e:
+    except NoResultFound:
         return dp.View(f"No file with file_id={flow_id} found.")
diff --git a/scripts/create_db.py b/scripts/create_db.py
index d4a2f87..949119d 100644
--- a/scripts/create_db.py
+++ b/scripts/create_db.py
@@ -1,10 +1,6 @@
-from punchpipe.controlsegment.db import MySQLCredentials, Base, Flow, File
-from punchpipe.flows.level1 import level1_scheduler_flow
-
 from sqlalchemy import create_engine
-from sqlalchemy.orm import Session
-import json
-from datetime import datetime
+
+from punchpipe.controlsegment.db import Base, MySQLCredentials
 
 if __name__ == "__main__":
     # Base = declarative_base()
diff --git a/scripts/deploy.sh b/scripts/deploy.sh
index ed015e5..c709de8 100755
--- a/scripts/deploy.sh
+++ b/scripts/deploy.sh
@@ -7,4 +7,4 @@ prefect deployment build $root_path/punchpipe/flows/level1.py:level1_scheduler_f
 prefect deployment build $root_path/punchpipe/controlsegment/launcher.py:launcher_flow -n launcher_flow
 prefect deployment apply level1_process_flow-deployment.yaml
 prefect deployment apply level1_scheduler_flow-deployment.yaml
-prefect deployment apply launcher_flow-deployment.yaml
\ No newline at end of file
+prefect deployment apply launcher_flow-deployment.yaml
diff --git a/scripts/setup_db.sql b/scripts/setup_db.sql
index e2cdd0c..69fa040 100644
--- a/scripts/setup_db.sql
+++ b/scripts/setup_db.sql
@@ -48,4 +48,4 @@ CREATE TABLE relationships (
         REFERENCES files(file_id),
     FOREIGN KEY (child)
         REFERENCES files(file_id)
-);
\ No newline at end of file
+);
diff --git a/scripts/test_create_ready_level0.py b/scripts/test_create_ready_level0.py
index b6ad39e..d2aeca0 100644
--- a/scripts/test_create_ready_level0.py
+++ b/scripts/test_create_ready_level0.py
@@ -1,16 +1,16 @@
-from datetime import datetime
 import json
 import os
+from datetime import datetime
 
 import numpy as np
 from astropy.nddata import StdDevUncertainty
 from astropy.wcs import WCS
 from prefect import flow, task
+from punchbowl.data import NormalizedMetadata, PUNCHData
 from sqlalchemy import create_engine
 from sqlalchemy.orm import Session
 
-from punchbowl.data import PUNCHData, NormalizedMetadata
-from punchpipe.controlsegment.db import Flow, File, MySQLCredentials
+from punchpipe.controlsegment.db import File, Flow, MySQLCredentials
 
 
 @task
diff --git a/setup.py b/setup.py
index 809053f..7a39cf3 100644
--- a/setup.py
+++ b/setup.py
@@ -1,6 +1,7 @@
-from setuptools import setup, find_packages
 import pathlib
 
+from setuptools import find_packages, setup
+
 here = pathlib.Path(__file__).parent.resolve()
 long_description = (here / 'README.md').read_text(encoding='utf-8')
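Taken together, nearly every Python hunk in this patch is import housekeeping: standard-library imports first, then third-party, then local `punchpipe` imports, with straight `import` lines ahead of `from` lines, each group alphabetized, and long import lists wrapped in parentheses. This matches isort's default output, so the change can likely be reproduced or policed with:

```shell
# Reorder imports in place; isort's default grouping matches the hunks above.
isort punchpipe/ scripts/ setup.py

# Verify without modifying files (useful in CI).
isort --check-only --diff punchpipe/ scripts/ setup.py
```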