diff --git a/kstreams/backends/kafka.py b/kstreams/backends/kafka.py index 37f50de6..f538a90c 100644 --- a/kstreams/backends/kafka.py +++ b/kstreams/backends/kafka.py @@ -26,8 +26,25 @@ class Kafka(BaseModel): It uses pydantic internally. + Attributes: + bootstrap_servers: kafka list of `hostname:port` + security_protocol: Protocol used to communicate with brokers + ssl_context: a python std `ssl.SSLContext` instance, you can generate + it with `create_ssl_context` + or `create_ssl_context_from_mem` + sasl_mechanism: Authentication mechanism when `security_protocol` is configured + for `SASL_PLAINTEXT` or `SASL_SSL` + sasl_plain_username: username for sasl PLAIN authentication + sasl_plain_password: password for sasl PLAIN authentication + sasl_oauth_token_provider: smth + + Raises: + ValidationError: a `pydantic.ValidationError` exception + + ## PLAINTEXT + !!! Example - ```python title="Backend with PLAINTEXT" + ```python from kstreams.backends.kafka import Kafka from kstreams import create_engine, Stream @@ -35,8 +52,10 @@ class Kafka(BaseModel): stream_engine = create_engine(title="my-stream-engine", backend=backend) ``` + ## SSL + !!! Example - ```python title="Backend with SSL" + ```python title="Create SSL context" import ssl from kstreams.backends.kafka import Kafka @@ -44,13 +63,13 @@ class Kafka(BaseModel): def get_ssl_context() -> ssl.SSLContext: - # SSL context can also be created from mem: - # https://kpn.github.io/kstreams/utils/#kstreams.utils.create_ssl_context_from_mem return utils.create_ssl_context( - certdata="path/to/client-certificate", - keydata="path/to/client-private-key", - cadata="path/to/ca-bundle", # Default None - password="password-to-load-certificate-chain" # Default None + cafile="certificate-authority-file-path", + capath="points-to-directory-with-several-ca-certificates", + cadata="same-as-cafile-but-ASCII-or-bytes-format", + certfile="client-certificate-file-name", + keyfile="client-private-key-file-name", + password="password-to-load-certificate-chain", ) backend = Kafka( @@ -62,20 +81,36 @@ def get_ssl_context() -> ssl.SSLContext: stream_engine = create_engine(title="my-stream-engine", backend=backend) ``` - Attributes: - bootstrap_servers: kafka list of `hostname:port` - security_protocol: Protocol used to communicate with brokers - ssl_context: a python std `ssl.SSLContext` instance, you can generate - it with `create_ssl_context` - or `create_ssl_context_from_mem` - sasl_mechanism: Authentication mechanism when `security_protocol` is configured - for `SASL_PLAINTEXT` or `SASL_SSL` - sasl_plain_username: username for sasl PLAIN authentication - sasl_plain_password: password for sasl PLAIN authentication - sasl_oauth_token_provider: smth + !!! note + Check [create ssl context util](https://kpn.github.io/kstreams/utils/#kstreams.utils.create_ssl_context) - Raises: - ValidationError: a `pydantic.ValidationError` exception + !!! Example + ```python title="Create SSL context from memory" + import ssl + + from kstreams.backends.kafka import Kafka + from kstreams import create_engine, utils, Stream + + + def get_ssl_context() -> ssl.SSLContext: + return utils.create_ssl_context_from_mem( + cadata="ca-certificates-as-unicode", + certdata="client-certificate-as-unicode", + keydata="client-private-key-as-unicode", + password="optional-password-to-load-certificate-chain", + ) + + backend = Kafka( + bootstrap_servers=["localhost:9094"], + security_protocol="SSL", + ssl_context=get_ssl_context(), + ) + + stream_engine = create_engine(title="my-stream-engine", backend=backend) + ``` + + !!! note + Check [create ssl context from memerory util](https://kpn.github.io/kstreams/utils/#kstreams.utils.create_ssl_context_from_mem) """ bootstrap_servers: List[str] = ["localhost:9092"]