Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add method to MovingWindow that waits for a number of samples #387

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/frequenz/sdk/timeseries/_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ def __init__( # pylint: disable=too-many-arguments
self._resampler_sender: Sender[Sample] | None = None
self._resampler_task: asyncio.Task[None] | None = None

self._wait_for_num_samples: int = 0
"""The number of samples to wait for before the wait_for_num_samples channels
sends out an event."""
Comment on lines +141 to +143

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend using nouns for non-boolean attributes:

Suggested change
self._wait_for_num_samples: int = 0
"""The number of samples to wait for before the wait_for_num_samples channels
sends out an event."""
self._num_samples_to_wait_for: int = 0
"""The number of samples to wait for before triggering an event through the channel."""

self._wait_for_samples_channel = Broadcast[None](
"Wait for number of samples channel."
)
Comment on lines +144 to +146

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the text here is for debugging purposes only, right @shsms?

I'd suggest using a shorter string for that and using the long string as documentation for the variable:

Suggested change
self._wait_for_samples_channel = Broadcast[None](
"Wait for number of samples channel."
)
self._wait_for_samples_channel = Broadcast[None]("wait-for-samples")
"""Channel to send events to when wait for number of samples is triggered."""


if resampler_config:
assert (
resampler_config.resampling_period <= size
Expand Down Expand Up @@ -169,6 +176,9 @@ async def _run_impl(self) -> None:
Raises:
asyncio.CancelledError: if the MovingWindow task is cancelled.
"""
received_samples_count = 0
wait_for_samples_sender = self._wait_for_samples_channel.new_sender()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess once a MW was stopped there is no way to start it again, right? Otherwise the sender should probably be created in the constructor instead to avoid leaking sender objects.


try:
async for sample in self._resampled_data_recv:
_logger.debug("Received new sample: %s", sample)
Expand All @@ -177,12 +187,48 @@ async def _run_impl(self) -> None:
else:
self._buffer.update(sample)

# count the number of samples and send out a trigger when it matches
# the number of samples to wait for.
received_samples_count += 1
if self._wait_for_num_samples != 0:
if received_samples_count == self._wait_for_num_samples:
received_samples_count = 0
await wait_for_samples_sender.send(None)
Comment on lines +195 to +196

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it's free and even when the user should know it beforehand anyway, we could also send the number of samples received instead of None, maybe it could become a handy shortcut.

Suggested change
received_samples_count = 0
await wait_for_samples_sender.send(None)
await wait_for_samples_sender.send(received_samples_count)
received_samples_count = 0


except asyncio.CancelledError:
_logger.info("MovingWindow task has been cancelled.")
raise

_logger.error("Channel has been closed")

def set_sample_counter(self, num_samples: int) -> None:
"""Set the number of samples to wait for until the sample counter triggers.

Args:
num_samples: The number of samples to wait for.

Raises:
ValueError: if the number of samples is less than or equal to zero.
"""
if num_samples <= 0:
raise ValueError(
"The number of samples to wait for should be greater than zero."
)
self._wait_for_num_samples = num_samples
Comment on lines +204 to +217

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, this indirectly enables sending the events if num_samples > 0, right? If so I would rename this method to something that makes it easier to realize that happens. What about:

@property
def is_wait_for_samples_event_enabled(self) -> bool:
    # Returns `self._wait_for_num_samples != 0`

def enable_wait_for_samples_event(self, num_samples: int) -> bool:
    # Same as above but check for `num_samples > 0`, return whether it was enabled before

def disable_wait_for_samples_event(self) -> bool:
    # Set `num_samples = 0`, return whether it was enabled before


def new_sample_count_receiver(self) -> Receiver[None]:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the above suggestion is applied, I would rename this for consistency:

Suggested change
def new_sample_count_receiver(self) -> Receiver[None]:
def new_wait_for_samples_event_receiver(self) -> Receiver[None]:

We could go with something different than wait_for_samples_event as it is a bit long, but the 4 methods should use the same term IMHO.

"""Wait until a given number of samples has been received.

The sample counter is updated irrespective of whether this
method is called or not. Thus this might trigger when
a smaller number of samples than the given number has been
updated.

Returns:
A receiver that triggers after a number of samples arrived.
"""
return self._wait_for_samples_channel.new_receiver()

async def stop(self) -> None:
"""Cancel the running tasks and stop the MovingWindow."""
await cancel_and_await(self._update_window_task)
Expand Down
21 changes: 21 additions & 0 deletions tests/timeseries/test_moving_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ async def test_window_size() -> None:
assert len(window) == 5


@pytest.mark.parametrize("samples_to_wait_for", [-1, 0, 1, 10])
async def test_wait_for_samples(samples_to_wait_for: int) -> None:
"""Test waiting for samples."""
window, sender = init_moving_window(timedelta(seconds=1))

if samples_to_wait_for <= 0:
with pytest.raises(ValueError):
window.set_sample_counter(samples_to_wait_for)
return
sample_count_recv = window.new_sample_count_receiver()

window.set_sample_counter(samples_to_wait_for)

# asyncio.create_task(push_data_delayed())

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# asyncio.create_task(push_data_delayed())

?

for i in range(0, samples_to_wait_for):
await sender.send(
Sample(datetime.now(tz=timezone.utc) + timedelta(seconds=i), 1.0)
)
await sample_count_recv.receive()
Comment on lines +121 to +125

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are some failures that this will not detect or detect wrongly. For example if you trigger before the samples_to_wait_for this will succeed too. If the event doesn't trigger, this test will hang forever, which might be quite annoying.

Maybe could check for the internal counter to verify it is incremented? If you also add my suggestion to return the number of samples received you could add the check here too.

Maybe you can also move the await sample_count_recv.receive() to a different task so you can check it didn't fire before you sent enough samples and that it was fired after you sent all the expected samples but without blocking the main thread if it fails?

Maybe you could also push different sample values and then check that the state of the moving window is what it would be expected after receiving all that samples, so if the even triggered before that check should also fail.



# pylint: disable=redefined-outer-name
async def test_resampling_window(fake_time: time_machine.Coordinates) -> None:
"""Test resampling in MovingWindow."""
Expand Down