diff --git a/robot-server/robot_server/runs/router/actions_router.py b/robot-server/robot_server/runs/router/actions_router.py index b662d59f5541..25aae8cfd193 100644 --- a/robot-server/robot_server/runs/router/actions_router.py +++ b/robot-server/robot_server/runs/router/actions_router.py @@ -28,6 +28,7 @@ MaintenanceEngineStore, ) from robot_server.maintenance_runs.dependencies import get_maintenance_engine_store +from robot_server.service.notifications import get_runs_publisher, RunsPublisher log = logging.getLogger(__name__) actions_router = APIRouter() @@ -45,6 +46,7 @@ async def get_run_controller( task_runner: TaskRunner = Depends(get_task_runner), engine_store: EngineStore = Depends(get_engine_store), run_store: RunStore = Depends(get_run_store), + runs_publisher: RunsPublisher = Depends(get_runs_publisher), ) -> RunController: """Get a RunController for the current run. @@ -67,6 +69,7 @@ async def get_run_controller( task_runner=task_runner, engine_store=engine_store, run_store=run_store, + runs_publisher=runs_publisher, ) diff --git a/robot-server/robot_server/runs/run_controller.py b/robot-server/robot_server/runs/run_controller.py index 923c9cfa64ef..e7e55080aed7 100644 --- a/robot-server/robot_server/runs/run_controller.py +++ b/robot-server/robot_server/runs/run_controller.py @@ -13,6 +13,8 @@ from opentrons.protocol_engine.types import DeckConfigurationType +from robot_server.service.notifications import RunsPublisher + log = logging.getLogger(__name__) @@ -21,7 +23,7 @@ class RunActionNotAllowedError(RoboticsInteractionError): class RunController: - """An interface to manage the side-effects of requested run actions.""" + """An interface to manage the side effects of requested run actions.""" def __init__( self, @@ -29,11 +31,13 @@ def __init__( task_runner: TaskRunner, engine_store: EngineStore, run_store: RunStore, + runs_publisher: RunsPublisher, ) -> None: self._run_id = run_id self._task_runner = task_runner self._engine_store = engine_store self._run_store = run_store + self._runs_publisher = runs_publisher def create_action( self, @@ -108,3 +112,6 @@ async def _run_protocol_and_insert_result( commands=result.commands, run_time_parameters=result.parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + self._run_id + ) diff --git a/robot-server/robot_server/runs/run_data_manager.py b/robot-server/robot_server/runs/run_data_manager.py index 7631046b581a..311cfb93b404 100644 --- a/robot-server/robot_server/runs/run_data_manager.py +++ b/robot-server/robot_server/runs/run_data_manager.py @@ -294,10 +294,16 @@ async def delete(self, run_id: str) -> None: self._run_store.remove(run_id=run_id) async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRun]: - """Get and potentially archive a run. + """Get and potentially archive the current run. Args: run_id: The run to get and maybe archive. + current: Whether to mark the run as current or not. + If `current` set to False, then the run is 'un-current'ed by + stopping the run, saving the final run data to the run store, + and clearing the engine and runner. + If 'current' is True or not specified, we simply fetch the run's + data from memory and database. Returns: The updated run. @@ -324,6 +330,9 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu commands=commands, run_time_parameters=parameters, ) + await self._runs_publisher.publish_pre_serialized_commands_notification( + run_id + ) else: state_summary = self._engine_store.engine.state_view.get_summary() parameters = self._engine_store.runner.run_time_parameters diff --git a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py index fef23c8a8758..a296fd7bbbc7 100644 --- a/robot-server/robot_server/service/notifications/publishers/runs_publisher.py +++ b/robot-server/robot_server/service/notifications/publishers/runs_publisher.py @@ -72,6 +72,12 @@ async def initialize( self._engine_state_slice = EngineStateSlice() await self._publish_runs_advise_refetch_async(run_id=run_id) + engine_state_summary = self._run_hooks.get_state_summary(self._run_hooks.run_id) + if ( + engine_state_summary is not None + and engine_state_summary.completedAt is not None + ): + await self.publish_pre_serialized_commands_notification(run_id=run_id) async def clean_up_run(self, run_id: str) -> None: """Publish final refetch and unsubscribe flags for the given run.""" @@ -99,6 +105,13 @@ async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None: topic=f"{Topics.RUNS}/{run_id}" ) + async def publish_pre_serialized_commands_notification(self, run_id: str) -> None: + """Publishes notification for GET /runs/:runId/commandsAsPreSerializedList.""" + if self._run_hooks is not None: + await self._client.publish_advise_refetch_async( + topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}" + ) + async def _handle_current_command_change(self) -> None: """Publish a refetch flag if the current command has changed.""" if self._run_hooks is not None and self._engine_state_slice is not None: diff --git a/robot-server/robot_server/service/notifications/topics.py b/robot-server/robot_server/service/notifications/topics.py index 34f2fd0eea14..65c5aedb66fe 100644 --- a/robot-server/robot_server/service/notifications/topics.py +++ b/robot-server/robot_server/service/notifications/topics.py @@ -14,3 +14,4 @@ class Topics(str, Enum): MAINTENANCE_RUNS_CURRENT_RUN = f"{_TOPIC_BASE}/maintenance_runs/current_run" RUNS_CURRENT_COMMAND = f"{_TOPIC_BASE}/runs/current_command" RUNS = f"{_TOPIC_BASE}/runs" + RUNS_PRE_SERIALIZED_COMMANDS = f"{_TOPIC_BASE}/runs/pre_serialized_commands" diff --git a/robot-server/tests/runs/test_run_controller.py b/robot-server/tests/runs/test_run_controller.py index a844cdcc6d57..71fc92f84660 100644 --- a/robot-server/tests/runs/test_run_controller.py +++ b/robot-server/tests/runs/test_run_controller.py @@ -14,6 +14,7 @@ from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner +from robot_server.service.notifications import RunsPublisher from robot_server.service.task_runner import TaskRunner from robot_server.runs.action_models import RunAction, RunActionType from robot_server.runs.engine_store import EngineStore @@ -41,6 +42,12 @@ def mock_task_runner(decoy: Decoy) -> TaskRunner: return decoy.mock(cls=TaskRunner) +@pytest.fixture() +def mock_runs_publisher(decoy: Decoy) -> RunsPublisher: + """Get a mock RunsPublisher.""" + return decoy.mock(cls=RunsPublisher) + + @pytest.fixture def run_id() -> str: """A run identifier value.""" @@ -90,6 +97,7 @@ def subject( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, ) -> RunController: """Get a RunController test subject.""" return RunController( @@ -97,6 +105,7 @@ def subject( engine_store=mock_engine_store, run_store=mock_run_store, task_runner=mock_task_runner, + runs_publisher=mock_runs_publisher, ) @@ -135,6 +144,7 @@ async def test_create_play_action_to_start( mock_engine_store: EngineStore, mock_run_store: RunStore, mock_task_runner: TaskRunner, + mock_runs_publisher: RunsPublisher, engine_state_summary: StateSummary, run_time_parameters: List[RunTimeParameter], protocol_commands: List[pe_commands.Command], @@ -181,6 +191,7 @@ async def test_create_play_action_to_start( commands=protocol_commands, run_time_parameters=run_time_parameters, ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=1, ) diff --git a/robot-server/tests/runs/test_run_data_manager.py b/robot-server/tests/runs/test_run_data_manager.py index dff1f28c1932..12ced28fdb0b 100644 --- a/robot-server/tests/runs/test_run_data_manager.py +++ b/robot-server/tests/runs/test_run_data_manager.py @@ -587,6 +587,7 @@ async def test_update_current( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, ) -> None: """It should persist the current run and clear the engine on current=false.""" @@ -611,6 +612,10 @@ async def test_update_current( result = await subject.update(run_id=run_id, current=False) + decoy.verify( + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), + times=1, + ) assert result == Run( current=False, id=run_resource.run_id, @@ -637,6 +642,7 @@ async def test_update_current_noop( run_command: commands.Command, mock_engine_store: EngineStore, mock_run_store: RunStore, + mock_runs_publisher: RunsPublisher, subject: RunDataManager, current: Optional[bool], ) -> None: @@ -661,6 +667,7 @@ async def test_update_current_noop( commands=matchers.Anything(), run_time_parameters=matchers.Anything(), ), + await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id), times=0, )