diff --git a/trio/_channel.py b/trio/_channel.py index dac7935c0c..66447661e1 100644 --- a/trio/_channel.py +++ b/trio/_channel.py @@ -130,10 +130,16 @@ def statistics(self): return self._state.statistics() @enable_ki_protection - def send_nowait(self, value): + def send_nowait(self, value, *, replace_on_overflow=False): """Like `~trio.abc.SendChannel.send`, but if the channel's buffer is full, raises `WouldBlock` instead of blocking. + Args: + replace_on_overflow (bool): If true, instead of raising `WouldBlock` + when the channel's buffer is full, replace the most recently + buffered item with ``value``. This is expected to be combined with + ``max_buffer_size`` > 0, since otherwise there can never be a buffered + item to replace. """ if self._closed: raise trio.ClosedResourceError @@ -146,6 +152,9 @@ def send_nowait(self, value): trio.lowlevel.reschedule(task, Value(value)) elif len(self._state.data) < self._state.max_buffer_size: self._state.data.append(value) + elif replace_on_overflow: + if len(self._state.data): + self._state.data[-1] = value else: raise trio.WouldBlock diff --git a/trio/tests/test_channel.py b/trio/tests/test_channel.py index b43466dd7d..3435deb93a 100644 --- a/trio/tests/test_channel.py +++ b/trio/tests/test_channel.py @@ -350,3 +350,22 @@ async def do_send(s, v): assert await r.receive() == 1 with pytest.raises(trio.WouldBlock): r.receive_nowait() + + +async def test_send_channel_replace_on_overflow(): + s, r = open_memory_channel(2) + s.send_nowait(1, replace_on_overflow=True) + s.send_nowait(2, replace_on_overflow=True) + s.send_nowait(3, replace_on_overflow=True) + assert await r.receive() == 1 + assert await r.receive() == 3 + with pytest.raises(trio.WouldBlock): + r.receive_nowait() + + +async def test_send_channel_replace_on_overflow_unbuffered(): + s, r = open_memory_channel(0) + s.send_nowait(1, replace_on_overflow=True) + s.send_nowait(2, replace_on_overflow=True) + with pytest.raises(trio.WouldBlock): + r.receive_nowait()