Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid partition revocation on paused consumers #391

Merged
merged 6 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,11 @@ def _run_once(self) -> None:
self.__consumer.pause([*self.__consumer.tell().keys()])
self.__is_paused = True

elif self.__is_paused:
# A paused consumer should still poll periodically to avoid it's partitions
# getting revoked by the broker after reaching the max.poll.interval.ms
# Polling a paused consumer should never yield a message.
assert self.__consumer.poll(0.1) is None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we not want to sleep in this elif block?

Copy link
Member Author

@lynnagara lynnagara Nov 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's already effectively got a sleep for 0.1 in there

else:
time.sleep(0.01)

Expand Down
75 changes: 63 additions & 12 deletions tests/backends/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from arroyo.commit import IMMEDIATE, Commit
from arroyo.errors import ConsumerError, EndOfPartition
from arroyo.processing.processor import StreamProcessor
from arroyo.processing.strategies.abstract import MessageRejected
from arroyo.types import BrokerValue, Partition, Topic
from tests.backends.mixins import StreamsTestMixin

Expand Down Expand Up @@ -89,19 +90,27 @@ def get_consumer(
enable_end_of_partition: bool = True,
auto_offset_reset: str = "earliest",
strict_offset_reset: Optional[bool] = None,
max_poll_interval_ms: Optional[int] = None
) -> KafkaConsumer:
return KafkaConsumer(
{
**self.configuration,
"auto.offset.reset": auto_offset_reset,
"arroyo.strict.offset.reset": strict_offset_reset,
"enable.auto.commit": "false",
"enable.auto.offset.store": "false",
"enable.partition.eof": enable_end_of_partition,
"group.id": group if group is not None else uuid.uuid1().hex,
"session.timeout.ms": 10000,
},
)
configuration = {
**self.configuration,
"auto.offset.reset": auto_offset_reset,
"arroyo.strict.offset.reset": strict_offset_reset,
"enable.auto.commit": "false",
"enable.auto.offset.store": "false",
"enable.partition.eof": enable_end_of_partition,
"group.id": group if group is not None else uuid.uuid1().hex,
"session.timeout.ms": 10000,
}

if max_poll_interval_ms:
configuration["max.poll.interval.ms"] = max_poll_interval_ms

# session timeout cannot be higher than max poll interval
if max_poll_interval_ms < 45000:
configuration["session.timeout.ms"] = max_poll_interval_ms

return KafkaConsumer(configuration)

def get_producer(self) -> KafkaProducer:
return KafkaProducer(self.configuration)
Expand Down Expand Up @@ -193,6 +202,48 @@ def test_consumer_stream_processor_shutdown(self) -> None:
with pytest.raises(RuntimeError):
processor.run()

def test_consumer_polls_when_paused(self) -> None:
strategy = mock.Mock()
factory = mock.Mock()
factory.create_with_partitions.return_value = strategy

poll_interval = 6000

with self.get_topic() as topic:
with closing(self.get_producer()) as producer, closing(self.get_consumer(max_poll_interval_ms=poll_interval)) as consumer:
producer.produce(topic, next(self.get_payloads())).result(5.0)

processor = StreamProcessor(consumer, topic, factory, IMMEDIATE)

# Wait for the consumer to subscribe and first message to be processed
for _ in range(1000):
processor._run_once()
if strategy.submit.call_count > 0:
break
time.sleep(0.1)

assert strategy.submit.call_count == 1

# Now we start raising message rejected. the next produced message doesn't get processed
strategy.submit.side_effect = MessageRejected()

producer.produce(topic, next(self.get_payloads())).result(5.0)
processor._run_once()
assert consumer.paused() == []
# After ~5 seconds the consumer should be paused. On the next two calls to run_once it
# will pause itself, then poll the consumer.
time.sleep(5.0)
processor._run_once()
processor._run_once()
assert len(consumer.paused()) == 1

# Now we exceed the poll interval. After that we stop raising MessageRejected and
# the consumer unpauses itself.
time.sleep(2.0)
strategy.submit.side_effect = None
processor._run_once()
assert consumer.paused() == []


def test_commit_codec() -> None:
commit = Commit(
Expand Down
Loading