A configurable router for Kafka messages. Can also be used to replay selected messages from a dead letter queue (DLQ) topic. See the DLQ replay examples for more details.
What's with the name `a-fake-rotor-true`? The official explanation now is that it's just a random name 😊
We use the Confluent Kafka Python client, so the consumer and producer need to be configured with settings that will be recognised by that package. All configuration settings are provided by environment variables.
Any environment variable that is prefixed with `KAFKA_CONSUMER_` is considered to be a configuration setting for the consumer. The setting name is derived by the following steps:

- Remove the `KAFKA_CONSUMER_` prefix.
- Substitute any occurrence of `_` with `.`.
- Change all uppercase letters to lowercase.
For example, an environment variable called `KAFKA_CONSUMER_BOOTSTRAP_SERVERS` that is set to `host1:9092,host2:9092` will configure `bootstrap.servers=host1:9092,host2:9092` in the consumer.
Please note that `KAFKA_CONSUMER_ENABLE_AUTO_COMMIT` MUST be set to `"false"`.
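The prefix-stripping described above can be sketched as a small helper. This is a minimal illustration of the mapping, not the router's actual implementation, and `build_config` is a hypothetical name:

```python
import os

def build_config(prefix: str) -> dict:
    """Build a Kafka config dict from environment variables.

    For each variable starting with `prefix`: strip the prefix,
    replace "_" with ".", and lowercase the result.
    """
    config = {}
    for name, value in os.environ.items():
        if name.startswith(prefix):
            key = name[len(prefix):].replace("_", ".").lower()
            config[key] = value
    return config

# Demonstration of the mapping described above:
os.environ["KAFKA_CONSUMER_BOOTSTRAP_SERVERS"] = "host1:9092,host2:9092"
consumer_config = build_config("KAFKA_CONSUMER_")
# consumer_config now contains {"bootstrap.servers": "host1:9092,host2:9092"}
```

The same transformation applies to the producer, just with the `KAFKA_PRODUCER_` prefix.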
The producer is configured in the same way as the consumer, except using a prefix of `KAFKA_PRODUCER_`. So `KAFKA_PRODUCER_BOOTSTRAP_SERVERS` will configure `bootstrap.servers` in the producer configuration.
Routing rules (at least one of which is required) have a prefix of `KAFKA_ROUTER_RULE_`. These are added and processed alphabetically, sorted on the name of the environment variable. For more details on configuring rules, see `docs/rules.md`; an example configuration can be seen in the `router` service in `docker-compose.yml`.
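The alphabetical ordering can be sketched as follows. The rule names and values here are made up for illustration; only the `KAFKA_ROUTER_RULE_` prefix and the sort-by-variable-name behaviour come from the description above:

```python
import os

RULE_PREFIX = "KAFKA_ROUTER_RULE_"

# Two hypothetical rules, deliberately set out of order to show the sorting:
os.environ["KAFKA_ROUTER_RULE_20_FALLBACK"] = "second rule"
os.environ["KAFKA_ROUTER_RULE_10_ORDERS"] = "first rule"

# Rules are gathered and processed alphabetically by variable name:
rules = [
    os.environ[name]
    for name in sorted(os.environ)
    if name.startswith(RULE_PREFIX)
]
```

Because the sort is on the variable name, a numeric naming scheme (`..._10_`, `..._20_`, etc.) is a convenient way to make the processing order explicit.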
If no default is provided, the configuration item is mandatory.
Configuration | Default | Notes |
---|---|---|
KAFKA_ROUTER_DLQ_ID | "" | If not provided, this will be set to KAFKA_CONSUMER_CLIENT_ID (if present) or KAFKA_CONSUMER_GROUP_ID. |
KAFKA_ROUTER_DLQ_MODE | False | If True, obeys KAFKA_ROUTER_TIMEOUT_MS and will not commit on the consumer. |
KAFKA_ROUTER_DLQ_TOPIC_NAME | "" | Messages that match no rules will be written to this topic. If blank, the router will warn that no matches were found for the message and continue. |
KAFKA_ROUTER_DRY_RUN_MODE | False | If True AND KAFKA_ROUTER_DLQ_MODE is True then don't produce any messages. |
KAFKA_ROUTER_PROMETHEUS_PORT | 8000 | The port for Prometheus metrics. |
KAFKA_ROUTER_PROMETHEUS_PREFIX | "" | A prefix name to add to the prometheus metrics (e.g. "dev_"). |
KAFKA_ROUTER_TIMEOUT_MS | 500 | Exit if no message is available for consumption for the specified interval. Ignored unless KAFKA_ROUTER_DLQ_MODE is "True". |
LOG_LEVEL | WARN | Can be DEBUG, INFO, WARN or ERROR. |
SENTRY_DSN | "" | If configured in the environment, this will be used as the Sentry DSN. |
If a message is placed on the DLQ topic, we follow the example set in Confluent Cloud Dead Letter Queue. In this example we have set `KAFKA_ROUTER_DLQ_ID` to "router", and the headers will look something like this:
Key | Example Value | Description |
---|---|---|
__router.errors.topic | input | The name of the topic that the message was consumed from. |
__router.errors.partition | 1 | The partition number that the consumed message was on. |
__router.errors.offset | 8583 | The offset of the consumed message within the partition. |
__router.errors.exception.message | No matching rules for message. | The exception message of why the message is on the DLQ. |
__router.errors.exception.stacktrace | json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0) | Any stack trace (if available) associated with the exception. |
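The header layout in the table above can be sketched like this. `dlq_headers` is a hypothetical helper, not the router's actual code; the key names follow the table, and values are encoded to bytes since Kafka header values are byte strings:

```python
def dlq_headers(dlq_id, topic, partition, offset, exc_message, stacktrace=""):
    """Build the DLQ header list for a failed message.

    The "__<dlq_id>.errors" key prefix matches the table above,
    assuming KAFKA_ROUTER_DLQ_ID is set to `dlq_id`.
    """
    prefix = f"__{dlq_id}.errors"
    return [
        (f"{prefix}.topic", topic.encode()),
        (f"{prefix}.partition", str(partition).encode()),
        (f"{prefix}.offset", str(offset).encode()),
        (f"{prefix}.exception.message", exc_message.encode()),
        (f"{prefix}.exception.stacktrace", stacktrace.encode()),
    ]

# The example row values from the table above:
headers = dlq_headers("router", "input", 1, 8583, "No matching rules for message.")
```

A header list in this `(key, bytes)` form is what the Confluent Kafka Python client's `Producer.produce(..., headers=...)` parameter accepts.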
The container runs as a non-root user called `router` with a UID of 1000. The user's primary group is also called `router` and has a GID of 1000.