Skip to content

Commit

Permalink
[3.11] Restore FlowControlDataQueue class (#9963)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Nov 18, 2024
1 parent 97be030 commit 902e7b2
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGES/9963.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Restored the ``FlowControlDataQueue`` class -- by :user:`bdraco`.

This class is no longer used internally, and will be permanently removed in the next major version.
2 changes: 2 additions & 0 deletions aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
EMPTY_PAYLOAD as EMPTY_PAYLOAD,
DataQueue as DataQueue,
EofStream as EofStream,
FlowControlDataQueue as FlowControlDataQueue,
StreamReader as StreamReader,
)
from .tracing import (
Expand Down Expand Up @@ -148,6 +149,7 @@
"ConnectionTimeoutError",
"ContentTypeError",
"Fingerprint",
"FlowControlDataQueue",
"InvalidURL",
"InvalidUrlClientError",
"InvalidUrlRedirectClientError",
Expand Down
43 changes: 43 additions & 0 deletions aiohttp/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,3 +677,46 @@ async def read(self) -> _T:

def __aiter__(self) -> AsyncStreamIterator[_T]:
return AsyncStreamIterator(self.read)


class FlowControlDataQueue(DataQueue[_T]):
"""FlowControlDataQueue resumes and pauses an underlying stream.
It is a destination for parsed data.
This class is deprecated and will be removed in version 4.0.
"""

def __init__(
self, protocol: BaseProtocol, limit: int, *, loop: asyncio.AbstractEventLoop
) -> None:
super().__init__(loop=loop)
self._size = 0
self._protocol = protocol
self._limit = limit * 2

def feed_data(self, data: _T, size: int = 0) -> None:
super().feed_data(data, size)
self._size += size

if self._size > self._limit and not self._protocol._reading_paused:
self._protocol.pause_reading()

async def read(self) -> _T:
if not self._buffer and not self._eof:
assert not self._waiter
self._waiter = self._loop.create_future()
try:
await self._waiter
except (asyncio.CancelledError, asyncio.TimeoutError):
self._waiter = None
raise
if self._buffer:
data, size = self._buffer.popleft()
self._size -= size
if self._size < self._limit and self._protocol._reading_paused:
self._protocol.resume_reading()
return data
if self._exception is not None:
raise self._exception
raise EofStream
77 changes: 77 additions & 0 deletions tests/test_flowcontrol_streams.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from unittest import mock

import pytest
Expand All @@ -15,6 +16,13 @@ def stream(loop, protocol):
return streams.StreamReader(protocol, limit=1, loop=loop)


@pytest.fixture
def buffer(loop, protocol: mock.Mock) -> streams.FlowControlDataQueue:
out = streams.FlowControlDataQueue(protocol, limit=1, loop=loop)
out._allow_pause = True
return out


class TestFlowControlStreamReader:
async def test_read(self, stream) -> None:
stream.feed_data(b"da", 2)
Expand Down Expand Up @@ -103,3 +111,72 @@ async def test_read_nowait(self, stream) -> None:
res = stream.read_nowait(5)
assert res == b""
assert stream._protocol.resume_reading.call_count == 1 # type: ignore[attr-defined]


async def test_flow_control_data_queue_waiter_cancelled(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that the waiter is cancelled it is cleared."""
task = asyncio.create_task(buffer.read())
await asyncio.sleep(0)
assert buffer._waiter is not None
buffer._waiter.cancel()

with pytest.raises(asyncio.CancelledError):
await task
assert buffer._waiter is None


async def test_flow_control_data_queue_has_buffer(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test reading from the buffer."""
data = object()
buffer.feed_data(data, 100)
assert buffer._size == 100
read_data = await buffer.read()
assert read_data is data
assert buffer._size == 0


async def test_flow_control_data_queue_read_with_exception(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test reading when the buffer is empty and an exception is set."""
buffer.set_exception(ValueError("unique_string"))
with pytest.raises(ValueError, match="unique_string"):
await buffer.read()


def test_flow_control_data_queue_feed_pause(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test feeding data and pausing the reader."""
buffer._protocol._reading_paused = False
buffer.feed_data(object(), 100)
assert buffer._protocol.pause_reading.called

buffer._protocol._reading_paused = True
buffer._protocol.pause_reading.reset_mock()
buffer.feed_data(object(), 100)
assert not buffer._protocol.pause_reading.called


async def test_flow_control_data_queue_resume_on_read(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that the reader is resumed when reading."""
buffer.feed_data(object(), 100)

buffer._protocol._reading_paused = True
await buffer.read()
assert buffer._protocol.resume_reading.called


async def test_flow_control_data_queue_read_eof(
buffer: streams.FlowControlDataQueue,
) -> None:
"""Test that reading after eof raises EofStream."""
buffer.feed_eof()
with pytest.raises(streams.EofStream):
await buffer.read()

0 comments on commit 902e7b2

Please sign in to comment.