Skip to content
This repository has been archived by the owner on Jan 14, 2025. It is now read-only.

Commit

Permalink
feat: include SASL security settings for Kafka (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
uklft authored Jul 6, 2021
1 parent 01771fa commit 193d9f6
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
28 changes: 26 additions & 2 deletions ahd2fhir/config.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from os import path

from aiokafka.helpers import create_ssl_context
from pydantic import BaseSettings
from pydantic import BaseSettings, validator

TLS_ROOT_DIR = "/opt/kafka-certs/"

# pylint: disable=E0213


class KafkaConsumerSettings(BaseSettings):
group_id: str = "ahd2fhir"
Expand Down Expand Up @@ -40,12 +42,34 @@ class KafkaSettings(BaseSettings):

# Kafka-related settings
bootstrap_servers: str = "localhost:9092"
security_protocol: str = "PLAINTEXT"
max_message_size_bytes: int = 5242880 # 5 MiB

# SSL Settings
# Forwarded unchanged to the aiokafka clients via get_connection_context().
security_protocol: str = "PLAINTEXT"
# Certificate/key paths default to files under TLS_ROOT_DIR
# ("/opt/kafka-certs/"); the parse_to_none validator below maps
# empty/"None" values to a real None so SASL-without-TLS setups work.
ssl_cafile: str = path.join(TLS_ROOT_DIR, "ca.crt")
ssl_certfile: str = path.join(TLS_ROOT_DIR, "user.crt")
ssl_keyfile: str = path.join(TLS_ROOT_DIR, "user.key")
# SASL Settings
# NOTE(review): annotated as str but defaulted to None — these should be
# Optional[str]; pydantic v1 happens to tolerate the mismatch. TODO confirm
# and tighten the annotations once typing.Optional is imported.
sasl_mechanism: str = None
sasl_plain_username: str = None
sasl_plain_password: str = None

# For using SASL without SSL certificates the *file args need to be None.
# Otherwise AIOKafkaClient will try to parse them even if they
# consist of an empty string.
@validator("ssl_cafile", "ssl_certfile", "ssl_keyfile")
def parse_to_none(cls, v):
    """Map values that mean "unset" to a real ``None``.

    Environment variables can only carry strings, so an operator clearing
    a certificate path ends up with ``""`` or ``"None"``; aiokafka would
    still try to open such a path unless the field is truly ``None``.
    """
    # Set membership is the idiomatic O(1) form of the original list test.
    # 0/False are kept from the original for exact value compatibility
    # (note 0 == False in Python, so either one alone would cover both).
    return None if v in {"", "None", 0, False} else v

def get_connection_context(self):
    """Return the transport/auth kwargs shared by consumer and producer.

    Bundles the bootstrap, TLS and SASL related settings into one dict so
    the AIOKafkaConsumer/AIOKafkaProducer constructors can simply splat
    them via ``**kwargs``.
    """
    connection_kwargs = dict(
        ssl_context=self.get_ssl_context(),
        bootstrap_servers=self.bootstrap_servers,
        security_protocol=self.security_protocol,
        sasl_plain_username=self.sasl_plain_username,
        sasl_plain_password=self.sasl_plain_password,
        sasl_mechanism=self.sasl_mechanism,
    )
    return connection_kwargs

class Config:
    # pydantic BaseSettings: every field above can be overridden through an
    # environment variable carrying this prefix, e.g. KAFKA_BOOTSTRAP_SERVERS
    # or KAFKA_SASL_MECHANISM.
    env_prefix = "kafka_"
Expand Down
13 changes: 3 additions & 10 deletions ahd2fhir/kafka_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ async def initialize_kafka(handler: ResourceHandler): # pragma: no cover
global resource_handler
resource_handler = handler

ssl_context = settings.kafka.get_ssl_context()
group_id = settings.kafka.consumer.group_id

logger.info(
Expand All @@ -33,22 +32,16 @@ async def initialize_kafka(handler: ResourceHandler): # pragma: no cover
)

global consumer
# TODO: could this setup be made cleaner by binding the settings.kafka_* to
# AIOKafkaConsumer's ctor args? So kafka_max_poll_records gets automatically
# turned into max_poll_records.

consumer = aiokafka.AIOKafkaConsumer(
settings.kafka.input_topic,
bootstrap_servers=settings.kafka.bootstrap_servers,
security_protocol=settings.kafka.security_protocol,
ssl_context=ssl_context,
**settings.kafka.get_connection_context(),
**settings.kafka.consumer.dict(),
)

global producer
producer = aiokafka.AIOKafkaProducer(
bootstrap_servers=settings.kafka.bootstrap_servers,
security_protocol=settings.kafka.security_protocol,
ssl_context=ssl_context,
**settings.kafka.get_connection_context(),
max_request_size=settings.kafka.max_message_size_bytes,
**settings.kafka.producer.dict(),
)
Expand Down

0 comments on commit 193d9f6

Please sign in to comment.