Skip to content

Commit

Permalink
Query manager (#72)
Browse files Browse the repository at this point in the history
* initial query test, mocking env variables

initial query test, mocking env variables

* add more query tests

* add pylint hint

* remove tmp file, accidental addition

* add QueryRunner, fail if a query is running

* add QueryManager, refactor query loading and running. add tests for /start routes

* fix bugs; add alert if query can't start

* Update alert.tsx

Co-authored-by: Alex Koshelev <[email protected]>

* Update sidecar/app/query/base.py

Co-authored-by: Alex Koshelev <[email protected]>

* move logger onto settings, only built once

* wrapper function for query not found

* add comments to Query and QueryManager

* fix typing issue with logger and pydantic

* small refactor to add configure logger method on settings

* create check_capacity helper, add details to message when at capacity

---------

Co-authored-by: Alex Koshelev <[email protected]>
  • Loading branch information
eriktaubeneck and akoshelev authored Jul 17, 2024
1 parent 771b88e commit 7490840
Show file tree
Hide file tree
Showing 16 changed files with 585 additions and 152 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ repos:
entry: coverage report
types: [python]
pass_filenames: false
args: [--fail-under=9] # increase this over time
args: [--fail-under=60] # increase this over time, goal=80
- id: pyre-check
name: pyre-check
entry: pyre check
Expand Down
38 changes: 25 additions & 13 deletions server/app/alert.tsx
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import Link from "next/link";
import { CheckCircleIcon, XMarkIcon } from "@heroicons/react/20/solid";
import {
CheckCircleIcon,
ExclamationTriangleIcon,
} from "@heroicons/react/20/solid";

export default function QueryStartedAlert({ queryId }: { queryId: string }) {
export function QueryStartedAlert({ queryId }: { queryId: string }) {
return (
<div className="rounded-md bg-green-50 p-4">
<div className="-mt-16 mb-4 rounded-md bg-green-50 p-4">
<div className="flex">
<div className="shrink-0">
<CheckCircleIcon
Expand All @@ -19,16 +22,25 @@ export default function QueryStartedAlert({ queryId }: { queryId: string }) {
</Link>.{" "}
</p>
</div>
<div className="ml-auto pl-3">
<div className="-m-1.5">
<button
type="button"
className="inline-flex rounded-md bg-green-50 p-1.5 text-green-500 hover:bg-green-100 focus:outline-none focus:ring-2 focus:ring-green-600 focus:ring-offset-2 focus:ring-offset-green-50"
>
<span className="sr-only">Dismiss</span>
<XMarkIcon className="size-5" aria-hidden="true" />
</button>
</div>
</div>
</div>
);
}

export function QueryFailedToStartAlert({ queryId }: { queryId: string }) {
return (
<div className="-mt-16 mb-4 rounded-md bg-red-50 p-4">
<div className="flex">
<div className="shrink-0">
<ExclamationTriangleIcon
className="size-5 text-red-400"
aria-hidden="true"
/>
</div>
<div className="ml-3">
<p className="text-sm font-medium text-red-800">
Failed to start a query: {queryId}.
</p>
</div>
</div>
</div>
Expand Down
19 changes: 16 additions & 3 deletions server/app/query/create/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "@heroicons/react/20/solid";
import { useRouter } from "next/navigation";
import { ExclamationCircleIcon } from "@heroicons/react/20/solid";
import QueryStartedAlert from "@/app/alert";
import { QueryStartedAlert, QueryFailedToStartAlert } from "@/app/alert";
import {
DemoLoggerRemoteServers,
IPARemoteServers,
Expand All @@ -23,6 +23,9 @@ type QueryType = Database["public"]["Enums"]["query_type"];

export default function Page() {
const [queryId, setQueryId] = useState<string | null>(null);
const [querySubmitSuccess, setQuerySubmitSuccess] = useState<boolean | null>(
null,
);
const router = useRouter();

const handleFormSubmit = async (
Expand All @@ -43,9 +46,14 @@ export default function Page() {
method: "POST",
body: params,
});
const _data = await response.json();
if (!response.ok) {
setQuerySubmitSuccess(false);
const error_message = await response.text();
throw new Error(error_message);
}
}

setQuerySubmitSuccess(true);
await new Promise((f) => setTimeout(f, 1000));

// Redirect to /query/view/<newQueryId>
Expand All @@ -67,7 +75,12 @@ export default function Page() {

return (
<>
{queryId && <QueryStartedAlert queryId={queryId} />}
{queryId && querySubmitSuccess === true && (
<QueryStartedAlert queryId={queryId} />
)}
{queryId && querySubmitSuccess === false && (
<QueryFailedToStartAlert queryId={queryId} />
)}
<div className="md:flex md:items-start md:justify-between">
<DemoLogsForm handleDemoLogsFormSubmit={handleDemoLogsFormSubmit} />
<IPAForm handleIPAFormSubmit={handleIPAFormSubmit} />
Expand Down
2 changes: 1 addition & 1 deletion server/app/query/layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export default function RootLayout({
children: React.ReactNode;
}) {
return (
<div className="mx-auto min-h-full max-w-7xl bg-slate-100 py-10 sm:px-6 lg:px-8 dark:bg-slate-900">
<div className="mx-auto min-h-full max-w-7xl bg-slate-100 py-20 sm:px-6 lg:px-8 dark:bg-slate-900">
{children}
</div>
);
Expand Down
22 changes: 0 additions & 22 deletions sidecar/app/logger.py

This file was deleted.

2 changes: 2 additions & 0 deletions sidecar/app/main.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from .query.base import QueryManager
from .routes import start, stop, websockets

app = FastAPI()
app.state.QUERY_MANAGER = QueryManager(max_parallel_queries=1)
app.include_router(websockets.router)
app.include_router(start.router)
app.include_router(stop.router)
Expand Down
125 changes: 79 additions & 46 deletions sidecar/app/query/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,69 @@
import loguru

from ..helpers import Role
from ..logger import logger
from ..settings import settings
from ..settings import get_settings
from .status import Status, StatusHistory
from .step import Step

# Dictionary to store queries
queries: dict[str, "Query"] = {}


class QueryExistsError(Exception):
pass


def status_file_path(query_id: str) -> Path:
    """Return the status file path for ``query_id``.

    The path is ``query_id`` joined onto the ``status_dir_path`` of the
    settings object returned by ``get_settings()``.
    """
    return get_settings().status_dir_path / Path(query_id)


def log_file_path(query_id: str) -> Path:
    """Return the log file path for ``query_id``.

    The path is ``query_id`` joined onto the ``log_dir_path`` of the
    settings object returned by ``get_settings()``.
    """
    return get_settings().log_dir_path / Path(query_id)


@dataclass
class Query:
"""
Query is the base class, used to implement a list of steps to be run by this server.
The server has a role, obtained from get_settings().
Steps implement a `build_from_query` method,
which allows them to utilize data stored on the query.
"""

# pylint: disable=too-many-instance-attributes
query_id: str
current_step: Optional[Step] = field(init=False, default=None, repr=True)
logger: loguru.Logger = field(init=False, repr=False)
_logger_id: int = field(init=False, repr=False)
logger: loguru.Logger = field(init=False, repr=False, compare=False)
_logger_id: int = field(init=False, repr=False, compare=False)
role: Role = field(init=False, repr=True)
_status_history: StatusHistory = field(init=False, repr=True)
step_classes: ClassVar[list[type[Step]]] = []

def __post_init__(self):
self.logger = logger.bind(task=self.query_id)
status_dir = settings.root_path / Path("status")
status_dir.mkdir(exist_ok=True)
status_file_path = status_dir / Path(f"{self.query_id}")
self._status_history = StatusHistory(file_path=status_file_path, logger=logger)

self._log_dir.mkdir(exist_ok=True)
self._logger_id = logger.add(
settings = get_settings()

self.logger = settings.logger.bind(task=self.query_id)
self.role = settings.role

self._status_history = StatusHistory(
file_path=self.status_file_path, logger=self.logger
)

self._logger_id = self.logger.add(
self.log_file_path,
serialize=True,
filter=lambda record: record["extra"].get("task") == self.query_id,
enqueue=True,
)
self.logger.debug(f"adding new Query {self}.")
if queries.get(self.query_id) is not None:
raise QueryExistsError(f"{self.query_id} already exists")
queries[self.query_id] = self

@property
def _log_dir(self) -> Path:
return settings.root_path / Path("logs")
def status_file_path(self) -> Path:
return status_file_path(self.query_id)

@property
def role(self) -> Role:
return settings.role
def log_file_path(self) -> Path:
return log_file_path(self.query_id)

@property
def started(self) -> bool:
Expand All @@ -65,23 +80,6 @@ def started(self) -> bool:
def finished(self) -> bool:
return self.status >= Status.COMPLETE

@classmethod
def get_from_query_id(cls, query_id) -> Optional["Query"]:
query = queries.get(query_id)
if query:
return query
try:
query = cls(query_id)
except QueryExistsError as e:
# avoid race condition on queries
query = queries.get(query_id)
if query:
return query
raise e
if query.status == Status.UNKNOWN:
return None
return query

@property
def status(self) -> Status:
return self._status_history.current_status
Expand All @@ -99,10 +97,6 @@ def status_event_json(self):
def running(self):
return self.started and not self.finished

@property
def log_file_path(self) -> Path:
return self._log_dir / Path(f"{self.query_id}.log")

@property
def steps(self) -> Iterable[Step]:
for step_class in self.step_classes:
Expand Down Expand Up @@ -154,11 +148,9 @@ def crash(self):
def _cleanup(self):
self.current_step = None
try:
logger.remove(self._logger_id)
self.logger.remove(self._logger_id)
except ValueError:
pass
if queries.get(self.query_id) is not None:
del queries[self.query_id]

@property
def cpu_usage_percent(self) -> float:
Expand All @@ -174,3 +166,44 @@ def memory_rss_usage(self) -> int:


QueryTypeT = TypeVar("QueryTypeT", bound=Query)


class MaxQueriesRunningError(Exception):
    """Raised when a query cannot start because no run capacity is available."""


@dataclass
class QueryManager:
    """
    The QueryManager allows for a fixed number of queries to run at once,
    and stores those queries in a dictionary.
    Accessing running queries allows the finish and kill methods to be called
    from another caller (typically a route handler in the HTTP layer).
    """

    # Upper bound on the number of entries allowed in ``running_queries``.
    max_parallel_queries: int = field(init=True, repr=False, default=1)
    # Queries currently inside ``run_query``, keyed by their ``query_id``.
    running_queries: dict[str, "Query"] = field(
        init=False, repr=True, default_factory=dict
    )

    def get_from_query_id(self, cls, query_id: str) -> Optional["Query"]:
        """Look up a query by id.

        Returns the registered instance when ``query_id`` is currently
        running. Otherwise, if a status file exists for ``query_id``, a new
        ``cls(query_id)`` is constructed and returned. Returns ``None`` when
        neither is the case.
        """
        if query_id in self.running_queries:
            return self.running_queries[query_id]
        if status_file_path(query_id).exists():
            return cls(query_id)
        return None

    def run_query(self, query: "Query") -> None:
        """Register ``query`` as running and call ``query.start()``.

        The query stays in ``running_queries`` for the duration of the
        ``start()`` call and is removed afterwards, whether ``start()``
        returns normally or raises.

        Raises:
            MaxQueriesRunningError: if ``max_parallel_queries`` queries are
                already registered.
        """
        if not self.capacity_available:
            raise MaxQueriesRunningError(
                f"Only {self.max_parallel_queries} allowed. Currently running {self}"
            )

        self.running_queries[query.query_id] = query
        try:
            query.start()
        finally:
            # Always release the slot: if start() raised and the entry were
            # left behind, the manager would report "at capacity" forever.
            self.running_queries.pop(query.query_id, None)

    @property
    def capacity_available(self) -> bool:
        """True while fewer than ``max_parallel_queries`` queries are registered."""
        return len(self.running_queries) < self.max_parallel_queries
5 changes: 4 additions & 1 deletion sidecar/app/query/ipa.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from ..helpers import Role
from ..local_paths import Paths
from ..settings import settings
from ..settings import get_settings
from .base import Query
from .command import FileOutputCommand, LoggerOutputCommand
from .step import CommandStep, LoggerOutputCommandStep, Status, Step
Expand All @@ -30,6 +30,7 @@ class IPAQuery(Query):

def send_kill_signals(self):
self.logger.info("sending kill signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
Expand Down Expand Up @@ -241,6 +242,7 @@ def build_from_query(cls, query: IPAQuery):
)

def run(self):
settings = get_settings()
sidecar_urls = [
helper.sidecar_url
for helper in settings.helpers.values()
Expand Down Expand Up @@ -339,6 +341,7 @@ class IPACoordinatorQuery(IPAQuery):

def send_terminate_signals(self):
self.logger.info("sending terminate signals")
settings = get_settings()
for helper in settings.helpers.values():
if helper.role == self.role:
continue
Expand Down
6 changes: 4 additions & 2 deletions sidecar/app/query/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dataclasses import dataclass, field
from enum import IntEnum, auto
from pathlib import Path
from typing import NamedTuple
from typing import NamedTuple, Optional

import loguru

Expand Down Expand Up @@ -51,7 +51,9 @@ def locking_status(self):
"""Cannot add to history after this or higher status is reached"""
return Status.COMPLETE

def add(self, status: Status, timestamp: float = time.time()):
def add(self, status: Status, timestamp: Optional[float] = None):
if timestamp is None:
timestamp = time.time()
assert status > self.current_status
assert self.current_status < self.locking_status
self._status_history.append(
Expand Down
Loading

0 comments on commit 7490840

Please sign in to comment.