Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kayrock] Add describe consumer groups endpoint as kayrock #487

Merged
merged 9 commits into from
Mar 11, 2024
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ on:

jobs:
setup:
name: test / setup
name: test | setup dependencies
runs-on: ubuntu-20.04
env:
MIX_ENV: test
Expand Down
94 changes: 94 additions & 0 deletions docker-compose-arm.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
version: '3.9'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.0.4.arm64
restart: unless-stopped
ports:
- '32181:32181'
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000
healthcheck:
test: "zookeeper-shell 127.0.01:32181 ls /"
interval: 10s
timeout: 10s
retries: 5

kafka-1:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9092:9092'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092
healthcheck:
test: kafka-topics --bootstrap-server kafka-1:29092 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-2:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9093:9093'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka-3:
image: confluentinc/cp-kafka:7.0.4.arm64
ports:
- '9094:9094'
depends_on:
zookeeper:
condition: service_healthy
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094
healthcheck:
test: kafka-topics --bootstrap-server kafka-2:29093 --list
interval: 30s
timeout: 10s
retries: 4
volumes:
- ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z
- ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z

kafka_setup:
image: confluentinc/cp-kafka:7.0.4.arm64
depends_on:
zookeeper:
condition: service_healthy
kafka-1:
condition: service_healthy
kafka-2:
condition: service_healthy
kafka-3:
condition: service_healthy
command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \
kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \
kafka-topics --zookeeper zookeeper:32181 --list'"
16 changes: 16 additions & 0 deletions lib/kafka_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,22 @@
Server.call(worker, :consumer_group)
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
This is a new client implementation, and is not compatible with the old clients
"""
@spec describe_group(binary, Keyword.t()) :: {:ok, any} | {:error, any}
def describe_group(consumer_group_name, opts \\ []) do
worker_name = Keyword.get(opts, :worker_name, Config.default_worker())

case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Sends a request to join a consumer group.
"""
Expand Down Expand Up @@ -643,7 +659,7 @@
end
end

defp current_offset(

Check warning on line 662 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

default values for the optional arguments in current_offset/6 are never used

Check warning on line 662 in lib/kafka_ex.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.12, 22.3)

default values for the optional arguments in current_offset/6 are never used
supplied_offset,
partition,
topic,
Expand Down
72 changes: 72 additions & 0 deletions lib/kafka_ex/new/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ defmodule KafkaEx.New.Client do
alias KafkaEx.Config
alias KafkaEx.NetworkClient

alias KafkaEx.New.Client.RequestBuilder
alias KafkaEx.New.Client.ResponseParser
alias KafkaEx.New.Structs.Broker
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.NodeSelector
Expand Down Expand Up @@ -163,6 +165,16 @@ defmodule KafkaEx.New.Client do
{:reply, {:ok, topic_metadata}, updated_state}
end

def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do
if KafkaEx.valid_consumer_group?(consumer_group_name) do
{response, updated_state} = describe_group_request(consumer_group_name, state)

{:reply, response, updated_state}
else
{:reply, {:error, :invalid_consumer_group}, state}
end
end

def handle_call({:kayrock_request, request, node_selector}, _from, state) do
{response, updated_state} = kayrock_network_request(request, node_selector, state)

Expand Down Expand Up @@ -245,6 +257,66 @@ defmodule KafkaEx.New.Client do
end
end

defp describe_group_request(consumer_group_name, state) do
node_selector = NodeSelector.consumer_group(consumer_group_name)

[consumer_group_name]
|> RequestBuilder.describe_groups_request(state)
|> handle_describe_group_request(node_selector, state)
end

defp handle_describe_group_request(
_,
_,
_,
retry_count \\ @retry_count,
_last_error \\ nil
)

defp handle_describe_group_request(_, _, state, 0, last_error) do
{{:error, last_error}, state}
end

defp handle_describe_group_request(
request,
node_selector,
state,
retry_count,
_last_error
) do
case kayrock_network_request(request, node_selector, state) do
{{:ok, response}, state_out} ->
case ResponseParser.describe_groups_response(response) do
{:ok, consumer_groups} ->
{{:ok, consumer_groups}, state_out}

{:error, [error | _]} ->
Logger.warn(
"Unable to fetch consumer group metadata for #{inspect(request.group_ids)}"
)

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
error
)
end

{_, _state_out} ->
Logger.warn("Unable to fetch consumer group metadata for #{inspect(request.group_ids)}")

handle_describe_group_request(
request,
node_selector,
state,
retry_count - 1,
:unknown
)
end
end

defp maybe_connect_broker(broker, state) do
case Broker.connected?(broker) do
true ->
Expand Down
34 changes: 34 additions & 0 deletions lib/kafka_ex/new/client/request_builder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
defmodule KafkaEx.New.Client.RequestBuilder do
@moduledoc """
This module is used to build request for KafkaEx.New.Client.
It's main decision point which protocol to use for building request and what
is required version.
"""
@protocol Application.compile_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@default_api_version %{
describe_groups: 1
}

alias KafkaEx.New.Client.State

@doc """
Builds request for Describe Groups API
"""
@spec describe_groups_request([binary], State.t()) :: term
def describe_groups_request(group_names, state) do
api_version = get_api_version(state, :describe_groups)

@protocol.build_request(:describe_groups, api_version, group_names: group_names)
end

# -----------------------------------------------------------------------------
defp get_api_version(state, request_type) do
default = Map.fetch!(@default_api_version, request_type)
State.max_supported_api_version(state, request_type, default)
end
end
22 changes: 22 additions & 0 deletions lib/kafka_ex/new/client/response_parser.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule KafkaEx.New.Client.ResponseParser do
@moduledoc """
This module is used to parse response from KafkaEx.New.Client.
It's main decision point which protocol to use for parsing response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@protocol Application.compile_env(
:kafka_ex,
:protocol,
KafkaEx.New.Protocols.KayrockProtocol
)

@doc """
Parses response for Describe Groups API
"""
@spec describe_groups_response(term) ::
{:ok, [ConsumerGroup.t()]} | {:error, term}
def describe_groups_response(response) do
@protocol.parse_response(:describe_groups, response)
end
end
17 changes: 16 additions & 1 deletion lib/kafka_ex/new/kafka_ex_api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule KafkaEx.New.KafkaExAPI do

alias KafkaEx.New.Client
alias KafkaEx.New.Structs.ClusterMetadata
alias KafkaEx.New.Structs.ConsumerGroup
alias KafkaEx.New.Structs.Topic
alias KafkaEx.New.Structs.NodeSelector

Expand Down Expand Up @@ -56,6 +57,20 @@ defmodule KafkaEx.New.KafkaExAPI do
end
end

@doc """
Sends a request to describe a group identified by its name.
We support only one consumer group per request for now, as we don't
group requests by group coordinator.
"""
@spec describe_group(client, Keyword.t()) ::
{:ok, ConsumerGroup.t()} | {:error, any}
def describe_group(client, consumer_group_name) do
case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do
{:ok, [group]} -> {:ok, group}
{:error, error} -> {:error, error}
end
end

@doc """
Get topic metadata for the given topics

Expand All @@ -73,7 +88,7 @@ defmodule KafkaEx.New.KafkaExAPI do
Returns the cluster metadata from the given client
"""
@spec cluster_metadata(client) :: {:ok, ClusterMetadata.t()}
def(cluster_metadata(client)) do
def cluster_metadata(client) do
GenServer.call(client, :cluster_metadata)
end

Expand Down
25 changes: 25 additions & 0 deletions lib/kafka_ex/new/protocols/kayrock/describe_groups.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defprotocol KafkaEx.New.Protocols.Kayrock.DescribeGroups do

Check warning on line 1 in lib/kafka_ex/new/protocols/kayrock/describe_groups.ex

View workflow job for this annotation

GitHub Actions / runner / Test (1.14, 24.3)

protocols must define at least one function, but none was defined
@moduledoc """
This module handles Describe Groups request & response parsing.
Request is built using Kayrock protocol, response is parsed to
native KafkaEx structs.
"""

defprotocol Request do
@moduledoc """
This protocol is used to build Describe Groups request
"""
@spec build_request(t(), [binary]) :: t()
def build_request(request, consumer_group_names)
end

defprotocol Response do
@moduledoc """
This protocol is used to parse Describe Groups response
"""
alias KafkaEx.New.Structs.ConsumerGroup

@spec parse_response(t()) :: {:ok, [ConsumerGroup.t()]} | {:error, term}
def parse_response(response)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request,
for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do
def build_request(request_template, consumer_group_names) do
Map.put(request_template, :group_ids, consumer_group_names)
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response,
for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do
def parse_response(%{groups: groups}) do
case Enum.filter(groups, &(&1.error_code != 0)) do
[] ->
groups = Enum.map(groups, &build_consumer_group/1)
{:ok, groups}

errors ->
error_list =
Enum.map(errors, fn %{group_id: group_id, error_code: error_code} ->
{group_id, Kayrock.ErrorCode.code_to_atom!(error_code)}
end)

{:error, error_list}
end
end

defp build_consumer_group(kayrock_group) do
KafkaEx.New.Structs.ConsumerGroup.from_describe_group_response(kayrock_group)
end
end
Loading
Loading