From cf64d54a01d1db3ed90c3413381bb581ca94eedc Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 09:06:31 -0800 Subject: [PATCH 1/6] fix: avoid partition revocation on paused consumers it is valid for consumers to be paused for periods of time that exceed the max.poll.interval.ms. we need to continue polling the consumer in this scenario so that the partitions do not get revoked. librdkafka will not yield any new messages when polling a paused consumer --- arroyo/processing/processor.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 621f8344..8ed02feb 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -435,6 +435,10 @@ def _run_once(self) -> None: self.__consumer.pause([*self.__consumer.tell().keys()]) self.__is_paused = True + elif self.__is_paused: + # A paused consumer should still poll periodically to avoid it's partitions + # getting revoked by the broker after reaching thte max.poll.interval.ms + self.__consumer.poll(0.1) else: time.sleep(0.01) From 96c8c1dd62086feaaab540dd536ab5724c8ae185 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 09:33:28 -0800 Subject: [PATCH 2/6] assert non --- arroyo/processing/processor.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index 8ed02feb..aa680254 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -438,7 +438,9 @@ def _run_once(self) -> None: elif self.__is_paused: # A paused consumer should still poll periodically to avoid it's partitions # getting revoked by the broker after reaching thte max.poll.interval.ms - self.__consumer.poll(0.1) + # Polling a paused consumer should never yield a message. + msg = self.__consumer.poll(0.1) + assert msg is None else: time.sleep(0.01) From f4e12ca9fcd1a57f174e8d6cc0d799f59a4a2ff4 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 09:41:06 -0800 Subject: [PATCH 3/6] cleanup --- arroyo/processing/processor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index aa680254..bd63269f 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -439,8 +439,7 @@ def _run_once(self) -> None: # A paused consumer should still poll periodically to avoid it's partitions # getting revoked by the broker after reaching thte max.poll.interval.ms # Polling a paused consumer should never yield a message. - msg = self.__consumer.poll(0.1) - assert msg is None + assert self.__consumer.poll(0.1) is None else: time.sleep(0.01) From 15d6403ef7b7ae2fd9216660b6a2209770005fbb Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 09:41:29 -0800 Subject: [PATCH 4/6] typo --- arroyo/processing/processor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py index bd63269f..5bacfd21 100644 --- a/arroyo/processing/processor.py +++ b/arroyo/processing/processor.py @@ -437,7 +437,7 @@ def _run_once(self) -> None: elif self.__is_paused: # A paused consumer should still poll periodically to avoid it's partitions - # getting revoked by the broker after reaching thte max.poll.interval.ms + # getting revoked by the broker after reaching the max.poll.interval.ms # Polling a paused consumer should never yield a message. assert self.__consumer.poll(0.1) is None else: From a2c56634d997c05680fdf19e25928cd6e8b8fe0b Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 17:40:51 -0800 Subject: [PATCH 5/6] add a test --- tests/backends/test_kafka.py | 76 ++++++++++++++++++++++++++++++------ 1 file changed, 63 insertions(+), 13 deletions(-) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 296766b6..6160d4a2 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -19,6 +19,7 @@ from arroyo.commit import IMMEDIATE, Commit from arroyo.errors import ConsumerError, EndOfPartition from arroyo.processing.processor import StreamProcessor +from arroyo.processing.strategies.abstract import MessageRejected from arroyo.types import BrokerValue, Partition, Topic from tests.backends.mixins import StreamsTestMixin @@ -86,22 +87,28 @@ def get_topic(self, partitions: int = 1) -> Iterator[Topic]: def get_consumer( self, group: Optional[str] = None, - enable_end_of_partition: bool = True, auto_offset_reset: str = "earliest", strict_offset_reset: Optional[bool] = None, + max_poll_interval_ms: Optional[float] = None ) -> KafkaConsumer: - return KafkaConsumer( - { - **self.configuration, - "auto.offset.reset": auto_offset_reset, - "arroyo.strict.offset.reset": strict_offset_reset, - "enable.auto.commit": "false", - "enable.auto.offset.store": "false", - "enable.partition.eof": enable_end_of_partition, - "group.id": group if group is not None else uuid.uuid1().hex, - "session.timeout.ms": 10000, - }, - ) + configuration = { + **self.configuration, + "auto.offset.reset": auto_offset_reset, + "arroyo.strict.offset.reset": strict_offset_reset, + "enable.auto.commit": "false", + "enable.auto.offset.store": "false", + "group.id": group if group is not None else uuid.uuid1().hex, + "session.timeout.ms": 10000, + } + + if max_poll_interval_ms: + configuration["max.poll.interval.ms"] = max_poll_interval_ms + + # session timeout cannot be higher than max poll interval + if max_poll_interval_ms < 45000: + configuration["session.timeout.ms"] = max_poll_interval_ms + + return KafkaConsumer(configuration) def get_producer(self) -> KafkaProducer: return KafkaProducer(self.configuration) @@ -193,6 +200,49 @@ def test_consumer_stream_processor_shutdown(self) -> None: with pytest.raises(RuntimeError): processor.run() + def test_consumer_polls_when_paused(self) -> None: + strategy = mock.Mock() + factory = mock.Mock() + factory.create_with_partitions.return_value = strategy + + poll_interval = 6000 + + with self.get_topic() as topic: + with closing(self.get_producer()) as producer, closing(self.get_consumer(max_poll_interval_ms=poll_interval)) as consumer: + producer.produce(topic, next(self.get_payloads())).result(5.0) + + processor = StreamProcessor(consumer, topic, factory, IMMEDIATE) + + # Wait for the consumer to subscribe and first message to be processed + for _ in range(1000): + processor._run_once() + if strategy.submit.call_count > 0: + break + time.sleep(0.1) + + assert strategy.submit.call_count == 1 + + # Now we start raising message rejected. the next produced message doesn't get processed + strategy.submit.side_effect = MessageRejected() + + producer.produce(topic, next(self.get_payloads())).result(5.0) + processor._run_once() + assert consumer.paused() == [] + # After ~5 seconds the consumer should be paused. On the next two calls to run_once it + # will pause itself, then poll the consumer. + time.sleep(5.0) + processor._run_once() + processor._run_once() + assert len(consumer.paused()) == 1 + print("!!!!!") + + # Now we exceed the poll interval. After that we stop raising MessageRejected and + # the consumer unpauses itself. + time.sleep(2.0) + strategy.submit.side_effect = None + processor._run_once() + assert consumer.paused() == [] + def test_commit_codec() -> None: commit = Commit( From d19d0ebb68ac4ae25049330eb207843194edfa37 Mon Sep 17 00:00:00 2001 From: Lyn Nagara Date: Thu, 21 Nov 2024 20:10:14 -0800 Subject: [PATCH 6/6] fixup --- tests/backends/test_kafka.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/backends/test_kafka.py b/tests/backends/test_kafka.py index 6160d4a2..5b0851f5 100644 --- a/tests/backends/test_kafka.py +++ b/tests/backends/test_kafka.py @@ -87,9 +87,10 @@ def get_topic(self, partitions: int = 1) -> Iterator[Topic]: def get_consumer( self, group: Optional[str] = None, + enable_end_of_partition: bool = True, auto_offset_reset: str = "earliest", strict_offset_reset: Optional[bool] = None, - max_poll_interval_ms: Optional[float] = None + max_poll_interval_ms: Optional[int] = None ) -> KafkaConsumer: configuration = { **self.configuration, @@ -97,6 +98,7 @@ def get_consumer( "arroyo.strict.offset.reset": strict_offset_reset, "enable.auto.commit": "false", "enable.auto.offset.store": "false", + "enable.partition.eof": enable_end_of_partition, "group.id": group if group is not None else uuid.uuid1().hex, "session.timeout.ms": 10000, } @@ -234,7 +236,6 @@ def test_consumer_polls_when_paused(self) -> None: processor._run_once() processor._run_once() assert len(consumer.paused()) == 1 - print("!!!!!") # Now we exceed the poll interval. After that we stop raising MessageRejected and # the consumer unpauses itself.