This library integrates the Broadway data processing pipeline library with Kafka.
It communicates with Kafka using the Elsa Elixir library, which itself uses the Brod Erlang library.
It can dynamically create Broadway concurrency stages on a per-topic or per-partition basis for a given Kafka topic.
Add off_broadway_kafka
to the list of dependencies in mix.exs
:
def deps do
[
{:off_broadway_kafka, "~> 2.0"}
]
end
Docs can be found on HexDocs or generated with ExDoc.
This example uses Broadway in the traditional way.
It calls Elsa.Supervisor.start_link/1
with the specified kafka_config
to
set up a Kafka consumer. It receives Kafka messages and adds them as events to
the OffBroadway.Kafka.Producer
Broadway Producer, which are then handled by
the specified Broadway pipeline.
defmodule ClassicBroadway do
use Broadway
def start_link(opts) do
kafka_config = [
connection: :client1,
endpoints: [localhost: 9092],
group_consumer: [
group: "classic",
topics: ["topic1"],
config: [
begin_offset: :earliest
]
]
]
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {OffBroadway.Kafka.Producer, kafka_config},
concurrency: 1
],
processors: [
default: [
concurrency: 1
]
],
context: %{pid: Keyword.get(opts, :pid)}
)
end
def handle_message(processor, message, context) do
send(context.pid, {:message, message})
message
end
end
This example uses the OffBroadway.Kafka
macro.
It starts a Broadway pipeline for each topic and partition for increased concurrency processing events.
defmodule ShowtimeBroadway do
use OffBroadway.Kafka
def kafka_config(_opts) do
[
connection: :per_partition,
endpoints: [localhost: 9092],
group_consumer: [
group: "per_partition",
topics: ["topic1"],
config: [
prefetch_count: 5,
prefetch_bytes: 0,
begin_offset: :earliest
]
]
]
end
def broadway_config(opts, topic, partition) do
[
name: :"broadway_per_partition_#{topic}_#{partition}",
processors: [
default: [
concurrency: 5
]
],
context: %{
pid: Keyword.get(opts, :pid)
}
]
end
def handle_message(processor, message, context) do
send(context.pid, {:message, message})
message
end
end