Skip to content

Commit

Permalink
added notification for pre-serialized commands list availability
Browse files Browse the repository at this point in the history
  • Loading branch information
sanni-t committed Apr 30, 2024
1 parent a35f811 commit 6a59a88
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 2 deletions.
3 changes: 3 additions & 0 deletions robot-server/robot_server/runs/router/actions_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
MaintenanceEngineStore,
)
from robot_server.maintenance_runs.dependencies import get_maintenance_engine_store
from robot_server.service.notifications import get_runs_publisher, RunsPublisher

log = logging.getLogger(__name__)
actions_router = APIRouter()
Expand All @@ -45,6 +46,7 @@ async def get_run_controller(
task_runner: TaskRunner = Depends(get_task_runner),
engine_store: EngineStore = Depends(get_engine_store),
run_store: RunStore = Depends(get_run_store),
runs_publisher: RunsPublisher = Depends(get_runs_publisher),
) -> RunController:
"""Get a RunController for the current run.
Expand All @@ -67,6 +69,7 @@ async def get_run_controller(
task_runner=task_runner,
engine_store=engine_store,
run_store=run_store,
runs_publisher=runs_publisher,
)


Expand Down
9 changes: 8 additions & 1 deletion robot-server/robot_server/runs/run_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from opentrons.protocol_engine.types import DeckConfigurationType

from robot_server.service.notifications import RunsPublisher

log = logging.getLogger(__name__)


Expand All @@ -21,19 +23,21 @@ class RunActionNotAllowedError(RoboticsInteractionError):


class RunController:
"""An interface to manage the side-effects of requested run actions."""
"""An interface to manage the side effects of requested run actions."""

def __init__(
self,
run_id: str,
task_runner: TaskRunner,
engine_store: EngineStore,
run_store: RunStore,
runs_publisher: RunsPublisher,
) -> None:
self._run_id = run_id
self._task_runner = task_runner
self._engine_store = engine_store
self._run_store = run_store
self._runs_publisher = runs_publisher

def create_action(
self,
Expand Down Expand Up @@ -108,3 +112,6 @@ async def _run_protocol_and_insert_result(
commands=result.commands,
run_time_parameters=result.parameters,
)
await self._runs_publisher.publish_pre_serialized_commands_notification(
self._run_id
)
11 changes: 10 additions & 1 deletion robot-server/robot_server/runs/run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,16 @@ async def delete(self, run_id: str) -> None:
self._run_store.remove(run_id=run_id)

async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRun]:
"""Get and potentially archive a run.
"""Get and potentially archive the current run.
Args:
run_id: The run to get and maybe archive.
current: Whether to mark the run as current or not.
If `current` set to False, then the run is 'un-current'ed by
stopping the run, saving the final run data to the run store,
and clearing the engine and runner.
If 'current' is True or not specified, we simply fetch the run's
data from memory and database.
Returns:
The updated run.
Expand All @@ -324,6 +330,9 @@ async def update(self, run_id: str, current: Optional[bool]) -> Union[Run, BadRu
commands=commands,
run_time_parameters=parameters,
)
await self._runs_publisher.publish_pre_serialized_commands_notification(
run_id
)
else:
state_summary = self._engine_store.engine.state_view.get_summary()
parameters = self._engine_store.runner.run_time_parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ async def initialize(
self._engine_state_slice = EngineStateSlice()

await self._publish_runs_advise_refetch_async(run_id=run_id)
engine_state_summary = self._run_hooks.get_state_summary(self._run_hooks.run_id)
if (
engine_state_summary is not None
and engine_state_summary.completedAt is not None
):
await self.publish_pre_serialized_commands_notification(run_id=run_id)

async def clean_up_run(self, run_id: str) -> None:
"""Publish final refetch and unsubscribe flags for the given run."""
Expand Down Expand Up @@ -99,6 +105,13 @@ async def _publish_runs_advise_unsubscribe_async(self, run_id: str) -> None:
topic=f"{Topics.RUNS}/{run_id}"
)

async def publish_pre_serialized_commands_notification(self, run_id: str) -> None:
"""Publishes notification for GET /runs/:runId/commandsAsPreSerializedList."""
if self._run_hooks is not None:
await self._client.publish_advise_refetch_async(
topic=f"{Topics.RUNS_PRE_SERIALIZED_COMMANDS}/{run_id}"
)

async def _handle_current_command_change(self) -> None:
"""Publish a refetch flag if the current command has changed."""
if self._run_hooks is not None and self._engine_state_slice is not None:
Expand Down
1 change: 1 addition & 0 deletions robot-server/robot_server/service/notifications/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ class Topics(str, Enum):
MAINTENANCE_RUNS_CURRENT_RUN = f"{_TOPIC_BASE}/maintenance_runs/current_run"
RUNS_CURRENT_COMMAND = f"{_TOPIC_BASE}/runs/current_command"
RUNS = f"{_TOPIC_BASE}/runs"
RUNS_PRE_SERIALIZED_COMMANDS = f"{_TOPIC_BASE}/runs/pre_serialized_commands"
11 changes: 11 additions & 0 deletions robot-server/tests/runs/test_run_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from opentrons.protocol_engine.types import RunTimeParameter, BooleanParameter
from opentrons.protocol_runner import RunResult, JsonRunner, PythonAndLegacyRunner

from robot_server.service.notifications import RunsPublisher
from robot_server.service.task_runner import TaskRunner
from robot_server.runs.action_models import RunAction, RunActionType
from robot_server.runs.engine_store import EngineStore
Expand Down Expand Up @@ -41,6 +42,12 @@ def mock_task_runner(decoy: Decoy) -> TaskRunner:
return decoy.mock(cls=TaskRunner)


@pytest.fixture()
def mock_runs_publisher(decoy: Decoy) -> RunsPublisher:
"""Get a mock RunsPublisher."""
return decoy.mock(cls=RunsPublisher)


@pytest.fixture
def run_id() -> str:
"""A run identifier value."""
Expand Down Expand Up @@ -90,13 +97,15 @@ def subject(
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_task_runner: TaskRunner,
mock_runs_publisher: RunsPublisher,
) -> RunController:
"""Get a RunController test subject."""
return RunController(
run_id=run_id,
engine_store=mock_engine_store,
run_store=mock_run_store,
task_runner=mock_task_runner,
runs_publisher=mock_runs_publisher,
)


Expand Down Expand Up @@ -135,6 +144,7 @@ async def test_create_play_action_to_start(
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_task_runner: TaskRunner,
mock_runs_publisher: RunsPublisher,
engine_state_summary: StateSummary,
run_time_parameters: List[RunTimeParameter],
protocol_commands: List[pe_commands.Command],
Expand Down Expand Up @@ -181,6 +191,7 @@ async def test_create_play_action_to_start(
commands=protocol_commands,
run_time_parameters=run_time_parameters,
),
await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id),
times=1,
)

Expand Down
7 changes: 7 additions & 0 deletions robot-server/tests/runs/test_run_data_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ async def test_update_current(
run_command: commands.Command,
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_runs_publisher: RunsPublisher,
subject: RunDataManager,
) -> None:
"""It should persist the current run and clear the engine on current=false."""
Expand All @@ -611,6 +612,10 @@ async def test_update_current(

result = await subject.update(run_id=run_id, current=False)

decoy.verify(
await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id),
times=1,
)
assert result == Run(
current=False,
id=run_resource.run_id,
Expand All @@ -637,6 +642,7 @@ async def test_update_current_noop(
run_command: commands.Command,
mock_engine_store: EngineStore,
mock_run_store: RunStore,
mock_runs_publisher: RunsPublisher,
subject: RunDataManager,
current: Optional[bool],
) -> None:
Expand All @@ -661,6 +667,7 @@ async def test_update_current_noop(
commands=matchers.Anything(),
run_time_parameters=matchers.Anything(),
),
await mock_runs_publisher.publish_pre_serialized_commands_notification(run_id),
times=0,
)

Expand Down

0 comments on commit 6a59a88

Please sign in to comment.