From a35f8112fe52c2052aa253e2c0ea3555140dcff3 Mon Sep 17 00:00:00 2001 From: Sanniti Date: Mon, 29 Apr 2024 17:39:32 -0400 Subject: [PATCH 1/2] return error for active run, add tests, improved docstrings --- .../runs/router/commands_router.py | 37 ++++++++++++++---- .../robot_server/runs/run_data_manager.py | 13 ++++++- robot-server/robot_server/runs/run_store.py | 1 - .../http_api/runs/test_persistence.py | 9 +++++ ...t_run_queued_protocol_commands.tavern.yaml | 16 ++++++++ .../tests/integration/robot_client.py | 8 ++++ .../tests/runs/test_run_data_manager.py | 38 ++++++++++++++++++- robot-server/tests/runs/test_run_store.py | 28 ++++++++++++++ 8 files changed, 139 insertions(+), 11 deletions(-) diff --git a/robot-server/robot_server/runs/router/commands_router.py b/robot-server/robot_server/runs/router/commands_router.py index 9a7ea6954193..15d2be8f0379 100644 --- a/robot-server/robot_server/runs/router/commands_router.py +++ b/robot-server/robot_server/runs/router/commands_router.py @@ -27,7 +27,7 @@ from robot_server.robot.control.dependencies import require_estop_in_good_state from ..run_models import RunCommandSummary -from ..run_data_manager import RunDataManager +from ..run_data_manager import RunDataManager, PreSerializedCommandsNotAvailableError from ..engine_store import EngineStore from ..run_store import RunStore, CommandNotFoundError from ..run_models import RunNotFoundError @@ -72,6 +72,18 @@ class CommandNotAllowed(ErrorDetails): title: str = "Command Not Allowed" +class PreSerializedCommandsNotAvailable(ErrorDetails): + """An error if one tries to fetch pre-serialized commands before they are written to the database.""" + + id: Literal[ + "PreSerializedCommandsNotAvailable" + ] = "PreSerializedCommandsNotAvailable" + title: str = "Pre-Serialized commands not available." + detail: str = ( + "Pre-serialized commands are only available once a run has finished running." + ) + + class CommandLinkMeta(BaseModel): """Metadata about a command resource referenced in `links`.""" @@ -356,24 +368,30 @@ async def get_run_commands( @PydanticResponse.wrap_route( commands_router.get, path="/runs/{runId}/commandsAsPreSerializedList", - summary="Get all commands as a list of pre-serialized commands", + summary="Get all commands of a completed run as a list of pre-serialized commands", description=( - "Get all commands of a run as a list of pre-serialized commands." + "Get all commands of a completed run as a list of pre-serialized commands." "**Warning:** This endpoint is experimental. We may change or remove it without warning." "\n\n" - " This is a faster alternative to fetching *all* commands using `GET /runs/{runId}/commands`" - " For large protocols (10k+ commands), the above endpoint can take minutes to respond," - " whereas this one should only take a few seconds." + "The commands list will only be available after a run has completed" + " (whether successful, failed or stopped) and its data has been committed to the database." + " If a request is received before the run is completed, it will return a 503 Unavailable error." + " This is a faster alternative to fetching the full commands list using" + " `GET /runs/{runId}/commands`. For large protocols (10k+ commands), the above" + " endpoint can take minutes to respond, whereas this one should only take a few seconds." ), responses={ status.HTTP_404_NOT_FOUND: {"model": ErrorBody[RunNotFound]}, + status.HTTP_503_SERVICE_UNAVAILABLE: { + "model": ErrorBody[PreSerializedCommandsNotAvailable] + }, }, ) async def get_run_commands_as_pre_serialized_list( runId: str, run_data_manager: RunDataManager = Depends(get_run_data_manager), ) -> PydanticResponse[SimpleMultiBody[str]]: - """Get all commands in a run as a list of pre-serialized (string encoded) commands. + """Get all commands of a completed run as a list of pre-serialized (string encoded) commands. Arguments: runId: Requested run ID, from the URL @@ -383,7 +401,10 @@ async def get_run_commands_as_pre_serialized_list( commands = run_data_manager.get_all_commands_as_preserialized_list(runId) except RunNotFoundError as e: raise RunNotFound.from_exc(e).as_error(status.HTTP_404_NOT_FOUND) from e - + except PreSerializedCommandsNotAvailableError as e: + raise PreSerializedCommandsNotAvailable.from_exc(e).as_error( + status.HTTP_503_SERVICE_UNAVAILABLE + ) from e return await PydanticResponse.create( content=SimpleMultiBody.construct( data=commands, meta=MultiBodyMeta(cursor=0, totalLength=len(commands)) diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index c5024270b162..7631046b581a 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -112,6 +112,10 @@ class RunNotCurrentError(ValueError): """Error raised when a requested run is not the current run.""" +class PreSerializedCommandsNotAvailableError(LookupError): + """Error raised when a run's commands are not available as pre-serialized list of commands.""" + + class RunDataManager: """Collaborator to manage current and historical run data. @@ -388,7 +392,14 @@ def get_command(self, run_id: str, command_id: str) -> Command: return self._run_store.get_command(run_id=run_id, command_id=command_id) def get_all_commands_as_preserialized_list(self, run_id: str) -> List[str]: - """Get all commands of a run in a stringified json doc.""" + """Get all commands of a run in a serialized json list.""" + if ( + run_id == self._engine_store.current_run_id + and not self._engine_store.engine.state_view.commands.get_is_terminal() + ): + raise PreSerializedCommandsNotAvailableError( + "Pre-serialized commands are only available after a run has ended." + ) return self._run_store.get_all_commands_as_preserialized_list(run_id) def _get_state_summary(self, run_id: str) -> Union[StateSummary, BadStateSummary]: diff --git a/robot-server/robot_server/runs/run_store.py b/robot-server/robot_server/runs/run_store.py index be69c560138f..6cf86d14af1f 100644 --- a/robot-server/robot_server/runs/run_store.py +++ b/robot-server/robot_server/runs/run_store.py @@ -428,7 +428,6 @@ def get_commands_slice( ) .order_by(run_command_table.c.index_in_run) ) - # select_all_cmds_as_doc = sqlalchemy.select(sqlalchemy.func.json_array_elements) slice_result = transaction.execute(select_slice).all() sliced_commands: List[Command] = [ diff --git a/robot-server/tests/integration/http_api/runs/test_persistence.py b/robot-server/tests/integration/http_api/runs/test_persistence.py index 45b55202fdaa..943f644e8d39 100644 --- a/robot-server/tests/integration/http_api/runs/test_persistence.py +++ b/robot-server/tests/integration/http_api/runs/test_persistence.py @@ -1,3 +1,4 @@ +import json from copy import deepcopy from datetime import datetime from typing import Any, AsyncGenerator, Dict, NamedTuple, cast @@ -250,6 +251,9 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N get_persisted_command_response = await client.get_run_command( run_id=run_id, command_id=command_id ) + get_preserialized_commands_response = await client.get_preserialized_commands( + run_id=run_id + ) # ensure the persisted commands still match the original ones assert get_all_persisted_commands_response.json()["data"] == [ @@ -259,6 +263,11 @@ async def test_run_commands_persist(client_and_server: ClientServerFixture) -> N ] assert get_persisted_command_response.json()["data"] == expected_command + json_converted_command = json.loads( + get_preserialized_commands_response.json()["data"][0] + ) + assert json_converted_command == expected_command + async def test_runs_completed_started_at_persist_via_actions_router( client_and_server: ClientServerFixture, diff --git a/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml b/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml index 0d4a00102810..9d188402debc 100644 --- a/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml +++ b/robot-server/tests/integration/http_api/runs/test_run_queued_protocol_commands.tavern.yaml @@ -198,3 +198,19 @@ stages: createdAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" startedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" completedAt: !re_search "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+\\+\\d{2}:\\d{2}$" + + - name: Get all the commands in the run as a pre-serialized list + request: + url: '{ot2_server_base_url}/runs/{run_id}/commandsAsPreSerializedList' + method: GET + response: + status_code: 200 + json: + data: + - !anystr + - !anystr + - !anystr + - !anystr + meta: + cursor: 0 + totalLength: 4 \ No newline at end of file diff --git a/robot-server/tests/integration/robot_client.py b/robot-server/tests/integration/robot_client.py index c4511f8d3155..9af11d50cdb0 100644 --- a/robot-server/tests/integration/robot_client.py +++ b/robot-server/tests/integration/robot_client.py @@ -220,6 +220,14 @@ async def get_run_command(self, run_id: str, command_id: str) -> Response: response.raise_for_status() return response + async def get_preserialized_commands(self, run_id: str) -> Response: + """GET /runs/:run_id/commandsAsPreSerializedList.""" + response = await self.httpx_client.get( + url=f"{self.base_url}/runs/{run_id}/commandsAsPreSerializedList", + ) + response.raise_for_status() + return response + async def post_labware_offset( self, run_id: str, diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index 547ec0a7b747..dff1f28c1932 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -23,7 +23,11 @@ from robot_server.protocols.protocol_store import ProtocolResource from robot_server.runs.engine_store import EngineStore, EngineConflictError -from robot_server.runs.run_data_manager import RunDataManager, RunNotCurrentError +from robot_server.runs.run_data_manager import ( + RunDataManager, + RunNotCurrentError, + PreSerializedCommandsNotAvailableError, +) from robot_server.runs.run_models import Run, BadRun, RunNotFoundError, RunDataError from robot_server.runs.run_store import ( RunStore, @@ -932,6 +936,38 @@ def test_get_command_from_db_command_not_found( subject.get_command("run-id", "command-id") +def test_get_all_commands_as_preserialized_list( + decoy: Decoy, + subject: RunDataManager, + mock_run_store: RunStore, + mock_engine_store: EngineStore, +) -> None: + """It should return the pre-serialized commands list.""" + decoy.when(mock_engine_store.current_run_id).then_return(None) + decoy.when( + mock_run_store.get_all_commands_as_preserialized_list("run-id") + ).then_return(['{"id": command-1}', '{"id": command-2}']) + assert subject.get_all_commands_as_preserialized_list("run-id") == [ + '{"id": command-1}', + '{"id": command-2}', + ] + + +def test_get_all_commands_as_preserialized_list_errors_for_active_runs( + decoy: Decoy, + subject: RunDataManager, + mock_run_store: RunStore, + mock_engine_store: EngineStore, +) -> None: + """It should raise an error when fetching pre-serialized commands list while run is active.""" + decoy.when(mock_engine_store.current_run_id).then_return("current-run-id") + decoy.when( + mock_engine_store.engine.state_view.commands.get_is_terminal() + ).then_return(False) + with pytest.raises(PreSerializedCommandsNotAvailableError): + subject.get_all_commands_as_preserialized_list("current-run-id") + + async def test_get_current_run_labware_definition( decoy: Decoy, mock_engine_store: EngineStore, diff --git a/robot-server/tests/runs/test_run_store.py b/robot-server/tests/runs/test_run_store.py index c6108cf54076..ee7697107f67 100644 --- a/robot-server/tests/runs/test_run_store.py +++ b/robot-server/tests/runs/test_run_store.py @@ -734,3 +734,31 @@ def test_get_commands_slice_run_not_found(subject: RunStore) -> None: ) with pytest.raises(RunNotFoundError): subject.get_commands_slice(run_id="not-run-id", cursor=1, length=3) + + +def test_get_all_commands_as_preserialized_list( + subject: RunStore, + protocol_commands: List[pe_commands.Command], + state_summary: StateSummary, +) -> None: + """It should get all commands stored in DB as a pre-serialized list.""" + subject.insert( + run_id="run-id", + protocol_id=None, + created_at=datetime(year=2021, month=1, day=1, tzinfo=timezone.utc), + ) + subject.update_run_state( + run_id="run-id", + summary=state_summary, + commands=protocol_commands, + run_time_parameters=[], + ) + result = subject.get_all_commands_as_preserialized_list(run_id="run-id") + assert result == [ + '{"id": "pause-1", "createdAt": "2021-01-01T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "hello world"}, "result": {}}', + '{"id": "pause-2", "createdAt": "2022-02-02T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "hey world"}, "result": {}}', + '{"id": "pause-3", "createdAt": "2023-03-03T00:00:00", "commandType": "waitForResume",' + ' "key": "command-key", "status": "succeeded", "params": {"message": "sup world"}, "result": {}}', + ] From 6a59a883a8464dd4ce64d931ca23cfe4c6569608 Mon Sep 17 00:00:00 2001 From: Sanniti Date: Tue, 30 Apr 2024 15:43:51 -0400 Subject: [PATCH 2/2] added notification for pre-serialized commands list availability --- .../robot_server/runs/router/actions_router.py | 3 +++ robot-server/robot_server/runs/run_controller.py | 9 ++++++++- robot-server/robot_server/runs/run_data_manager.py | 11 ++++++++++- .../notifications/publishers/runs_publisher.py | 13 +++++++++++++ .../robot_server/service/notifications/topics.py | 1 + robot-server/tests/runs/test_run_controller.py | 11 +++++++++++ robot-server/tests/runs/test_run_data_manager.py | 7 +++++++ 7 files changed, 53 insertions(+), 2 deletions(-) diff --git a/robot-server/robot_server/runs/router/actions_router.py b/robot-server/robot_server/runs/router/actions_router.py index b662d59f5541..25aae8cfd193 100644 --- a/robot-server/robot_server/runs/router/actions_router.py +++ b/robot-server/robot_server/runs/router/actions_router.py @@ -28,6 +28,7 @@ MaintenanceEngineStore, ) from robot_server.maintenance_runs.dependencies import get_maintenance_engine_store +from robot_server.service.notifications import get_runs_publisher, RunsPublisher log = logging.getLogger(__name__) actions_router = APIRouter() @@ -45,6 +46,7 @@ async def get_run_controller( task_runner: TaskRunner = Depends(get_task_runner), engine_store: EngineStore = Depends(get_engine_store), run_store: RunStore = Depends(get_run_store), + runs_publisher: RunsPublisher = Depends(get_runs_publisher), ) -> RunController: """Get a RunController for the current run. @@ -67,6 +69,7 @@ async def get_run_controller( task_runner=task_runner, engine_store=engine_store, run_store=run_store, + runs_publisher=runs_publisher, ) diff --git a/robot-server/robot_server/runs/run_controller.py b/robot-server/robot_server/runs/run_controller.py index 923c9cfa64ef..e7e55080aed7 100644 --- a/robot-server/robot_server/runs/run_controller.py +++ b/robot-server/robot_server/runs/run_controller.py @@ -13,6 +13,8 @@ from opentrons.protocol_engine.types import DeckConfigurationType +from robot_server.service.notifications import RunsPublisher + log = logging.getLogger(__name__) @@ -21,7 +23,7 @@ class RunActionNotAllowedError(RoboticsInteractionError): class RunController: - """An interface to manage the side-effects of requested run actions.""" + """An interface to manage the side effects of requested run actions.""" def __init__( self, @@ -29,11 +31,13 @@ def __init__( task_runner: TaskRunner, engine_store: EngineStore, run_store: RunStore, + runs_publisher: RunsPublisher, ) -> None: self._run_id = run_id self._task_runner = task_runner self._engine_store = engine_store self._run_store = run_store + self._runs_publisher = runs_publisher def create_action( self, @@ -108,3 +112,6 @@ async def _run_protocol_and_insert_result( commands=result.commands, run_time_parameters=result.parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + self._run_id + ) diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index 7631046b581a..311cfb93b404 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -294,10 +294,16 @@ async def delete(self, run_id: str) -> None: self._run_store.remove(run_id=run_id) async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRun]: - """Get and potentially archive a run. + """Get and potentially archive the current run. Args: run_id: The run to get and maybe archive. + current: Whether to mark the run as current or not. + If `current` set to False, then the run is 'un-current'ed by + stopping the run, saving the final run data to the run store, + and clearing the engine and runner. + If 'current' is True or not specified, we simply fetch the run's + data from memory and database. Returns: The updated run. @@ -324,6 +330,9 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu commands=commands, run_time_parameters=parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + run_id + ) else: state_summary = self._engine_store.engine.state_view.get_summary() parameters = self._engine_store.runner.run_time_parameters diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index fef23c8a8758..a296fd7bbbc7 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -72,6 +72,12 @@ async def initialize( self._engine_state_slice = EngineStateSlice() await self._publish_runs_advise_refetch_async(run_id=run_id) + engine_state_summary = self._run_hooks.get_state_summary(self._run_hooks.run_id) + if ( + engine_state_summary is not None + and engine_state_summary.completedAt is not None + ): + await self.publish_pre_serialized_commands_notification(run_id=run_id) async def clean_up_run(self, run_id: str) -> None: """Publish final refetch and unsubscribe flags for the given run.""" @@ -99,6 +105,13 @@ async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None: topic=f"{Topics.RUNS}/{run_id}" ) + async def publish_pre_serialized_commands_notification(self, run_id: str) -> None: + """Publishes notification for GET /runs/:runId/commandsAsPreSerializedList.""" + if self._run_hooks is not None: + await self._client.publish_advise_refetch_async( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}" + ) + async def _handle_current_command_change(self) -> None: """Publish a refetch flag if the current command has changed.""" if self._run_hooks is not None and self._engine_state_slice is not None: diff --git a/robot-server/robot_server/service/notifications/topics.py b/robot-server/robot_server/service/notifications/topics.py index 34f2fd0eea14..65c5aedb66fe 100644 --- a/robot-server/robot_server/service/notifications/topics.py +++ b/robot-server/robot_server/service/notifications/topics.py @@ -14,3 +14,4 @@ class Topics(str, Enum): MAINTENANCE_RUNS_CURRENT_RUN = f"{_TOPIC_BASE}/maintenance_runs/current_run" RUNS_CURRENT_COMMAND = f"{_TOPIC_BASE}/runs/current_command" RUNS = f"{_TOPIC_BASE}/runs" + RUNS_PRE_SERIALIZED_COMMANDS = f"{_TOPIC_BASE}/runs/pre_serialized_commands" diff --git a/robot-server/tests/runs/test_run_controller.py b/robot-server/tests/runs/test_run_controller.py index a844cdcc6d57..71fc92f84660 100644 --- a/robot-server/tests/runs/test_run_controller.py +++ b/robot-server/tests/runs/test_run_controller.py @@ -14,6 +14,7 @@ from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner +from robot_server.service.notifications import RunsPublisher from robot_server.service.task_runner import TaskRunner from robot_server.runs.action_models import RunAction, RunActionType from robot_server.runs.engine_store import EngineStore @@ -41,6 +42,12 @@ def mock_task_runner(decoy: Decoy) -> TaskRunner: return decoy.mock(cls=TaskRunner) +@pytest.fixture() +def mock_runs_publisher(decoy: Decoy) -> RunsPublisher: + """Get a mock RunsPublisher.""" + return decoy.mock(cls=RunsPublisher) + + @pytest.fixture def run_id() -> str: """A run identifier value.""" @@ -90,6 +97,7 @@ def subject( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, ) -> RunController: """Get a RunController test subject.""" return RunController( @@ -97,6 +105,7 @@ def subject( engine_store=mock_engine_store, run_store=mock_run_store, task_runner=mock_task_runner, + runs_publisher=mock_runs_publisher, ) @@ -135,6 +144,7 @@ async def test_create_play_action_to_start( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, engine_state_summary: StateSummary, run_time_parameters: List[RunTimeParameter], protocol_commands: List[pe_commands.Command], @@ -181,6 +191,7 @@ async def test_create_play_action_to_start( commands=protocol_commands, run_time_parameters=run_time_parameters, ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=1, ) diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index dff1f28c1932..12ced28fdb0b 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -587,6 +587,7 @@ async def test_update_current( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, ) -> None: """It should persist the current run and clear the engine on current=false.""" @@ -611,6 +612,10 @@ async def test_update_current( result = await subject.update(run_id=run_id, current=False) + decoy.verify( + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), + times=1, + ) assert result == Run( current=False, id=run_resource.run_id, @@ -637,6 +642,7 @@ async def test_update_current_noop( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, current: Optional[bool], ) -> None: @@ -661,6 +667,7 @@ async def test_update_current_noop( commands=matchers.Anything(), run_time_parameters=matchers.Anything(), ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=0, )