From facf0245b53ac363c2e8c738ff1180358896cebe Mon Sep 17 00:00:00 2001 From: Lyn Date: Wed, 13 Sep 2023 14:18:53 -0700 Subject: [PATCH] feat: Add supported configuration Add max.poll.interval.ms and session.timeout.ms Honestly not sure if this is even useful and we should drop this "validation" instead --- arroyo/backends/kafka/configuration.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/arroyo/backends/kafka/configuration.py b/arroyo/backends/kafka/configuration.py index ba262505..496eaa5c 100644 --- a/arroyo/backends/kafka/configuration.py +++ b/arroyo/backends/kafka/configuration.py @@ -1,6 +1,6 @@ import copy -import logging import json +import logging from typing import Any, Dict, Mapping, Optional, Sequence from arroyo.utils.logging import pylog_to_syslog_level @@ -42,6 +42,8 @@ "ssl.keystore.location", "ssl.keystore.password", "ssl.sigalgs.list", + "session.timeout.ms", + "max.poll.interval.ms", ) @@ -75,7 +77,9 @@ def build_kafka_configuration( def stats_callback(stats_json: str) -> None: stats = json.loads(stats_json) - get_metrics().gauge("arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0)) + get_metrics().gauge( + "arroyo.consumer.librdkafka.total_queue_size", stats.get("replyq", 0) + ) def build_kafka_consumer_configuration(