Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/robot-server-add_new_fast_fetchi…
Browse files Browse the repository at this point in the history
…ng_run_cmds_endpoint' into robot-server-add_new_fast_fetching_run_cmds_endpoint
  • Loading branch information
ncdiehl committed Apr 30, 2024
2 parents dba9374 + 6a59a88 commit e8f2968
Show file tree
Hide file tree
Showing 13 changed files with 192 additions and 13 deletions.
3 changes: 3 additions & 0 deletions robot-server/robot_server/runs/router/actions_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MaintenanceEngineStore,
)
from robot_server.maintenance_runs.dependencies import get_maintenance_engine_store
from robot_server.service.notifications import get_runs_publisher, RunsPublisher

log = logging.getLogger(__name__)
actions_router = APIRouter()
Expand All @@ -45,6 +46,7 @@ async def get_run_controller(
task_runner: TaskRunner = Depends(get_task_runner),
engine_store: EngineStore = Depends(get_engine_store),
run_store: RunStore = Depends(get_run_store),
runs_publisher: RunsPublisher = Depends(get_runs_publisher),
) -> RunController:
"""Get a RunController for the current run.
Expand All @@ -67,6 +69,7 @@ async def get_run_controller(
task_runner=task_runner,
engine_store=engine_store,
run_store=run_store,
runs_publisher=runs_publisher,
)


Expand Down
37 changes: 29 additions & 8 deletions robot-server/robot_server/runs/router/commands_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from robot_server.robot.control.dependencies import require_estop_in_good_state

from ..run_models import RunCommandSummary
from ..run_data_manager import RunDataManager
from ..run_data_manager import RunDataManager, PreSerializedCommandsNotAvailableError
from ..engine_store import EngineStore
from ..run_store import RunStore, CommandNotFoundError
from ..run_models import RunNotFoundError
Expand Down Expand Up @@ -72,6 +72,18 @@ class CommandNotAllowed(ErrorDetails):
title: str = "Command Not Allowed"


class PreSerializedCommandsNotAvailable(ErrorDetails):
"""An error if one tries to fetch pre-serialized commands before they are written to the database."""

id: Literal[
"PreSerializedCommandsNotAvailable"
] = "PreSerializedCommandsNotAvailable"
title: str = "Pre-Serialized commands not available."
detail: str = (
"Pre-serialized commands are only available once a run has finished running."
)


class CommandLinkMeta(BaseModel):
"""Metadata about a command resource referenced in `links`."""

Expand Down Expand Up @@ -356,24 +368,30 @@ async def get_run_commands(
@PydanticResponse.wrap_route(
commands_router.get,
path="/runs/{runId}/commandsAsPreSerializedList",
summary="Get all commands as a list of pre-serialized commands",
summary="Get all commands of a completed run as a list of pre-serialized commands",
description=(
"Get all commands of a run as a list of pre-serialized commands."
"Get all commands of a completed run as a list of pre-serialized commands."
"**Warning:** This endpoint is experimental. We may change or remove it without warning."
"\n\n"
" This is a faster alternative to fetching *all* commands using `GET /runs/{runId}/commands`"
" For large protocols (10k+ commands), the above endpoint can take minutes to respond,"
" whereas this one should only take a few seconds."
"The commands list will only be available after a run has completed"
" (whether successful, failed or stopped) and its data has been committed to the database."
" If a request is received before the run is completed, it will return a 503 Unavailable error."
" This is a faster alternative to fetching the full commands list using"
" `GET /runs/{runId}/commands`. For large protocols (10k+ commands), the above"
" endpoint can take minutes to respond, whereas this one should only take a few seconds."
),
responses={
status.HTTP_404_NOT_FOUND: {"model": ErrorBody[RunNotFound]},
status.HTTP_503_SERVICE_UNAVAILABLE: {
"model": ErrorBody[PreSerializedCommandsNotAvailable]
},
},
)
async def get_run_commands_as_pre_serialized_list(
runId: str,
run_data_manager: RunDataManager = Depends(get_run_data_manager),
) -> PydanticResponse[SimpleMultiBody[str]]:
"""Get all commands in a run as a list of pre-serialized (string encoded) commands.
"""Get all commands of a completed run as a list of pre-serialized (string encoded) commands.
Arguments:
runId: Requested run ID, from the URL
Expand All @@ -383,7 +401,10 @@ async def get_run_commands_as_pre_serialized_list(
commands = run_data_manager.get_all_commands_as_preserialized_list(runId)
except RunNotFoundError as e:
raise RunNotFound.from_exc(e).as_error(status.HTTP_404_NOT_FOUND) from e

except PreSerializedCommandsNotAvailableError as e:
raise PreSerializedCommandsNotAvailable.from_exc(e).as_error(
status.HTTP_503_SERVICE_UNAVAILABLE
) from e
return await PydanticResponse.create(
content=SimpleMultiBody.construct(
data=commands, meta=MultiBodyMeta(cursor=0, totalLength=len(commands))
Expand Down
9 changes: 8 additions & 1 deletion robot-server/robot_server/runs/run_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from opentrons.protocol_engine.types import DeckConfigurationType

from robot_server.service.notifications import RunsPublisher

log = logging.getLogger(__name__)


Expand All @@ -21,19 +23,21 @@ class RunActionNotAllowedError(RoboticsInteractionError):


class RunController:
"""An interface to manage the side-effects of requested run actions."""
"""An interface to manage the side effects of requested run actions."""

def __init__(
self,
run_id: str,
task_runner: TaskRunner,
engine_store: EngineStore,
run_store: RunStore,
runs_publisher: RunsPublisher,
) -> None:
self._run_id = run_id
self._task_runner = task_runner
self._engine_store = engine_store
self._run_store = run_store
self._runs_publisher = runs_publisher

def create_action(
self,
Expand Down Expand Up @@ -108,3 +112,6 @@ async def _run_protocol_and_insert_result(
commands=result.commands,
run_time_parameters=result.parameters,
)
await self._runs_publisher.publish_pre_serialized_commands_notification(
self._run_id
)
24 changes: 22 additions & 2 deletions robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ class RunNotCurrentError(ValueError):
"""Error raised when a requested run is not the current run."""


class PreSerializedCommandsNotAvailableError(LookupError):
"""Error raised when a run's commands are not available as pre-serialized list of commands."""


class RunDataManager:
"""Collaborator to manage current and historical run data.
Expand Down Expand Up @@ -290,10 +294,16 @@ async def delete(self, run_id: str) -> None:
self._run_store.remove(run_id=run_id)

async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRun]:
"""Get and potentially archive a run.
"""Get and potentially archive the current run.
Args:
run_id: The run to get and maybe archive.
current: Whether to mark the run as current or not.
If `current` set to False, then the run is 'un-current'ed by
stopping the run, saving the final run data to the run store,
and clearing the engine and runner.
If 'current' is True or not specified, we simply fetch the run's
data from memory and database.
Returns:
The updated run.
Expand All @@ -320,6 +330,9 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu
commands=commands,
run_time_parameters=parameters,
)
await self._runs_publisher.publish_pre_serialized_commands_notification(
run_id
)
else:
state_summary = self._engine_store.engine.state_view.get_summary()
parameters = self._engine_store.runner.run_time_parameters
Expand Down Expand Up @@ -388,7 +401,14 @@ def get_command(self, run_id: str, command_id: str) -> Command:
return self._run_store.get_command(run_id=run_id, command_id=command_id)

def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]:
"""Get all commands of a run in a stringified json doc."""
"""Get all commands of a run in a serialized json list."""
if (
run_id == self._engine_store.current_run_id
and not self._engine_store.engine.state_view.commands.get_is_terminal()
):
raise PreSerializedCommandsNotAvailableError(
"Pre-serialized commands are only available after a run has ended."
)
return self._run_store.get_all_commands_as_preserialized_list(run_id)

def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]:
Expand Down
1 change: 0 additions & 1 deletion robot-server/robot_server/runs/run_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,6 @@ def get_commands_slice(
)
.order_by(run_command_table.c.index_in_run)
)
# select_all_cmds_as_doc = sqlalchemy.select(sqlalchemy.func.json_array_elements)
slice_result = transaction.execute(select_slice).all()

sliced_commands: List[Command] = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ async def initialize(
self._engine_state_slice = EngineStateSlice()

await self._publish_runs_advise_refetch_async(run_id=run_id)
engine_state_summary = self._run_hooks.get_state_summary(self._run_hooks.run_id)
if (
engine_state_summary is not None
and engine_state_summary.completedAt is not None
):
await self.publish_pre_serialized_commands_notification(run_id=run_id)

async def clean_up_run(self, run_id: str) -> None:
"""Publish final refetch and unsubscribe flags for the given run."""
Expand Down Expand Up @@ -99,6 +105,13 @@ async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None:
topic=f"{Topics.RUNS}/{run_id}"
)

async def publish_pre_serialized_commands_notification(self, run_id: str) -> None:
"""Publishes notification for GET /runs/:runId/commandsAsPreSerializedList."""
if self._run_hooks is not None:
await self._client.publish_advise_refetch_async(
topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}"
)

async def _handle_current_command_change(self) -> None:
"""Publish a refetch flag if the current command has changed."""
if self._run_hooks is not None and self._engine_state_slice is not None:
Expand Down
1 change: 1 addition & 0 deletions robot-server/robot_server/service/notifications/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ class Topics(str, Enum):
MAINTENANCE_RUNS_CURRENT_RUN = f"{_TOPIC_BASE}/maintenance_runs/current_run"
RUNS_CURRENT_COMMAND = f"{_TOPIC_BASE}/runs/current_command"
RUNS = f"{_TOPIC_BASE}/runs"
RUNS_PRE_SERIALIZED_COMMANDS = f"{_TOPIC_BASE}/runs/pre_serialized_commands"
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
from copy import deepcopy
from datetime import datetime
from typing import Any, AsyncGenerator, Dict, NamedTuple, cast
Expand Down Expand Up @@ -250,6 +251,9 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N
get_persisted_command_response = await client.get_run_command(
run_id=run_id, command_id=command_id
)
get_preserialized_commands_response = await client.get_preserialized_commands(
run_id=run_id
)

# ensure the persisted commands still match the original ones
assert get_all_persisted_commands_response.json()["data"] == [
Expand All @@ -259,6 +263,11 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N
]
assert get_persisted_command_response.json()["data"] == expected_command

json_converted_command = json.loads(
get_preserialized_commands_response.json()["data"][0]
)
assert json_converted_command == expected_command


async def test_runs_completed_started_at_persist_via_actions_router(
client_and_server: ClientServerFixture,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,3 +198,19 @@ stages:
createdAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$"
startedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$"
completedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$"

- name: Get all the commands in the run as a pre-serialized list
request:
url: '{ot2_server_base_url}/runs/{run_id}/commandsAsPreSerializedList'
method: GET
response:
status_code: 200
json:
data:
- !anystr
- !anystr
- !anystr
- !anystr
meta:
cursor: 0
totalLength: 4
8 changes: 8 additions & 0 deletions robot-server/tests/integration/robot_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ async def get_run_command(self, run_id: str, command_id: str) -> Response:
response.raise_for_status()
return response

async def get_preserialized_commands(self, run_id: str) -> Response:
"""GET /runs/:run_id/commandsAsPreSerializedList."""
response = await self.httpx_client.get(
url=f"{self.base_url}/runs/{run_id}/commandsAsPreSerializedList",
)
response.raise_for_status()
return response

async def post_labware_offset(
self,
run_id: str,
Expand Down
11 changes: 11 additions & 0 deletions robot-server/tests/runs/test_run_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter
from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner

from robot_server.service.notifications import RunsPublisher
from robot_server.service.task_runner import TaskRunner
from robot_server.runs.action_models import RunAction, RunActionType
from robot_server.runs.engine_store import EngineStore
Expand Down Expand Up @@ -41,6 +42,12 @@ def mock_task_runner(decoy: Decoy) -> TaskRunner:
return decoy.mock(cls=TaskRunner)


@pytest.fixture()
def mock_runs_publisher(decoy: Decoy) -> RunsPublisher:
"""Get a mock RunsPublisher."""
return decoy.mock(cls=RunsPublisher)


@pytest.fixture
def run_id() -> str:
"""A run identifier value."""
Expand Down Expand Up @@ -90,13 +97,15 @@ def subject(
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_task_runner: TaskRunner,
mock_runs_publisher: RunsPublisher,
) -> RunController:
"""Get a RunController test subject."""
return RunController(
run_id=run_id,
engine_store=mock_engine_store,
run_store=mock_run_store,
task_runner=mock_task_runner,
runs_publisher=mock_runs_publisher,
)


Expand Down Expand Up @@ -135,6 +144,7 @@ async def test_create_play_action_to_start(
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_task_runner: TaskRunner,
mock_runs_publisher: RunsPublisher,
engine_state_summary: StateSummary,
run_time_parameters: List[RunTimeParameter],
protocol_commands: List[pe_commands.Command],
Expand Down Expand Up @@ -181,6 +191,7 @@ async def test_create_play_action_to_start(
commands=protocol_commands,
run_time_parameters=run_time_parameters,
),
await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id),
times=1,
)

Expand Down
Loading

0 comments on commit e8f2968

Please sign in to comment.