diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index a33e6b3..cc11ff0 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -78,7 +78,7 @@ jobs: env: NOX_SESSION: ${{ matrix.nox-session }} run: nox -R -e "$NOX_SESSION" - timeout-minutes: 10 + timeout-minutes: 2 # This job runs if all the `nox` matrix jobs ran and succeeded. # It is only used to have a single job that we can require in branch @@ -185,7 +185,7 @@ jobs: --platform linux/${{ matrix.arch }} \ localhost/nox-cross-arch:latest \ bash -c "pip install -e .[dev-noxfile]; nox --install-only -e ${{ matrix.nox-session }}; pip freeze; nox -R -e ${{ matrix.nox-session }}" - timeout-minutes: 30 + timeout-minutes: 3 # This ensures that the runner has access to the pip cache. - name: Reset pip cache ownership diff --git a/README.md b/README.md index 95ae323..870cbfe 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ The [`Dispatcher` class](https://frequenz-floss.github.io/frequenz-dispatch-pyth ```python import os -from frequenz.dispatch import Dispatcher, RunningState +from frequenz.dispatch import Dispatcher from unittest.mock import MagicMock async def run(): @@ -42,29 +42,29 @@ async def run(): changed_running_status_rx = dispatcher.running_status_change.new_receiver() async for dispatch in changed_running_status_rx: - match dispatch.running("DEMO_TYPE"): - case RunningState.RUNNING: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - case RunningState.STOPPED: - actor.stop() # this will stop the actor - case RunningState.DIFFERENT_TYPE: - pass # dispatch not for this type + if dispatch.type != "MY_TYPE": + continue + + if dispatch.started: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + else: + actor.stop() # this will stop the actor ``` ## Supported Platforms diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 4b5cede..961bb7e 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,7 +6,7 @@ ## Upgrading - +* The method `Dispatch.running(type: str)` was replaced with the property `Dispatch.started: bool`. ## New Features diff --git a/pyproject.toml b/pyproject.toml index 2230c4c..bc146e9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,7 +34,6 @@ classifiers = [ ] requires-python = ">= 3.11, < 4" dependencies = [ - "python-dateutil >= 2.8.2, < 3.0", "typing-extensions >= 4.11.0, < 5.0.0", # Make sure to update the version for cross-referencing also in the # mkdocs.yml file when changing the version here (look for the config key @@ -72,9 +71,9 @@ dev-mkdocs = [ ] dev-mypy = [ "mypy == 1.13.0", - "grpc-stubs == 1.53.0.5", # This dependency introduces breaking changes in patch releases + # This dependency introduces breaking changes in patch releases + "grpc-stubs == 1.53.0.5", "types-Markdown == 3.7.0.20240822", - "types-python-dateutil==2.9.0.20241003", # For checking the noxfile, docs/ script, and tests "frequenz-dispatch[dev-mkdocs,dev-noxfile,dev-pytest]", ] diff --git a/src/frequenz/dispatch/__init__.py b/src/frequenz/dispatch/__init__.py index 037665c..0dee73a 100644 --- a/src/frequenz/dispatch/__init__.py +++ b/src/frequenz/dispatch/__init__.py @@ -15,7 +15,7 @@ """ -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch from ._dispatcher import Dispatcher, ReceiverFetcher from ._event import Created, Deleted, DispatchEvent, Updated from ._managing_actor import DispatchManagingActor, DispatchUpdate @@ -28,7 +28,6 @@ "ReceiverFetcher", "Updated", "Dispatch", - "RunningState", "DispatchManagingActor", "DispatchUpdate", ] diff --git a/src/frequenz/dispatch/_dispatch.py b/src/frequenz/dispatch/_dispatch.py index e28bc7e..5d0d3a0 100644 --- a/src/frequenz/dispatch/_dispatch.py +++ b/src/frequenz/dispatch/_dispatch.py @@ -3,53 +3,12 @@ """Dispatch type with support for next_run calculation.""" - -import logging from dataclasses import dataclass from datetime import datetime, timezone -from enum import Enum -from typing import Iterator, cast +from typing import Iterator -from dateutil import rrule -from frequenz.client.dispatch.recurrence import Frequency, Weekday from frequenz.client.dispatch.types import Dispatch as BaseDispatch -_logger = logging.getLogger(__name__) -"""The logger for this module.""" - -_RRULE_FREQ_MAP = { - Frequency.MINUTELY: rrule.MINUTELY, - Frequency.HOURLY: rrule.HOURLY, - Frequency.DAILY: rrule.DAILY, - Frequency.WEEKLY: rrule.WEEKLY, - Frequency.MONTHLY: rrule.MONTHLY, -} -"""To map from our Frequency enum to the dateutil library enum.""" - -_RRULE_WEEKDAY_MAP = { - Weekday.MONDAY: rrule.MO, - Weekday.TUESDAY: rrule.TU, - Weekday.WEDNESDAY: rrule.WE, - Weekday.THURSDAY: rrule.TH, - Weekday.FRIDAY: rrule.FR, - Weekday.SATURDAY: rrule.SA, - Weekday.SUNDAY: rrule.SU, -} -"""To map from our Weekday enum to the dateutil library enum.""" - - -class RunningState(Enum): - """The running state of a dispatch.""" - - RUNNING = "RUNNING" - """The dispatch is running.""" - - STOPPED = "STOPPED" - """The dispatch is stopped.""" - - DIFFERENT_TYPE = "DIFFERENT_TYPE" - """The dispatch is for a different type.""" - @dataclass(frozen=True) class Dispatch(BaseDispatch): @@ -58,215 +17,55 @@ class Dispatch(BaseDispatch): deleted: bool = False """Whether the dispatch is deleted.""" - running_state_change_synced: datetime | None = None - """The last time a message was sent about the running state change.""" - def __init__( self, client_dispatch: BaseDispatch, deleted: bool = False, - running_state_change_synced: datetime | None = None, ): """Initialize the dispatch. Args: client_dispatch: The client dispatch. deleted: Whether the dispatch is deleted. - running_state_change_synced: Timestamp of the last running state change message. """ super().__init__(**client_dispatch.__dict__) # Work around frozen to set deleted object.__setattr__(self, "deleted", deleted) - object.__setattr__( - self, - "running_state_change_synced", - running_state_change_synced, - ) def _set_deleted(self) -> None: """Mark the dispatch as deleted.""" object.__setattr__(self, "deleted", True) @property - def _running_status_notified(self) -> bool: - """Check that the latest running state change notification was sent. - - Returns: - True if the latest running state change notification was sent, False otherwise. - """ - return self.running_state_change_synced == self.update_time - - def _set_running_status_notified(self) -> None: - """Mark the latest running state change notification as sent.""" - object.__setattr__(self, "running_state_change_synced", self.update_time) - - def running(self, type_: str) -> RunningState: - """Check if the dispatch is currently supposed to be running. - - Args: - type_: The type of the dispatch that should be running. - - Returns: - RUNNING if the dispatch is running, - STOPPED if it is stopped, - DIFFERENT_TYPE if it is for a different type. - """ - if self.type != type_: - return RunningState.DIFFERENT_TYPE - - if not self.active or self.deleted: - return RunningState.STOPPED - - now = datetime.now(tz=timezone.utc) - - if now < self.start_time: - return RunningState.STOPPED - # A dispatch without duration is always running once it started - if self.duration is None: - return RunningState.RUNNING - - if until := self._until(now): - return RunningState.RUNNING if now < until else RunningState.STOPPED - - return RunningState.STOPPED - - @property - def until(self) -> datetime | None: - """Time when the dispatch should end. - - Returns the time that a running dispatch should end. - If the dispatch is not running, None is returned. + def started(self) -> bool: + """Check if the dispatch is started. Returns: - The time when the dispatch should end or None if the dispatch is not running. + True if the dispatch is started, False otherwise. """ - if not self.active or self.deleted: - return None + if self.deleted: + return False - now = datetime.now(tz=timezone.utc) - return self._until(now) + return super().started - @property # noqa is needed because of a bug in pydoclint that makes it think a `return` without a return # value needs documenting - def missed_runs(self) -> Iterator[datetime]: # noqa: DOC405 + def missed_runs(self, since: datetime) -> Iterator[datetime]: # noqa: DOC405 """Yield all missed runs of a dispatch. Yields all missed runs of a dispatch. - If a running state change notification was not sent in time - due to connection issues, this method will yield all missed runs - since the last sent notification. + Args: + since: The time to start checking for missed runs. Returns: A generator that yields all missed runs of a dispatch. - """ - if self.update_time == self.running_state_change_synced: - return - from_time = self.update_time + Yields: + datetime: The missed run. + """ now = datetime.now(tz=timezone.utc) - while (next_run := self.next_run_after(from_time)) and next_run < now: + while (next_run := self.next_run_after(since)) and next_run < now: yield next_run - from_time = next_run - - @property - def next_run(self) -> datetime | None: - """Calculate the next run of a dispatch. - - Returns: - The next run of the dispatch or None if the dispatch is finished. - """ - return self.next_run_after(datetime.now(tz=timezone.utc)) - - def next_run_after(self, after: datetime) -> datetime | None: - """Calculate the next run of a dispatch. - - Args: - after: The time to calculate the next run from. - - Returns: - The next run of the dispatch or None if the dispatch is finished. - """ - if ( - not self.recurrence.frequency - or self.recurrence.frequency == Frequency.UNSPECIFIED - or self.duration is None # Infinite duration - ): - if after > self.start_time: - return None - return self.start_time - - # Make sure no weekday is UNSPECIFIED - if Weekday.UNSPECIFIED in self.recurrence.byweekdays: - _logger.warning("Dispatch %s has UNSPECIFIED weekday, ignoring...", self.id) - return None - - # No type information for rrule, so we need to cast - return cast(datetime | None, self._prepare_rrule().after(after, inc=True)) - - def _prepare_rrule(self) -> rrule.rrule: - """Prepare the rrule object. - - Returns: - The rrule object. - - Raises: - ValueError: If the interval is invalid. - """ - count, until = (None, None) - if end := self.recurrence.end_criteria: - count = end.count - until = end.until - - if self.recurrence.interval is None or self.recurrence.interval < 1: - raise ValueError("Interval must be at least 1") - - rrule_obj = rrule.rrule( - freq=_RRULE_FREQ_MAP[self.recurrence.frequency], - dtstart=self.start_time, - count=count, - until=until, - byminute=self.recurrence.byminutes or None, - byhour=self.recurrence.byhours or None, - byweekday=[ - _RRULE_WEEKDAY_MAP[weekday] for weekday in self.recurrence.byweekdays - ] - or None, - bymonthday=self.recurrence.bymonthdays or None, - bymonth=self.recurrence.bymonths or None, - interval=self.recurrence.interval, - ) - - return rrule_obj - - def _until(self, now: datetime) -> datetime | None: - """Calculate the time when the dispatch should end. - - If no previous run is found, None is returned. - - Args: - now: The current time. - - Returns: - The time when the dispatch should end or None if the dispatch is not running. - - Raises: - ValueError: If the dispatch has no duration. - """ - if self.duration is None: - raise ValueError("_until: Dispatch has no duration") - - if ( - not self.recurrence.frequency - or self.recurrence.frequency == Frequency.UNSPECIFIED - ): - return self.start_time + self.duration - - latest_past_start: datetime | None = self._prepare_rrule().before(now, inc=True) - - if not latest_past_start: - return None - - return latest_past_start + self.duration + since = next_run diff --git a/src/frequenz/dispatch/_dispatcher.py b/src/frequenz/dispatch/_dispatcher.py index 01b327d..c0c8a60 100644 --- a/src/frequenz/dispatch/_dispatcher.py +++ b/src/frequenz/dispatch/_dispatcher.py @@ -54,7 +54,7 @@ class Dispatcher: Example: Processing running state change dispatches ```python import os - from frequenz.dispatch import Dispatcher, RunningState + from frequenz.dispatch import Dispatcher from unittest.mock import MagicMock async def run(): @@ -75,29 +75,29 @@ async def run(): changed_running_status = dispatcher.running_status_change.new_receiver() async for dispatch in changed_running_status: - match dispatch.running("DEMO_TYPE"): - case RunningState.RUNNING: - print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") - if actor.is_running: - actor.reconfigure( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) # this will reconfigure the actor - else: - # this will start a new actor with the given components - # and run it for the duration of the dispatch - actor.start( - components=dispatch.target, - run_parameters=dispatch.payload, # custom actor parameters - dry_run=dispatch.dry_run, - until=dispatch.until, - ) - case RunningState.STOPPED: - actor.stop() # this will stop the actor - case RunningState.DIFFERENT_TYPE: - pass # dispatch not for this type + if dispatch.type != "YOUR_DISPATCH_TYPE": + continue + + if dispatch.started: + print(f"Executing dispatch {dispatch.id}, due on {dispatch.start_time}") + if actor.is_running: + actor.reconfigure( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) # this will reconfigure the actor + else: + # this will start a new actor with the given components + # and run it for the duration of the dispatch + actor.start( + components=dispatch.target, + run_parameters=dispatch.payload, # custom actor parameters + dry_run=dispatch.dry_run, + until=dispatch.until, + ) + else: + actor.stop() # this will stop the actor ``` Example: Getting notification about dispatch lifecycle events @@ -255,10 +255,6 @@ def running_status_change(self) -> ReceiverFetcher[Dispatch]: - The payload changed - The dispatch was deleted - Note: Reaching the end time (start_time + duration) will not - send a message, except when it was reached by modifying the duration. - - Returns: A new receiver for dispatches whose running status changed. """ diff --git a/src/frequenz/dispatch/_managing_actor.py b/src/frequenz/dispatch/_managing_actor.py index 5538e95..6ed4e57 100644 --- a/src/frequenz/dispatch/_managing_actor.py +++ b/src/frequenz/dispatch/_managing_actor.py @@ -11,7 +11,7 @@ from frequenz.client.dispatch.types import TargetComponents from frequenz.sdk.actor import Actor -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch _logger = logging.getLogger(__name__) @@ -121,7 +121,9 @@ def __init__( """ super().__init__() self._dispatch_rx = running_status_receiver - self._actors = frozenset([actor] if isinstance(actor, Actor) else actor) + self._actors: frozenset[Actor] = frozenset( + [actor] if isinstance(actor, Actor) else actor + ) self._dispatch_type = dispatch_type self._updates_sender = updates_sender @@ -156,25 +158,23 @@ async def _handle_dispatch(self, dispatch: Dispatch) -> None: Args: dispatch: The dispatch to handle. """ - running = dispatch.running(self._dispatch_type) - match running: - case RunningState.STOPPED: - _logger.info("Stopped by dispatch %s", dispatch.id) - await self._stop_actors("Dispatch stopped") - case RunningState.RUNNING: - if self._updates_sender is not None: - _logger.info("Updated by dispatch %s", dispatch.id) - await self._updates_sender.send( - DispatchUpdate( - components=dispatch.target, - dry_run=dispatch.dry_run, - options=dispatch.payload, - ) + if dispatch.type != self._dispatch_type: + _logger.debug("Ignoring dispatch %s", dispatch.id) + return + + if dispatch.started: + if self._updates_sender is not None: + _logger.info("Updated by dispatch %s", dispatch.id) + await self._updates_sender.send( + DispatchUpdate( + components=dispatch.target, + dry_run=dispatch.dry_run, + options=dispatch.payload, ) - - _logger.info("Started by dispatch %s", dispatch.id) - self._start_actors() - case RunningState.DIFFERENT_TYPE: - _logger.debug( - "Unknown dispatch! Ignoring dispatch of type %s", dispatch.type ) + + _logger.info("Started by dispatch %s", dispatch.id) + self._start_actors() + else: + _logger.info("Stopped by dispatch %s", dispatch.id) + await self._stop_actors("Dispatch stopped") diff --git a/src/frequenz/dispatch/actor.py b/src/frequenz/dispatch/actor.py index 2f58845..7bd6e49 100644 --- a/src/frequenz/dispatch/actor.py +++ b/src/frequenz/dispatch/actor.py @@ -15,7 +15,7 @@ from frequenz.client.dispatch.types import Event from frequenz.sdk.actor import Actor -from ._dispatch import Dispatch, RunningState +from ._dispatch import Dispatch from ._event import Created, Deleted, DispatchEvent, Updated _logger = logging.getLogger(__name__) @@ -126,7 +126,6 @@ async def _run(self) -> None: self._dispatches.pop(dispatch.id) await self._update_dispatch_schedule_and_notify(None, dispatch) - dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_updates_sender.send( Deleted(dispatch=dispatch) ) @@ -142,7 +141,7 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: # The timer is always a tiny bit delayed, so we need to check if the # actor is supposed to be running now (we're assuming it wasn't already # running, as all checks are done before scheduling) - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: # If it should be running, schedule the stop event self._schedule_stop(dispatch) # If the actor is not running, we need to schedule the next start @@ -193,7 +192,7 @@ async def _fetch(self) -> None: await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) await self._update_dispatch_schedule_and_notify(None, dispatch) - # Set deleted only here as it influences the result of dispatch.running() + # Set deleted only here as it influences the result of dispatch.started # which is used in above in _running_state_change dispatch._set_deleted() # pylint: disable=protected-access await self._lifecycle_updates_sender.send(Deleted(dispatch=dispatch)) @@ -221,8 +220,11 @@ async def _update_dispatch_schedule_and_notify( if not dispatch and old_dispatch: self._remove_scheduled(old_dispatch) + was_running = old_dispatch.started + old_dispatch._set_deleted() # pylint: disable=protected-access) + # If the dispatch was running, we need to notify - if old_dispatch.running(old_dispatch.type) == RunningState.RUNNING: + if was_running: await self._send_running_state_change(old_dispatch) # A new dispatch was created @@ -230,9 +232,8 @@ async def _update_dispatch_schedule_and_notify( assert not self._remove_scheduled( dispatch ), "New dispatch already scheduled?!" - # If its currently running, send notification right away - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: await self._send_running_state_change(dispatch) self._schedule_stop(dispatch) @@ -249,7 +250,7 @@ async def _update_dispatch_schedule_and_notify( if self._update_changed_running_state(dispatch, old_dispatch): await self._send_running_state_change(dispatch) - if dispatch.running(dispatch.type) == RunningState.RUNNING: + if dispatch.started: self._schedule_stop(dispatch) else: self._schedule_start(dispatch) @@ -336,7 +337,7 @@ def _update_changed_running_state( """ # If any of the runtime attributes changed, we need to send a message runtime_state_attributes = [ - "running", + "started", "type", "target", "duration", @@ -359,6 +360,3 @@ async def _send_running_state_change(self, dispatch: Dispatch) -> None: dispatch: The dispatch that changed. """ await self._running_state_change_sender.send(dispatch) - # Update the last sent notification time - # so we know if this change was already sent - dispatch._set_running_status_notified() # pylint: disable=protected-access diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index 9771210..1406121 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -18,14 +18,7 @@ from frequenz.client.dispatch.types import Dispatch as BaseDispatch from pytest import fixture -from frequenz.dispatch import ( - Created, - Deleted, - Dispatch, - DispatchEvent, - RunningState, - Updated, -) +from frequenz.dispatch import Created, Deleted, Dispatch, DispatchEvent, Updated from frequenz.dispatch.actor import DispatchingActor @@ -151,8 +144,6 @@ async def _test_new_dispatch_created( assert False, "Expected a created event" case Created(dispatch): received = Dispatch(update_dispatch(sample, dispatch)) - received._set_running_status_notified() # pylint: disable=protected-access - dispatch._set_running_status_notified() # pylint: disable=protected-access assert dispatch == received return dispatch @@ -191,10 +182,7 @@ async def test_existing_dispatch_updated( case Created(dispatch) | Deleted(dispatch): assert False, f"Expected an updated event, got {dispatch_event}" case Updated(dispatch): - assert dispatch == Dispatch( - updated, - running_state_change_synced=dispatch.running_state_change_synced, - ) + assert dispatch == Dispatch(updated) await asyncio.sleep(1) @@ -219,7 +207,6 @@ async def test_existing_dispatch_deleted( assert False, "Expected a deleted event" case Deleted(dispatch): sample._set_deleted() # pylint: disable=protected-access - dispatch._set_running_status_notified() # pylint: disable=protected-access assert dispatch == sample @@ -241,7 +228,7 @@ async def test_dispatch_inf_duration_deleted( await asyncio.sleep(40) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Now delete the dispatch await actor_env.client.delete( @@ -251,7 +238,7 @@ async def test_dispatch_inf_duration_deleted( await asyncio.sleep(1) # Expect notification to stop the dispatch done_dispatch = await actor_env.running_state_change.receive() - assert done_dispatch.running(sample.type) == RunningState.STOPPED + assert done_dispatch.started is False async def test_dispatch_inf_duration_updated_stopped_started( @@ -272,7 +259,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(40) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Now update the dispatch to set active=False (stop it) await actor_env.client.update( @@ -284,7 +271,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(1) # Expect notification to stop the dispatch stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False # Now update the dispatch to set active=True (start it again) await actor_env.client.update( @@ -296,7 +283,7 @@ async def test_dispatch_inf_duration_updated_stopped_started( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run again started_dispatch = await actor_env.running_state_change.receive() - assert started_dispatch.running(sample.type) == RunningState.RUNNING + assert started_dispatch.started async def test_dispatch_inf_duration_updated_to_finite_and_stops( @@ -321,7 +308,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that has already passed # The dispatch has been running for 5 seconds; set duration to 5 seconds @@ -335,7 +322,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_stops( await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has passed stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False async def test_dispatch_schedule( @@ -359,9 +346,6 @@ async def test_dispatch_schedule( # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - # Set flag we expect to be different to compare the dispatch with the one received - dispatch._set_running_status_notified() # pylint: disable=protected-access - assert ready_dispatch == dispatch assert dispatch.duration is not None @@ -396,7 +380,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( await asyncio.sleep(1) # Expect notification of the dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(sample.type) == RunningState.RUNNING + assert ready_dispatch.started # Update the dispatch to set duration to a finite duration that hasn't passed yet # The dispatch has been running for 5 seconds; set duration to 100 seconds @@ -414,7 +398,7 @@ async def test_dispatch_inf_duration_updated_to_finite_and_continues( await asyncio.sleep(1) # Expect notification to stop the dispatch because the duration has now passed stopped_dispatch = await actor_env.running_state_change.receive() - assert stopped_dispatch.running(sample.type) == RunningState.STOPPED + assert stopped_dispatch.started is False async def test_dispatch_new_but_finished( @@ -497,4 +481,4 @@ async def test_notification_on_actor_start( # Expect notification of the running dispatch being ready to run ready_dispatch = await actor_env.running_state_change.receive() - assert ready_dispatch.running(running_dispatch.type) == RunningState.RUNNING + assert ready_dispatch.started