Skip to content

Commit

Permalink
Add method to MW that waits for a number of samples
Browse files Browse the repository at this point in the history
Up to now there was no clean way to wait until
the MovingWindow got updated with a certain number
of samples.

In this commit we introduce a `wait_for_samples`
method that finishes once the number of samples
arrived.

Signed-off-by: Matthias Wende <[email protected]>
  • Loading branch information
matthias-wende-frequenz committed May 15, 2023
1 parent dd66a46 commit c136a73
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 1 deletion.
43 changes: 42 additions & 1 deletion src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from typing import SupportsIndex, overload

import numpy as np
from frequenz.channels import Broadcast, Receiver, Sender
from frequenz.channels import Anycast, Broadcast, Receiver, Sender
from numpy.typing import ArrayLike

from .._internal._asyncio import cancel_and_await
Expand Down Expand Up @@ -138,6 +138,16 @@ def __init__( # pylint: disable=too-many-arguments
self._resampler_sender: Sender[Sample] | None = None
self._resampler_task: asyncio.Task[None] | None = None

"""The number of samples that have been received."""
self.count_samples = 0
"""The number of samples to wait for before
the wait_for_samples method triggers."""
self.wait_for_num_samples = 0

self.wait_for_samples_channel = Anycast[None]()
self.wait_for_samples_sender = self.wait_for_samples_channel.new_sender()
self.wait_for_samples_recv = self.wait_for_samples_channel.new_receiver()

if resampler_config:
assert (
resampler_config.resampling_period <= size
Expand Down Expand Up @@ -177,12 +187,43 @@ async def _run_impl(self) -> None:
else:
self._buffer.update(sample)

self.count_samples += 1
if self.wait_for_num_samples != 0:
if self.count_samples % self.wait_for_num_samples == 0:
await self.wait_for_samples_sender.send(None)

except asyncio.CancelledError:
_logger.info("MovingWindow task has been cancelled.")
raise

_logger.error("Channel has been closed")

async def wait_for_samples(self, num_samples: int) -> None:
"""Wait until a given number of samples has been received.
The sample counter is updated irrespective of whether this
method is called or not. Thus this might trigger when
a smaller number of samples than the given number has been
updated.
Args:
num_samples: The number of samples to wait for.
Raises:
ValueError: if the number of samples is less than or equal to zero.
"""
if num_samples <= 0:
raise ValueError(
"The number of samples to wait for should be greater than zero."
)

if self.count_samples % num_samples == 0:
return

self.wait_for_num_samples = num_samples
await self.wait_for_samples_recv.receive()
self.wait_for_num_samples = 0

async def stop(self) -> None:
"""Cancel the running tasks and stop the MovingWindow."""
await cancel_and_await(self._update_window_task)
Expand Down
17 changes: 17 additions & 0 deletions tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,23 @@ async def test_window_size() -> None:
assert len(window) == 5


async def test_wait_for_samples() -> None:
"""Test waiting for samples."""

async def push_data_delayed() -> None:
"""Push data after a delay."""
await asyncio.sleep(0)
await sender.send(Sample(datetime.now(tz=timezone.utc), 1.0))

window, sender = init_moving_window(timedelta(seconds=1))
await push_lm_data(sender, [1])
await window.wait_for_samples(1)
assert np.array_equal(window[0], 1.0)

asyncio.create_task(push_data_delayed())
await window.wait_for_samples(1)


# pylint: disable=redefined-outer-name
async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
"""Test resampling in MovingWindow."""
Expand Down

0 comments on commit c136a73

Please sign in to comment.