diff --git a/ahd2fhir/config.py b/ahd2fhir/config.py index d6f2b7e..e7cd1ab 100644 --- a/ahd2fhir/config.py +++ b/ahd2fhir/config.py @@ -38,3 +38,4 @@ class Settings(BaseSettings): # that value. It can be adjusted even lower to control # the expected time for normal rebalances. kafka_heartbeat_interval_ms: int = 3000 + kafka_auto_offset_reset: str = "earliest" diff --git a/ahd2fhir/kafka_setup.py b/ahd2fhir/kafka_setup.py index aeb5a0e..398ad62 100644 --- a/ahd2fhir/kafka_setup.py +++ b/ahd2fhir/kafka_setup.py @@ -55,6 +55,7 @@ async def initialize_kafka(handler: ResourceHandler): # pragma: no cover settings.kafka_input_topic, loop=loop, bootstrap_servers=settings.bootstrap_servers, + auto_offset_reset=settings.kafka_auto_offset_reset, group_id=group_id, max_poll_interval_ms=settings.kafka_max_poll_interval_ms, # 600 s = 10 min max_poll_records=settings.kafka_max_poll_records,