-
Notifications
You must be signed in to change notification settings - Fork 17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add method to MovingWindow that waits for a number of samples #387
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -138,6 +138,13 @@ def __init__( # pylint: disable=too-many-arguments | |||||||||||||
self._resampler_sender: Sender[Sample] | None = None | ||||||||||||||
self._resampler_task: asyncio.Task[None] | None = None | ||||||||||||||
|
||||||||||||||
self._wait_for_num_samples: int = 0 | ||||||||||||||
"""The number of samples to wait for before the wait_for_num_samples channels | ||||||||||||||
sends out an event.""" | ||||||||||||||
self._wait_for_samples_channel = Broadcast[None]( | ||||||||||||||
"Wait for number of samples channel." | ||||||||||||||
) | ||||||||||||||
Comment on lines
+144
to
+146
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So the text here is for debugging purposes only, right @shsms? I'd suggest using a shorter string for that and using the long string as documentation for the variable:
Suggested change
|
||||||||||||||
|
||||||||||||||
if resampler_config: | ||||||||||||||
assert ( | ||||||||||||||
resampler_config.resampling_period <= size | ||||||||||||||
|
@@ -169,6 +176,9 @@ async def _run_impl(self) -> None: | |||||||||||||
Raises: | ||||||||||||||
asyncio.CancelledError: if the MovingWindow task is cancelled. | ||||||||||||||
""" | ||||||||||||||
received_samples_count = 0 | ||||||||||||||
wait_for_samples_sender = self._wait_for_samples_channel.new_sender() | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess once a MW was stopped there is no way to start it again, right? Otherwise the sender should probably be created in the constructor instead to avoid leaking sender objects. |
||||||||||||||
|
||||||||||||||
try: | ||||||||||||||
async for sample in self._resampled_data_recv: | ||||||||||||||
_logger.debug("Received new sample: %s", sample) | ||||||||||||||
|
@@ -177,12 +187,48 @@ async def _run_impl(self) -> None: | |||||||||||||
else: | ||||||||||||||
self._buffer.update(sample) | ||||||||||||||
|
||||||||||||||
# count the number of samples and send out a trigger when it matches | ||||||||||||||
# the number of samples to wait for. | ||||||||||||||
received_samples_count += 1 | ||||||||||||||
if self._wait_for_num_samples != 0: | ||||||||||||||
if received_samples_count == self._wait_for_num_samples: | ||||||||||||||
received_samples_count = 0 | ||||||||||||||
await wait_for_samples_sender.send(None) | ||||||||||||||
Comment on lines
+195
to
+196
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since it's free and even when the user should know it beforehand anyway, we could also send the number of samples received instead of
Suggested change
|
||||||||||||||
|
||||||||||||||
except asyncio.CancelledError: | ||||||||||||||
_logger.info("MovingWindow task has been cancelled.") | ||||||||||||||
raise | ||||||||||||||
|
||||||||||||||
_logger.error("Channel has been closed") | ||||||||||||||
|
||||||||||||||
def set_sample_counter(self, num_samples: int) -> None: | ||||||||||||||
"""Set the number of samples to wait for until the sample counter triggers. | ||||||||||||||
|
||||||||||||||
Args: | ||||||||||||||
num_samples: The number of samples to wait for. | ||||||||||||||
|
||||||||||||||
Raises: | ||||||||||||||
ValueError: if the number of samples is less than or equal to zero. | ||||||||||||||
""" | ||||||||||||||
if num_samples <= 0: | ||||||||||||||
raise ValueError( | ||||||||||||||
"The number of samples to wait for should be greater than zero." | ||||||||||||||
) | ||||||||||||||
self._wait_for_num_samples = num_samples | ||||||||||||||
Comment on lines
+204
to
+217
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I understand correctly, this indirectly enables sending the events if @property
def is_wait_for_samples_event_enabled(self) -> bool:
# Returns `self._wait_for_num_samples != 0`
def enable_wait_for_samples_event(self, num_samples: int) -> bool:
# Same as above but check for `num_samples > 0`, return whether it was enabled before
def disable_wait_for_samples_event(self) -> bool:
# Set `num_samples = 0`, return whether it was enabled before |
||||||||||||||
|
||||||||||||||
def new_sample_count_receiver(self) -> Receiver[None]: | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the above suggestion is applied, I would rename this for consistency:
Suggested change
We could go with something different than |
||||||||||||||
"""Wait until a given number of samples has been received. | ||||||||||||||
|
||||||||||||||
The sample counter is updated irrespective of whether this | ||||||||||||||
method is called or not. Thus this might trigger when | ||||||||||||||
a smaller number of samples than the given number has been | ||||||||||||||
updated. | ||||||||||||||
|
||||||||||||||
Returns: | ||||||||||||||
A receiver that triggers after a number of samples arrived. | ||||||||||||||
""" | ||||||||||||||
return self._wait_for_samples_channel.new_receiver() | ||||||||||||||
|
||||||||||||||
async def stop(self) -> None: | ||||||||||||||
"""Cancel the running tasks and stop the MovingWindow.""" | ||||||||||||||
await cancel_and_await(self._update_window_task) | ||||||||||||||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -104,6 +104,27 @@ async def test_window_size() -> None: | |||
assert len(window) == 5 | ||||
|
||||
|
||||
@pytest.mark.parametrize("samples_to_wait_for", [-1, 0, 1, 10]) | ||||
async def test_wait_for_samples(samples_to_wait_for: int) -> None: | ||||
"""Test waiting for samples.""" | ||||
window, sender = init_moving_window(timedelta(seconds=1)) | ||||
|
||||
if samples_to_wait_for <= 0: | ||||
with pytest.raises(ValueError): | ||||
window.set_sample_counter(samples_to_wait_for) | ||||
return | ||||
sample_count_recv = window.new_sample_count_receiver() | ||||
|
||||
window.set_sample_counter(samples_to_wait_for) | ||||
|
||||
# asyncio.create_task(push_data_delayed()) | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? |
||||
for i in range(0, samples_to_wait_for): | ||||
await sender.send( | ||||
Sample(datetime.now(tz=timezone.utc) + timedelta(seconds=i), 1.0) | ||||
) | ||||
await sample_count_recv.receive() | ||||
Comment on lines
+121
to
+125
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are some failures that this will not detect or detect wrongly. For example if you trigger before the Maybe could check for the internal counter to verify it is incremented? If you also add my suggestion to return the number of samples received you could add the check here too. Maybe you can also move the Maybe you could also push different sample values and then check that the state of the moving window is what it would be expected after receiving all that samples, so if the even triggered before that check should also fail. |
||||
|
||||
|
||||
# pylint: disable=redefined-outer-name | ||||
async def test_resampling_window(fake_time: time_machine.Coordinates) -> None: | ||||
"""Test resampling in MovingWindow.""" | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I recommend using nouns for non-boolean attributes: