Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare for End2End Test #81

Merged
merged 24 commits into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
15c83b7
lengthen software version
jmbhughes Nov 13, 2024
ebda996
expand config
Nov 13, 2024
ed9633d
add voters to flow construction
jmbhughes Nov 13, 2024
b7751e8
fix number of voters
jmbhughes Nov 13, 2024
36c996b
avoid mixing in clear
jmbhughes Nov 13, 2024
33dc721
move config to variable, use gunicorn
jmbhughes Nov 16, 2024
9629968
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 16, 2024
19bd053
rename controlsegment, change deploy, move health stat check
jmbhughes Nov 17, 2024
9cf865a
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 17, 2024
b1a54f3
fixed monitoring labels and units
jmbhughes Nov 17, 2024
8021758
Merge remote-tracking branch 'origin/mhughes-nov13' into mhughes-nov13
jmbhughes Nov 17, 2024
0008223
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 17, 2024
12941b8
fixed monitoring units
jmbhughes Nov 17, 2024
272b3bb
Merge remote-tracking branch 'origin/mhughes-nov13' into mhughes-nov13
jmbhughes Nov 17, 2024
b6fc72c
fix included strings
jmbhughes Nov 17, 2024
3e16edf
wait longer before setting the variable
jmbhughes Nov 17, 2024
3bb2971
fixed install
jmbhughes Nov 17, 2024
13de576
wait even longer
jmbhughes Nov 17, 2024
34f305b
fix table
jmbhughes Nov 17, 2024
708b35b
fix config path confusion
jmbhughes Nov 17, 2024
be814c1
made table filterable
jmbhughes Nov 17, 2024
3799a52
fix table name
jmbhughes Nov 17, 2024
82240b5
improve table layout
jmbhughes Nov 17, 2024
7613d1c
improve table layout
jmbhughes Nov 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions config.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
root: "/d0/punchsoc/gamera_data/"
root: "/Users/jhughes/Desktop/d0/"
input_drop: "dropzone/"
tlm_directory: "dropzone/"
file_version: "1"
Expand All @@ -17,15 +17,16 @@ scheduler:
max_start: 10

launcher:
max_flows_running: 5
schedule: "* * * * *"
max_flows_running: 25

levels:
level0_process_flow:
priority:
initial: 5
seconds: [ 30, 120, 600 ]
escalation: [ 10, 20, 30 ]
schedule:
schedule: "* * * * *"
options:
num_quads: 100

Expand All @@ -34,29 +35,29 @@ levels:
initial: 6
seconds: [ 30, 120, 600 ]
escalation: [ 11, 21, 31 ]
schedule:
schedule: "* * * * *"
options:

level2_process_flow:
priority:
initial: 1000
seconds: [ 3000, 12000, 60000 ]
escalation: [ 1000, 1000, 1000 ]
schedule:
schedule: "* * * * *"
options:

levelq_process_flow:
priority:
initial: 1000
seconds: [ 3000, 12000, 60000 ]
escalation: [ 1000, 1000, 1000 ]
schedule:
schedule: "* * * * *"
options:

level3_PTM_process_flow:
priority:
initial: 10000
seconds: [1]
escalation: [10000]
schedule:
schedule: "* * * * *"
options:
49 changes: 0 additions & 49 deletions deploy.py

This file was deleted.

143 changes: 133 additions & 10 deletions punchpipe/cli.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,145 @@
import os
import time
import subprocess
import multiprocessing as mp
from pathlib import Path
from datetime import datetime

import click
from prefect import flow, serve
from prefect.variables import Variable

from .monitor.app import create_app
from punchpipe.control.health import update_machine_health_stats
from punchpipe.control.launcher import launcher_flow
from punchpipe.control.util import load_pipeline_configuration
from punchpipe.flows.level1 import level1_process_flow, level1_scheduler_flow
from punchpipe.flows.level2 import level2_process_flow, level2_scheduler_flow
from punchpipe.flows.level3 import level3_PTM_process_flow, level3_PTM_scheduler_flow
from punchpipe.flows.levelq import levelq_process_flow, levelq_scheduler_flow
from punchpipe.monitor.app import create_app

THIS_DIR = os.path.dirname(__file__)
app = create_app()
server = app.server

@click.group
def main():
"""Run the PUNCH automated pipeline"""

def launch_monitor():
app = create_app()
app.run_server(debug=False, port=8051)

@flow
def my_flow():
print("Hello, Prefect!")


def serve_flows(configuration_path):
config = load_pipeline_configuration.fn(configuration_path)
launcher_deployment = launcher_flow.to_deployment(name="launcher-deployment",
description="Launch a pipeline segment.",
cron=config['launcher'].get("schedule", "* * * * *"),
parameters={"pipeline_configuration_path": configuration_path}
)

level1_scheduler_deployment = level1_scheduler_flow.to_deployment(name="level1-scheduler-deployment",
description="Schedule a Level 1 flow.",
cron="* * * * *",
parameters={"pipeline_config_path": configuration_path}
)
level1_process_deployment = level1_process_flow.to_deployment(name="level1_process_flow",
description="Process a file from Level 0 to Level 1.",
parameters={"pipeline_config_path": configuration_path}
)

level2_scheduler_deployment = level2_scheduler_flow.to_deployment(name="level2-scheduler-deployment",
description="Schedule a Level 2 flow.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
level2_process_deployment = level2_process_flow.to_deployment(name="level2_process_flow",
description="Process files from Level 1 to Level 2.",
parameters={"pipeline_config_path": configuration_path}
)

levelq_scheduler_deployment = levelq_scheduler_flow.to_deployment(name="levelq-scheduler-deployment",
description="Schedule a Level Q flow.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
levelq_process_deployment = levelq_process_flow.to_deployment(name="levelq_process_flow",
description="Process files from Level 1 to Level Q.",
parameters={"pipeline_config_path": configuration_path}
)

level3_PTM_scheduler_deployment = level3_PTM_scheduler_flow.to_deployment(name="level3-PTM-scheduler-deployment",
description="Schedule a Level 3 flow to make PTM.",
cron="* * * * *",
parameters={
"pipeline_config_path": configuration_path}

)
level3_PTM_process_deployment = level3_PTM_process_flow.to_deployment(name="level3_PTM_process_flow",
description="Process PTM files from Level 2 to Level 3.",
parameters={
"pipeline_config_path": configuration_path}
)

health_deployment = update_machine_health_stats.to_deployment(name="update-health-stats-deployment",
description="Update the health stats table data.",
cron="* * * * *")

serve(launcher_deployment,
level1_scheduler_deployment, level1_process_deployment,
level2_scheduler_deployment, level2_process_deployment,
levelq_scheduler_deployment, levelq_process_deployment,
level3_PTM_scheduler_deployment, level3_PTM_process_deployment,
health_deployment,
limit=1000
)


@main.command
def run():
print("Launching punchpipe monitor on http://localhost:8051/.")
subprocess.Popen(["prefect", "server", "start"])
print("\npunchpipe Prefect flows must be stopped manually in Prefect.")
mp.Process(target=launch_monitor, args=()).start()
@click.argument("configuration_path", type=click.Path(exists=True))
def run(configuration_path):
now = datetime.now()

configuration_path = str(Path(configuration_path).resolve())
output_path = f"punchpipe_{now.strftime('%Y%m%d_%H%M%S')}.txt"

print()
print(f"Launching punchpipe at {now} with configuration: {configuration_path}")
print(f"Terminal logs from punchpipe are in {output_path}")


with open(output_path, "w") as f:
try:
prefect_process = subprocess.Popen(["prefect", "server", "start"],
stdout=f, stderr=subprocess.STDOUT)
time.sleep(10)
monitor_process = subprocess.Popen(["gunicorn",
"-b", "0.0.0.0:8050",
"--chdir", THIS_DIR,
"cli:server"],
stdout=f, stderr=subprocess.STDOUT)
Variable.set("punchpipe_config", configuration_path, overwrite=True)
print("Launched Prefect dashboard on http://localhost:4200/")
print("Launched punchpipe monitor on http://localhost:8050/")
print("Use ctrl-c to exit.")

serve_flows(configuration_path)
prefect_process.wait()
monitor_process.wait()
except KeyboardInterrupt:
print("Shutting down.")
prefect_process.terminate()
monitor_process.terminate()
print()
print("punchpipe safely shut down.")
except Exception as e:
print(f"Received error: {e}")
prefect_process.terminate()
monitor_process.terminate()
print()
print("punchpipe abruptly shut down.")
File renamed without changes.
2 changes: 1 addition & 1 deletion punchpipe/controlsegment/db.py → punchpipe/control/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class File(Base):
file_type = Column(String(2), nullable=False)
observatory = Column(String(1), nullable=False)
file_version = Column(String(16), nullable=False)
software_version = Column(String(20), nullable=False)
software_version = Column(String(35), nullable=False)
date_created = Column(DateTime, nullable=True)
date_obs = Column(DateTime, nullable=False)
date_beg = Column(DateTime, nullable=True)
Expand Down
31 changes: 31 additions & 0 deletions punchpipe/control/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from datetime import datetime

import psutil
from prefect import flow

from punchpipe.control.db import Health
from punchpipe.control.util import get_database_session, load_pipeline_configuration


@flow
def update_machine_health_stats():
config = load_pipeline_configuration()

now = datetime.now()
cpu_usage = psutil.cpu_percent(interval=5)
memory_usage = psutil.virtual_memory().used / 1E9 # store in GB
memory_percentage = psutil.virtual_memory().percent
disk_usage = psutil.disk_usage(config.get("root", "/")).used / 1E9 # store in GB
disk_percentage = psutil.disk_usage(config.get("root", "/")).percent
num_pids = len(psutil.pids())

with get_database_session() as session:
new_health_entry = Health(datetime=now,
cpu_usage=cpu_usage,
memory_usage=memory_usage,
memory_percentage=memory_percentage,
disk_usage=disk_usage,
disk_percentage=disk_percentage,
num_pids=num_pids)
session.add(new_health_entry)
session.commit()
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

from prefect import flow, get_run_logger, task
from prefect.client import get_client
from prefect.variables import Variable
from sqlalchemy import and_
from sqlalchemy.orm import Session

from punchpipe.controlsegment.db import Flow
from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration
from punchpipe.control.db import Flow
from punchpipe.control.util import get_database_session, load_pipeline_configuration


@task
Expand Down Expand Up @@ -87,7 +88,7 @@


@flow
async def launcher_flow(pipeline_configuration_path="config.yaml"):
async def launcher_flow(pipeline_configuration_path=None):
"""The main launcher flow for Prefect, responsible for identifying flows, based on priority,
that are ready to run and creating flow runs for them. It also escalates long-waiting flows' priorities.

Expand All @@ -99,6 +100,8 @@
"""
logger = get_run_logger()

if pipeline_configuration_path is None:
pipeline_configuration_path = await Variable.get("punchpipe_config", "punchpipe_config.yaml")

Check warning on line 104 in punchpipe/control/launcher.py

View check run for this annotation

Codecov / codecov/patch

punchpipe/control/launcher.py#L103-L104

Added lines #L103 - L104 were not covered by tests
pipeline_config = load_pipeline_configuration(pipeline_configuration_path)

logger.info("Establishing database connection")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
from prefect import get_run_logger
from prefect.context import get_run_context

from punchpipe.controlsegment.db import File, Flow
from punchpipe.controlsegment.util import (
from punchpipe.control.db import File, Flow
from punchpipe.control.util import (
get_database_session,
load_pipeline_configuration,
match_data_with_file_db_entry,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from punchpipe.controlsegment.db import File, FileRelationship
from punchpipe.controlsegment.util import get_database_session, load_pipeline_configuration, update_file_state

from punchpipe.control.db import File, FileRelationship
from punchpipe.control.util import get_database_session, load_pipeline_configuration, update_file_state


def generic_scheduler_flow_logic(
query_ready_files_func, construct_child_file_info, construct_child_flow_info, pipeline_config_path, session=None
):
# load pipeline configuration
pipeline_config = load_pipeline_configuration(pipeline_config_path)

max_start = pipeline_config['scheduler']['max_start']

# get database connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
from ndcube import NDCube
from punchbowl.data import NormalizedMetadata

from punchpipe.controlsegment.db import File
from punchpipe.controlsegment.util import match_data_with_file_db_entry
from punchpipe.control.db import File
from punchpipe.control.util import match_data_with_file_db_entry


@pytest.fixture()
Expand Down
Loading
Loading