diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 58068333..0a8719cc 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -8,7 +8,7 @@ on: jobs: setup: - name: test / setup + name: test | setup dependencies runs-on: ubuntu-20.04 env: MIX_ENV: test diff --git a/docker-compose-arm.yml b/docker-compose-arm.yml new file mode 100644 index 00000000..ecaf5c1a --- /dev/null +++ b/docker-compose-arm.yml @@ -0,0 +1,94 @@ +version: '3.9' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.0.4.arm64 + restart: unless-stopped + ports: + - '32181:32181' + environment: + ZOOKEEPER_CLIENT_PORT: 32181 + ZOOKEEPER_TICK_TIME: 2000 + healthcheck: + test: "zookeeper-shell 127.0.0.1:32181 ls /" + interval: 10s + timeout: 10s + retries: 5 + + kafka-1: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9092:9092' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://localhost:9092 + healthcheck: + test: kafka-topics --bootstrap-server kafka-1:29092 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka-2: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9093:9093' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 2 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29093,EXTERNAL://localhost:9093 + healthcheck: + test: kafka-topics --bootstrap-server kafka-2:29093 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - 
./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka-3: + image: confluentinc/cp-kafka:7.0.4.arm64 + ports: + - '9094:9094' + depends_on: + zookeeper: + condition: service_healthy + env_file: docker-compose-kafka.env + environment: + KAFKA_BROKER_ID: 3 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29094,EXTERNAL://localhost:9094 + healthcheck: + test: kafka-topics --bootstrap-server kafka-3:29094 --list + interval: 30s + timeout: 10s + retries: 4 + volumes: + - ./ssl/kafka.server.keystore.jks:/etc/kafka/secrets/kafka.server.keystore.jks:ro,z + - ./ssl/kafka.server.truststore.jks:/etc/kafka/secrets/kafka.server.truststore.jks:ro,z + + kafka_setup: + image: confluentinc/cp-kafka:7.0.4.arm64 + depends_on: + zookeeper: + condition: service_healthy + kafka-1: + condition: service_healthy + kafka-2: + condition: service_healthy + kafka-3: + condition: service_healthy + command: "bash -c 'kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic consumer_group_implementation_test && \ + kafka-topics --zookeeper zookeeper:32181 --create --if-not-exists --partitions 4 --replication-factor 2 --topic test0p8p0 && \ + kafka-topics --zookeeper zookeeper:32181 --list'" diff --git a/lib/kafka_ex.ex b/lib/kafka_ex.ex index 7a6d39ee..cd8bd7b6 100644 --- a/lib/kafka_ex.ex +++ b/lib/kafka_ex.ex @@ -123,6 +123,22 @@ defmodule KafkaEx do Server.call(worker, :consumer_group) end + @doc """ + Sends a request to describe a group identified by its name. + We support only one consumer group per request for now, as we don't + group requests by group coordinator. 
+ This is a new client implementation, and is not compatible with the old clients + """ + @spec describe_group(binary, Keyword.t()) :: {:ok, any} | {:error, any} + def describe_group(consumer_group_name, opts \\ []) do + worker_name = Keyword.get(opts, :worker_name, Config.default_worker()) + + case Server.call(worker_name, {:describe_groups, [consumer_group_name]}) do + {:ok, [group]} -> {:ok, group} + {:error, error} -> {:error, error} + end + end + @doc """ Sends a request to join a consumer group. """ diff --git a/lib/kafka_ex/new/client.ex b/lib/kafka_ex/new/client.ex index 47bd2a0d..3587e5d5 100644 --- a/lib/kafka_ex/new/client.ex +++ b/lib/kafka_ex/new/client.ex @@ -14,6 +14,8 @@ defmodule KafkaEx.New.Client do alias KafkaEx.Config alias KafkaEx.NetworkClient + alias KafkaEx.New.Client.RequestBuilder + alias KafkaEx.New.Client.ResponseParser alias KafkaEx.New.Structs.Broker alias KafkaEx.New.Structs.ClusterMetadata alias KafkaEx.New.Structs.NodeSelector @@ -163,6 +165,16 @@ defmodule KafkaEx.New.Client do {:reply, {:ok, topic_metadata}, updated_state} end + def handle_call({:describe_groups, [consumer_group_name]}, _from, state) do + if KafkaEx.valid_consumer_group?(consumer_group_name) do + {response, updated_state} = describe_group_request(consumer_group_name, state) + + {:reply, response, updated_state} + else + {:reply, {:error, :invalid_consumer_group}, state} + end + end + def handle_call({:kayrock_request, request, node_selector}, _from, state) do {response, updated_state} = kayrock_network_request(request, node_selector, state) @@ -245,6 +257,66 @@ defmodule KafkaEx.New.Client do end end + defp describe_group_request(consumer_group_name, state) do + node_selector = NodeSelector.consumer_group(consumer_group_name) + + [consumer_group_name] + |> RequestBuilder.describe_groups_request(state) + |> handle_describe_group_request(node_selector, state) + end + + defp handle_describe_group_request( + _, + _, + _, + retry_count \\ @retry_count, + _last_error 
\\ nil + ) + + defp handle_describe_group_request(_, _, state, 0, last_error) do + {{:error, last_error}, state} + end + + defp handle_describe_group_request( + request, + node_selector, + state, + retry_count, + _last_error + ) do + case kayrock_network_request(request, node_selector, state) do + {{:ok, response}, state_out} -> + case ResponseParser.describe_groups_response(response) do + {:ok, consumer_groups} -> + {{:ok, consumer_groups}, state_out} + + {:error, [error | _]} -> + Logger.warn( + "Unable to fetch consumer group metadata for #{inspect(request.group_ids)}" + ) + + handle_describe_group_request( + request, + node_selector, + state, + retry_count - 1, + error + ) + end + + {_, _state_out} -> + Logger.warn("Unable to fetch consumer group metadata for #{inspect(request.group_ids)}") + + handle_describe_group_request( + request, + node_selector, + state, + retry_count - 1, + :unknown + ) + end + end + defp maybe_connect_broker(broker, state) do case Broker.connected?(broker) do true -> diff --git a/lib/kafka_ex/new/client/request_builder.ex b/lib/kafka_ex/new/client/request_builder.ex new file mode 100644 index 00000000..8e43d58d --- /dev/null +++ b/lib/kafka_ex/new/client/request_builder.ex @@ -0,0 +1,34 @@ +defmodule KafkaEx.New.Client.RequestBuilder do + @moduledoc """ + This module is used to build request for KafkaEx.New.Client. + It's main decision point which protocol to use for building request and what + is required version. 
+ """ + @protocol Application.compile_env( + :kafka_ex, + :protocol, + KafkaEx.New.Protocols.KayrockProtocol + ) + + @default_api_version %{ + describe_groups: 1 + } + + alias KafkaEx.New.Client.State + + @doc """ + Builds request for Describe Groups API + """ + @spec describe_groups_request([binary], State.t()) :: term + def describe_groups_request(group_names, state) do + api_version = get_api_version(state, :describe_groups) + + @protocol.build_request(:describe_groups, api_version, group_names: group_names) + end + + # ----------------------------------------------------------------------------- + defp get_api_version(state, request_type) do + default = Map.fetch!(@default_api_version, request_type) + State.max_supported_api_version(state, request_type, default) + end +end diff --git a/lib/kafka_ex/new/client/response_parser.ex b/lib/kafka_ex/new/client/response_parser.ex new file mode 100644 index 00000000..80394d33 --- /dev/null +++ b/lib/kafka_ex/new/client/response_parser.ex @@ -0,0 +1,22 @@ +defmodule KafkaEx.New.Client.ResponseParser do + @moduledoc """ + This module is used to parse response from KafkaEx.New.Client. 
+ It's main decision point which protocol to use for parsing response + """ + alias KafkaEx.New.Structs.ConsumerGroup + + @protocol Application.compile_env( + :kafka_ex, + :protocol, + KafkaEx.New.Protocols.KayrockProtocol + ) + + @doc """ + Parses response for Describe Groups API + """ + @spec describe_groups_response(term) :: + {:ok, [ConsumerGroup.t()]} | {:error, term} + def describe_groups_response(response) do + @protocol.parse_response(:describe_groups, response) + end +end diff --git a/lib/kafka_ex/new/kafka_ex_api.ex b/lib/kafka_ex/new/kafka_ex_api.ex index 937c6547..1d6354d1 100644 --- a/lib/kafka_ex/new/kafka_ex_api.ex +++ b/lib/kafka_ex/new/kafka_ex_api.ex @@ -15,6 +15,7 @@ defmodule KafkaEx.New.KafkaExAPI do alias KafkaEx.New.Client alias KafkaEx.New.Structs.ClusterMetadata + alias KafkaEx.New.Structs.ConsumerGroup alias KafkaEx.New.Structs.Topic alias KafkaEx.New.Structs.NodeSelector @@ -56,6 +57,20 @@ defmodule KafkaEx.New.KafkaExAPI do end end + @doc """ + Sends a request to describe a group identified by its name. + We support only one consumer group per request for now, as we don't + group requests by group coordinator. 
+ """ + @spec describe_group(client, binary) :: + {:ok, ConsumerGroup.t()} | {:error, any} + def describe_group(client, consumer_group_name) do + case GenServer.call(client, {:describe_groups, [consumer_group_name]}) do + {:ok, [group]} -> {:ok, group} + {:error, error} -> {:error, error} + end + end + @doc """ Get topic metadata for the given topics @@ -73,7 +88,7 @@ Returns the cluster metadata from the given client """ @spec cluster_metadata(client) :: {:ok, ClusterMetadata.t()} - def(cluster_metadata(client)) do + def cluster_metadata(client) do GenServer.call(client, :cluster_metadata) end diff --git a/lib/kafka_ex/new/protocols/kayrock/describe_groups.ex b/lib/kafka_ex/new/protocols/kayrock/describe_groups.ex new file mode 100644 index 00000000..7fce8dc3 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/describe_groups.ex @@ -0,0 +1,25 @@ +defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups do + @moduledoc """ + This module handles Describe Groups request & response parsing. + Request is built using Kayrock protocol, response is parsed to + native KafkaEx structs. 
+ """ + + defprotocol Request do + @moduledoc """ + This protocol is used to build Describe Groups request + """ + @spec build_request(t(), [binary]) :: t() + def build_request(request, consumer_group_names) + end + + defprotocol Response do + @moduledoc """ + This protocol is used to parse Describe Groups response + """ + alias KafkaEx.New.Structs.ConsumerGroup + + @spec parse_response(t()) :: {:ok, [ConsumerGroup.t()]} | {:error, term} + def parse_response(response) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex new file mode 100644 index 00000000..089e6a2e --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_request_impl.ex @@ -0,0 +1,6 @@ +defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request, + for: [Kayrock.DescribeGroups.V0.Request, Kayrock.DescribeGroups.V1.Request] do + def build_request(request_template, consumer_group_names) do + Map.put(request_template, :group_ids, consumer_group_names) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex new file mode 100644 index 00000000..9bf5cee9 --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock/describe_groups/default_response_impl.ex @@ -0,0 +1,22 @@ +defimpl KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response, + for: [Kayrock.DescribeGroups.V0.Response, Kayrock.DescribeGroups.V1.Response] do + def parse_response(%{groups: groups}) do + case Enum.filter(groups, &(&1.error_code != 0)) do + [] -> + groups = Enum.map(groups, &build_consumer_group/1) + {:ok, groups} + + errors -> + error_list = + Enum.map(errors, fn %{group_id: group_id, error_code: error_code} -> + {group_id, Kayrock.ErrorCode.code_to_atom!(error_code)} + end) + + {:error, error_list} + end + end + + defp build_consumer_group(kayrock_group) do + 
KafkaEx.New.Structs.ConsumerGroup.from_describe_group_response(kayrock_group) + end +end diff --git a/lib/kafka_ex/new/protocols/kayrock_protocol.ex b/lib/kafka_ex/new/protocols/kayrock_protocol.ex new file mode 100644 index 00000000..a348aa6a --- /dev/null +++ b/lib/kafka_ex/new/protocols/kayrock_protocol.ex @@ -0,0 +1,32 @@ +defmodule KafkaEx.New.Protocols.KayrockProtocol do + @moduledoc """ + This module handles Kayrock request & response handling & parsing. + Once Kafka Ex v1.0 is released, this module will be renamed to KayrockProtocol + and will become a separated package. + """ + @behaviour KafkaEx.New.Client.Protocol + + alias KafkaEx.New.Protocols.Kayrock, as: KayrockProtocol + + # ----------------------------------------------------------------------------- + @doc """ + Builds request for Describe Groups API + """ + @impl KafkaEx.New.Client.Protocol + def build_request(:describe_groups, api_version, opts) do + group_names = Keyword.fetch!(opts, :group_names) + + api_version + |> Kayrock.DescribeGroups.get_request_struct() + |> KayrockProtocol.DescribeGroups.Request.build_request(group_names) + end + + # ----------------------------------------------------------------------------- + @doc """ + Parses response for Describe Groups API + """ + @impl KafkaEx.New.Client.Protocol + def parse_response(:describe_groups, response) do + KayrockProtocol.DescribeGroups.Response.parse_response(response) + end +end diff --git a/lib/kafka_ex/new/protocols/protocol.ex b/lib/kafka_ex/new/protocols/protocol.ex new file mode 100644 index 00000000..b677357a --- /dev/null +++ b/lib/kafka_ex/new/protocols/protocol.ex @@ -0,0 +1,17 @@ +defmodule KafkaEx.New.Client.Protocol do + @moduledoc """ + This module is responsible for defining the behaviour of a protocol. 
+ """ + # ------------------------------------------------------------------------------ + @type api_version :: non_neg_integer + @type params :: Keyword.t() + + # ------------------------------------------------------------------------------ + @callback build_request(:describe_groups, integer, params) :: term + + # ------------------------------------------------------------------------------ + @type consumer_group :: KafkaEx.New.Structs.ConsumerGroup + + @callback parse_response(:describe_groups, term) :: + {:ok, [consumer_group]} | {:error, term} +end diff --git a/lib/kafka_ex/new/structs/consumer_group.ex b/lib/kafka_ex/new/structs/consumer_group.ex new file mode 100644 index 00000000..9a9b3462 --- /dev/null +++ b/lib/kafka_ex/new/structs/consumer_group.ex @@ -0,0 +1,32 @@ +defmodule KafkaEx.New.Structs.ConsumerGroup do + @moduledoc """ + Encapsulates what we know about consumer group + """ + + alias KafkaEx.New.Structs.ConsumerGroup.Member + + @type t :: %__MODULE__{ + group_id: binary, + state: binary, + protocol_type: binary, + protocol: binary, + members: list(Member.t()) + } + + defstruct ~w(group_id state protocol_type protocol members)a + + @spec from_describe_group_response(map) :: __MODULE__.t() + def from_describe_group_response(describe_group) do + %__MODULE__{ + group_id: describe_group.group_id, + state: describe_group.state, + protocol_type: describe_group.protocol_type, + protocol: describe_group.protocol, + members: Enum.map(describe_group.members, &build_consumer_group_member/1) + } + end + + defp build_consumer_group_member(group_member) do + Member.from_describe_group_response(group_member) + end +end diff --git a/lib/kafka_ex/new/structs/consumer_group/member.ex b/lib/kafka_ex/new/structs/consumer_group/member.ex new file mode 100644 index 00000000..0064ea6a --- /dev/null +++ b/lib/kafka_ex/new/structs/consumer_group/member.ex @@ -0,0 +1,41 @@ +defmodule KafkaEx.New.Structs.ConsumerGroup.Member do + @moduledoc """ + Encapsulates what we know 
about a consumer group member + """ + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment + + @type t :: %__MODULE__{ + member_id: binary, + client_id: binary, + client_host: binary, + member_metadata: term, + member_assignment: MemberAssignment.t() | nil + } + + defstruct ~w(member_id client_id client_host member_metadata member_assignment)a + + @type partial_response :: %{ + required(:member_id) => binary, + required(:client_id) => binary, + required(:client_host) => binary, + required(:member_metadata) => term, + optional(:member_assignment) => map | nil + } + + @spec from_describe_group_response(partial_response()) :: __MODULE__.t() + def from_describe_group_response(response) do + %__MODULE__{ + member_id: response.member_id, + client_id: response.client_id, + client_host: response.client_host, + member_metadata: response.member_metadata, + member_assignment: build_member_assignment(Map.get(response, :member_assignment)) + } + end + + defp build_member_assignment(nil), do: nil + + defp build_member_assignment(member_assignment) do + MemberAssignment.from_describe_group_response(member_assignment) + end +end diff --git a/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex b/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex new file mode 100644 index 00000000..676f506e --- /dev/null +++ b/lib/kafka_ex/new/structs/consumer_group/member_assignment.ex @@ -0,0 +1,36 @@ +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment do + @moduledoc """ + Encapsulates what we know about a consumer group. + The current assignment provided by the group leader (will only be present if the group is stable). 
+ """ + + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment + + @type t :: %__MODULE__{ + version: non_neg_integer, + user_data: binary, + partition_assignments: [PartitionAssignment.t()] + } + + defstruct ~w(version partition_assignments user_data)a + + @type response_partial :: %{ + required(:version) => non_neg_integer, + required(:partition_assignments) => [ + PartitionAssignment.response_partial() + ], + required(:user_data) => binary + } + + @spec from_describe_group_response(response_partial()) :: __MODULE__.t() + def from_describe_group_response(response) do + %__MODULE__{ + version: response.version, + user_data: response.user_data, + partition_assignments: + Enum.map(response.partition_assignments, fn assignment -> + PartitionAssignment.from_describe_group_response(assignment) + end) + } + end +end diff --git a/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex b/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex new file mode 100644 index 00000000..2299d9b7 --- /dev/null +++ b/lib/kafka_ex/new/structs/consumer_group/partition_assignment.ex @@ -0,0 +1,26 @@ +defmodule KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment do + @moduledoc """ + Encapsulates what we know about a consumer group member partition assignment. + Will only be present if the group is stable and is assigned to given topic. 
+ """ + + @type t :: %__MODULE__{ + topic: binary, + partitions: list(non_neg_integer) + } + + defstruct ~w(topic partitions)a + + @type response_partial :: %{ + required(:topic) => binary, + required(:partitions) => list(non_neg_integer) + } + + @spec from_describe_group_response(response_partial()) :: __MODULE__.t() + def from_describe_group_response(response) do + %__MODULE__{ + topic: response.topic, + partitions: response.partitions + } + end +end diff --git a/scripts/ci_tests.sh b/scripts/ci_tests.sh index e83f3e38..85bffc3f 100755 --- a/scripts/ci_tests.sh +++ b/scripts/ci_tests.sh @@ -26,4 +26,4 @@ export TEST_COMMAND ALL_TESTS=${PROJECT_DIR}/scripts/all_tests.sh # Retry if it doesn't work the first time -${ALL_TESTS} || ${ALL_TESTS} --failed +${ALL_TESTS} diff --git a/test/integration/consumer_group_implementation_test.exs b/test/integration/consumer_group_implementation_test.exs index 3c6cf934..d3f5a39e 100644 --- a/test/integration/consumer_group_implementation_test.exs +++ b/test/integration/consumer_group_implementation_test.exs @@ -3,7 +3,7 @@ defmodule KafkaEx.ConsumerGroupImplementationTest do alias KafkaEx.ConsumerGroup alias KafkaEx.GenConsumer - import TestHelper + import KafkaEx.TestHelpers require Logger diff --git a/test/integration/consumer_group_test.exs b/test/integration/consumer_group_test.exs index 6ff6cfd1..45b67494 100644 --- a/test/integration/consumer_group_test.exs +++ b/test/integration/consumer_group_test.exs @@ -2,7 +2,7 @@ defmodule KafkaEx.ConsumerGroup.Test do alias KafkaEx.Protocol, as: Proto alias KafkaEx.Config use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :consumer_group @@ -233,7 +233,7 @@ defmodule KafkaEx.ConsumerGroup.Test do consumer_group: consumer_group ) - offset_before = TestHelper.latest_offset_number(topic, 0, worker_name) + offset_before = KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name) Enum.each(1..10, fn _ -> msg = %Proto.Produce.Message{value: "hey 
#{inspect(:os.timestamp())}"} @@ -249,7 +249,8 @@ defmodule KafkaEx.ConsumerGroup.Test do ) end) - offset_after = TestHelper.latest_offset_number(topic, 0, worker_name) + offset_after = KafkaEx.TestHelpers.latest_offset_number(topic, 0, worker_name) + assert offset_after == offset_before + 10 [logs] = @@ -466,7 +467,7 @@ defmodule KafkaEx.ConsumerGroup.Test do offset: 0 ) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) @@ -515,9 +516,9 @@ defmodule KafkaEx.ConsumerGroup.Test do # make sure the offset commit is actually committed before we # start streaming again :ok = - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> 3 == - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( random_string, 0, consumer_group, @@ -526,7 +527,7 @@ defmodule KafkaEx.ConsumerGroup.Test do end) stream = KafkaEx.stream(random_string, 0, worker_name: worker_name) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) first_message = log |> hd @@ -569,7 +570,7 @@ defmodule KafkaEx.ConsumerGroup.Test do assert "message 3" == m2.value offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, consumer_group, @@ -618,7 +619,7 @@ defmodule KafkaEx.ConsumerGroup.Test do assert "message 5" == m4.value offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, consumer_group, @@ -644,7 +645,7 @@ defmodule KafkaEx.ConsumerGroup.Test do Enum.map(map_stream, fn m -> m.value end) offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, 0, other_consumer_group, diff --git a/test/integration/integration_test.exs 
b/test/integration/integration_test.exs index 086265cd..b16276fb 100644 --- a/test/integration/integration_test.exs +++ b/test/integration/integration_test.exs @@ -2,7 +2,7 @@ defmodule KafkaEx.Integration.Test do alias KafkaEx.Protocol, as: Proto alias KafkaEx.Config use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :integration @@ -203,7 +203,7 @@ defmodule KafkaEx.Integration.Test do random_string = generate_random_string() metadata = - TestHelper.wait_for_value( + KafkaEx.TestHelpers.wait_for_value( fn -> KafkaEx.metadata(topic: random_string) end, fn metadata -> metadata != nil && length(metadata.topic_metadatas) > 0 @@ -361,7 +361,10 @@ defmodule KafkaEx.Integration.Test do messages: [%Proto.Produce.Message{value: "foo"}] }) - [offset_response] = TestHelper.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0) end) + [offset_response] = + KafkaEx.TestHelpers.wait_for_any(fn -> + KafkaEx.latest_offset(random_string, 0) + end) offset = offset_response.partition_offsets |> hd |> Map.get(:offset) |> hd diff --git a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs index db0d3fe1..f5a31b29 100644 --- a/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs +++ b/test/integration/kayrock/compatibility_0_p_10_and_later_test.exs @@ -35,7 +35,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = create_topic(name, config, client) assert {:topic_already_exists, name} == parse_create_topic_resp(resp) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:ok, metadatas} = KafkaExAPI.topics_metadata(client, [name]) length(metadatas) > 0 end) @@ -55,7 +55,7 @@ defmodule KafkaEx.KayrockCompatibility0p10AndLaterTest do resp = KafkaEx.delete_topics([name], timeout: 5_000, worker_name: client) assert {:no_error, name} = parse_delete_topic_resp(resp) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:ok, []} == 
KafkaExAPI.topics_metadata(client, [name]) end) end diff --git a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs index f5ad92af..f9e462c1 100644 --- a/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_8_p_0_test.exs @@ -26,7 +26,7 @@ defmodule KafkaEx.KayrockCompatibility0p8p0Test do partition = 0 :ok = KafkaEx.produce(@topic, partition, msg, worker_name: client) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> [got] = KafkaEx.fetch( @topic, diff --git a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs index df472690..486ec28d 100644 --- a/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs +++ b/test/integration/kayrock/compatibility_0_p_9_p_0_test.exs @@ -24,7 +24,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do end test "can join a consumer group", %{client: client} do - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -44,7 +44,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can send a simple leader sync for a consumer group", %{client: client} do # A lot of repetition with the previous test. Leaving it in now, waiting for # how this pans out eventually as we add more and more 0.9 consumer group code - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -80,7 +80,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can leave a consumer group", %{client: client} do # A lot of repetition with the previous tests. 
Leaving it in now, waiting for # how this pans out eventually as we add more and more 0.9 consumer group code - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, @@ -106,7 +106,7 @@ defmodule KafkaEx.KayrockCompatibility0p9p0Test do test "can heartbeat", %{client: client} do # See sync test. Removing repetition in the next iteration - random_group = TestHelper.generate_random_string() + random_group = KafkaEx.TestHelpers.generate_random_string() request = %JoinGroupRequest{ group_name: random_group, diff --git a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs index 47f1d3cf..aea59a64 100644 --- a/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_implementation_test.exs @@ -128,7 +128,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do end def sync_stop(pid) when is_pid(pid) do - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> if Process.alive?(pid) do Process.exit(pid, :normal) end @@ -170,7 +170,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do topic_name = "#{@topic_name_prefix}#{:rand.uniform(2_000_000)}" - {:ok, topic_name} = TestHelper.ensure_append_timestamp_topic(client_pid, topic_name) + {:ok, topic_name} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client_pid, topic_name) {:ok, consumer_group_pid1} = ConsumerGroup.start_link( @@ -195,7 +195,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do ) # wait for both consumer groups to join - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> ConsumerGroup.active?(consumer_group_pid1, 30000) && ConsumerGroup.active?(consumer_group_pid2, 30000) end) @@ -287,7 +287,7 @@ defmodule 
KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do starting_offsets = partition_range |> Enum.map(fn px -> - {px, TestHelper.latest_offset_number(topic_name, px)} + {px, KafkaEx.TestHelpers.latest_offset_number(topic_name, px)} end) |> Enum.into(%{}) @@ -311,7 +311,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do |> Enum.map(fn px -> consumer_pid = Map.get(consumers, {topic_name, px}) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> message_set = TestConsumer.last_message_set(consumer_pid) correct_last_message?(message_set, messages[px], starting_offsets[px]) end) @@ -331,9 +331,9 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupImplementationTest do # offsets should be committed on exit for px <- partition_range do - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> ending_offset = - TestHelper.latest_consumer_offset_number( + KafkaEx.TestHelpers.latest_consumer_offset_number( topic_name, px, @consumer_group_name, diff --git a/test/integration/kayrock/compatibility_consumer_group_test.exs b/test/integration/kayrock/compatibility_consumer_group_test.exs index 7d7fe030..e1b93de7 100644 --- a/test/integration/kayrock/compatibility_consumer_group_test.exs +++ b/test/integration/kayrock/compatibility_consumer_group_test.exs @@ -30,7 +30,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "fetch with auto_commit doesn't blow up on no messages", %{ client: client } do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() consumer_group = "auto_commit_consumer_group" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -166,7 +166,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "fetch starts consuming from last committed offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() consumer_group = 
"auto_commit_consumer_group" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -203,7 +203,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "fetch does not commit offset with auto_commit is set to false", %{ client: client } do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() Enum.each(1..10, fn _ -> KafkaEx.produce( @@ -246,7 +246,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "offset_fetch does not override consumer_group", %{client: client} do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() consumer_group = "bar#{topic}" KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) @@ -274,7 +274,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "offset_commit commits an offset and offset_fetch retrieves the committed offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() Enum.each(1..10, fn _ -> KafkaEx.produce( @@ -323,7 +323,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do end test "stream auto_commits offset by default", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -346,7 +346,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do offset: 0 ) - log = TestHelper.wait_for_any(fn -> Enum.take(stream, 2) end) + log = KafkaEx.TestHelpers.wait_for_any(fn -> Enum.take(stream, 2) end) refute Enum.empty?(log) @@ -366,7 +366,7 @@ defmodule KafkaEx.KayrockCompatibilityConsumerGroupTest do test "streams with a consumer group begin at the last committed offset", %{ client: client } do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() consumer_group = "stream_test" 
KafkaExAPI.set_consumer_group_for_auto_commit(client, consumer_group) diff --git a/test/integration/kayrock/compatibility_streaming_test.exs b/test/integration/kayrock/compatibility_streaming_test.exs index e7d2626f..f441e636 100644 --- a/test/integration/kayrock/compatibility_streaming_test.exs +++ b/test/integration/kayrock/compatibility_streaming_test.exs @@ -22,7 +22,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do partition = 0 consumer_group = "streamers" - {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic) + {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic) KafkaEx.produce(topic, partition, "foo 1", api_version: 3) KafkaEx.produce(topic, partition, "foo 2", api_version: 3) @@ -41,7 +41,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do } ) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> length(Enum.take(stream, 3)) == 3 end) @@ -81,7 +81,7 @@ defmodule KafkaEx.KayrockCompatibilityStreamingTest do topic_name = "kayrock_stream_with_empty_log" consumer_group = "streamers_with_empty_log" - {:ok, topic} = TestHelper.ensure_append_timestamp_topic(client, topic_name) + {:ok, topic} = KafkaEx.TestHelpers.ensure_append_timestamp_topic(client, topic_name) {:ok, agent} = Agent.start(fn -> [] end) diff --git a/test/integration/kayrock/compatibility_test.exs b/test/integration/kayrock/compatibility_test.exs index adf050c7..aa8fcd79 100644 --- a/test/integration/kayrock/compatibility_test.exs +++ b/test/integration/kayrock/compatibility_test.exs @@ -4,8 +4,8 @@ defmodule KafkaEx.KayrockCompatibilityTest do These mostly come from the original integration_test.exs file """ - use ExUnit.Case + import KafkaEx.TestHelpers @moduletag :new_client @@ -34,6 +34,45 @@ defmodule KafkaEx.KayrockCompatibilityTest do assert Process.alive?(pid) end + describe "describe_groups/1" do + setup do + consumer_group = generate_random_string() + topic = "new_client_implementation" + + {:ok, %{consumer_group: 
consumer_group, topic: topic}} + end + + test "with new client - returns group metadata", %{ + client: client, + consumer_group: consumer_group, + topic: topic + } do + join_to_group(client, topic, consumer_group) + + {:ok, group_metadata} = KafkaExAPI.describe_group(client, consumer_group) + + assert group_metadata.group_id == consumer_group + assert group_metadata.protocol_type == "consumer" + assert group_metadata.protocol == "" + assert length(group_metadata.members) == 1 + end + + test "with old client - returns group metadata", %{ + client: client, + consumer_group: consumer_group, + topic: topic + } do + join_to_group(client, topic, consumer_group) + + {:ok, group_metadata} = KafkaEx.describe_group(consumer_group, worker_name: client) + + assert group_metadata.group_id == consumer_group + assert group_metadata.protocol_type == "consumer" + assert group_metadata.protocol == "" + assert length(group_metadata.members) == 1 + end + end + test "worker updates metadata after specified interval" do {:ok, args} = KafkaEx.build_worker_options(metadata_update_interval: 100) {:ok, pid} = Client.start_link(args, :no_name) @@ -118,7 +157,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "produce creates log for a non-existing topic", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -147,14 +186,14 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "fetch returns ':topic_not_found' for non-existing topic", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() assert KafkaEx.fetch(random_string, 0, offset: 0, worker_name: client) == :topic_not_found end test "fetch works", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -185,7 
+224,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch with empty topic", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() response = KafkaEx.fetch(random_string, 0, @@ -198,7 +237,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch nonexistent offset", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -224,7 +263,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "fetch nonexistent partition", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -248,7 +287,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "earliest_offset retrieves offset of 0", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -270,7 +309,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "latest_offset retrieves offset of 0 for non-existing topic", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() {:ok, produce_offset} = KafkaEx.produce( @@ -293,7 +332,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "latest_offset retrieves a non-zero offset for a topic published to", %{ client: client } do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -306,7 +345,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do ) [offset_response] = - TestHelper.wait_for_any(fn -> + KafkaEx.TestHelpers.wait_for_any(fn -> KafkaEx.latest_offset(random_string, 0, client) end) @@ -317,7 +356,7 @@ defmodule 
KafkaEx.KayrockCompatibilityTest do # compression test "compresses / decompresses using gzip", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() message1 = %Proto.Produce.Message{value: "value 1"} message2 = %Proto.Produce.Message{key: "key 2", value: "value 2"} @@ -352,7 +391,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "compresses / decompresses using snappy", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() message1 = %Proto.Produce.Message{value: "value 1"} message2 = %Proto.Produce.Message{key: "key 2", value: "value 2"} @@ -440,7 +479,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do # stream test "streams kafka logs", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -479,7 +518,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do test "stream with small max_bytes makes multiple requests if necessary", %{ client: client } do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -519,7 +558,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "stream blocks until new messages are available", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -570,7 +609,7 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "stream is non-blocking with no_wait_at_logend", %{client: client} do - topic_name = TestHelper.generate_random_string() + topic_name = KafkaEx.TestHelpers.generate_random_string() KafkaEx.produce( %Proto.Produce.Request{ @@ -606,13 +645,25 @@ defmodule KafkaEx.KayrockCompatibilityTest do end test "doesn't error 
when re-creating an existing stream", %{client: client} do - random_string = TestHelper.generate_random_string() + random_string = KafkaEx.TestHelpers.generate_random_string() KafkaEx.stream(random_string, 0, offset: 0, worker_name: client) KafkaEx.stream(random_string, 0, offset: 0, worker_name: client) end test "produce with the default partitioner works", %{client: client} do - topic = TestHelper.generate_random_string() + topic = KafkaEx.TestHelpers.generate_random_string() :ok = KafkaEx.produce(topic, nil, "hello", worker_name: client) end + + # ----------------------------------------------------------------------------- + defp join_to_group(client, topic, consumer_group) do + request = %KafkaEx.Protocol.JoinGroup.Request{ + group_name: consumer_group, + member_id: "", + topics: [topic], + session_timeout: 6000 + } + + KafkaEx.join_group(request, worker_name: client, timeout: 10000) + end end diff --git a/test/integration/kayrock/offset_test.exs b/test/integration/kayrock/offset_test.exs index 9df47510..71ce7ca0 100644 --- a/test/integration/kayrock/offset_test.exs +++ b/test/integration/kayrock/offset_test.exs @@ -19,7 +19,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v0", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v0" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -61,7 +61,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v1 and fetch v0", %{client: client} do topic = "food" consumer_group = "commit_v1_fetch_v0" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -106,7 +106,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v1 and fetch v1", %{client: client} do topic = "food" consumer_group = "commit_v1_fetch_v1" - msg = TestHelper.generate_random_string() + msg = 
KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -148,7 +148,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v1", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v1" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -191,7 +191,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v2", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v2" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -234,7 +234,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v0 and fetch v3", %{client: client} do topic = "food" consumer_group = "commit_v0_fetch_v3" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -277,7 +277,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v2 and fetch v2", %{client: client} do topic = "food" consumer_group = "commit_v2_fetch_v2" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -319,7 +319,7 @@ defmodule KafkaEx.KayrockOffsetTest do test "offset commit v3 and fetch v3", %{client: client} do topic = "food" consumer_group = "commit_v3_fetch_v3" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/kayrock/record_batch_test.exs b/test/integration/kayrock/record_batch_test.exs index 08a35eb7..830d5d4f 100644 --- a/test/integration/kayrock/record_batch_test.exs +++ b/test/integration/kayrock/record_batch_test.exs @@ -15,7 +15,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "can specify protocol version for fetch - v3", %{client: client} do topic = "food" - msg = 
TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -44,7 +44,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "fetch empty message set - v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -71,7 +71,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do # v2 is the highest that will accept the MessageSet format test "can specify protocol version for produce - v2", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -101,7 +101,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "can specify protocol version for fetch - v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -130,7 +130,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "fetch empty message set - v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -157,7 +157,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do # v3 is the lowest that requires the RecordBatch format test "can specify protocol version for produce - v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -240,7 +240,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v0, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -271,7 +271,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v0, 
fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -302,7 +302,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -333,7 +333,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -364,7 +364,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "gzip compression - produce v3, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -395,7 +395,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v0, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -426,7 +426,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v0, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -457,7 +457,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -488,7 +488,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v3", %{client: client} do topic = 
"food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -519,7 +519,7 @@ defmodule KafkaEx.KayrockRecordBatchTest do test "snappy compression - produce v3, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/kayrock/timestamp_test.exs b/test/integration/kayrock/timestamp_test.exs index a6a45ede..c9527bec 100644 --- a/test/integration/kayrock/timestamp_test.exs +++ b/test/integration/kayrock/timestamp_test.exs @@ -19,7 +19,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is nil by default on v0 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -49,7 +49,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is -1 by default on v3 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -79,7 +79,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "fetch timestamp is -1 by default on v5 messages", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -111,12 +111,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = "test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -148,12 +148,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = 
"test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -186,12 +186,12 @@ defmodule KafkaEx.KayrockTimestampTest do topic = "test_log_append_timestamp_#{:rand.uniform(2_000_000)}" {:ok, ^topic} = - TestHelper.ensure_append_timestamp_topic( + KafkaEx.TestHelpers.ensure_append_timestamp_topic( client, topic ) - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -223,7 +223,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp with v0 throws an error", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() Process.flag(:trap_exit, true) @@ -245,7 +245,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp with v1 throws an error", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() Process.flag(:trap_exit, true) @@ -266,7 +266,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v0", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -298,7 +298,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v3", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( @@ -330,7 +330,7 @@ defmodule KafkaEx.KayrockTimestampTest do test "set timestamp for v3 message, fetch v5", %{client: client} do topic = "food" - msg = TestHelper.generate_random_string() + msg = 
KafkaEx.TestHelpers.generate_random_string() {:ok, offset} = KafkaEx.produce( diff --git a/test/integration/new_client_test.exs b/test/integration/new_client_test.exs index 821bea0b..f4556bc4 100644 --- a/test/integration/new_client_test.exs +++ b/test/integration/new_client_test.exs @@ -1,5 +1,6 @@ defmodule KafkaEx.New.Client.Test do use ExUnit.Case + import KafkaEx.TestHelpers alias KafkaEx.New.Client @@ -22,6 +23,37 @@ defmodule KafkaEx.New.Client.Test do {:ok, %{client: pid}} end + describe "describe_groups/1" do + setup do + consumer_group = generate_random_string() + topic = "new_client_implementation" + + {:ok, %{consumer_group: consumer_group, topic: topic}} + end + + test "returns group metadata for single consumer group", %{ + consumer_group: consumer_group, + topic: topic, + client: client + } do + join_to_group(client, topic, consumer_group) + + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, [consumer_group]}) + + assert group_metadata.group_id == consumer_group + assert group_metadata.protocol_type == "consumer" + assert group_metadata.protocol == "" + assert length(group_metadata.members) == 1 + end + + test "returns dead when consumer group does not exist", %{client: client} do + {:ok, [group_metadata]} = GenServer.call(client, {:describe_groups, ["non-existing-group"]}) + + assert group_metadata.group_id == "non-existing-group" + assert group_metadata.state == "Dead" + end + end + test "update metadata", %{client: client} do {:ok, updated_metadata} = GenServer.call(client, :update_metadata) %ClusterMetadata{topics: topics} = updated_metadata @@ -159,7 +191,7 @@ defmodule KafkaEx.New.Client.Test do test "client can receive {:ssl_closed, _}", %{client: client} do send(client, {:ssl_closed, :unused}) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) @@ -170,11 +202,23 @@ defmodule KafkaEx.New.Client.Test do test "client can receive 
{:tcp_closed, _}", %{client: client} do send(client, {:tcp_closed, :unused}) - TestHelper.wait_for(fn -> + KafkaEx.TestHelpers.wait_for(fn -> {:message_queue_len, m} = Process.info(client, :message_queue_len) m == 0 end) assert Process.alive?(client) end + + # ------------------------------------------------------------------------------------------------ + defp join_to_group(client, topic, consumer_group) do + request = %KafkaEx.Protocol.JoinGroup.Request{ + group_name: consumer_group, + member_id: "", + topics: [topic], + session_timeout: 6000 + } + + KafkaEx.join_group(request, worker_name: client, timeout: 10000) + end end diff --git a/test/integration/server0_p_10_and_later_test.exs b/test/integration/server0_p_10_and_later_test.exs index bc30266d..2375f778 100644 --- a/test/integration/server0_p_10_and_later_test.exs +++ b/test/integration/server0_p_10_and_later_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P10P1AndLater.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :server_0_p_10_and_later @num_partitions 10 diff --git a/test/integration/server0_p_8_p_0_test.exs b/test/integration/server0_p_8_p_0_test.exs index 3019a0cf..579f9616 100644 --- a/test/integration/server0_p_8_p_0_test.exs +++ b/test/integration/server0_p_8_p_0_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P8P0.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers @moduletag :server_0_p_8_p_0 diff --git a/test/integration/server0_p_9_p_0_test.exs b/test/integration/server0_p_9_p_0_test.exs index a50c4d7c..150b0358 100644 --- a/test/integration/server0_p_9_p_0_test.exs +++ b/test/integration/server0_p_9_p_0_test.exs @@ -1,6 +1,6 @@ defmodule KafkaEx.Server0P9P0.Test do use ExUnit.Case - import TestHelper + import KafkaEx.TestHelpers alias KafkaEx.Protocol.Heartbeat.Request, as: HeartbeatRequest alias KafkaEx.Protocol.JoinGroup.Request, as: JoinGroupRequest diff --git a/test/kafka_ex/network_client_test.exs 
b/test/kafka_ex/network_client_test.exs index 29fa58f3..c3da6db5 100644 --- a/test/kafka_ex/network_client_test.exs +++ b/test/kafka_ex/network_client_test.exs @@ -1,5 +1,6 @@ defmodule KafkaEx.NetworkClientTest do use ExUnit.Case, async: true + import KafkaEx.TestHelpers use Hammox.Protect, module: KafkaEx.NetworkClient, @@ -28,22 +29,26 @@ defmodule KafkaEx.NetworkClientTest do describe "create_socket/3" do setup do - pid = KafkaEx.TestSupport.Server.start(3040) + port = get_free_port(3040) + pid = KafkaEx.TestSupport.Server.start(port) on_exit(fn -> Process.exit(pid, :normal) end) + + {:ok, [port: port]} end - test "creates a socket" do - kafka_ex_socket = create_socket("localhost", 3040, [], false) + test "creates a socket", %{port: port} do + kafka_ex_socket = create_socket("localhost", port, [], false) assert kafka_ex_socket.socket assert kafka_ex_socket.ssl == false end test "returns nil if socket creation fails" do - assert nil == create_socket("localhost", 3002, [], true) + port = get_free_port(3040) + assert nil == create_socket("localhost", port, [], true) end end diff --git a/test/kafka_ex/new/client/request_builder_test.exs b/test/kafka_ex/new/client/request_builder_test.exs new file mode 100644 index 00000000..0f06ca92 --- /dev/null +++ b/test/kafka_ex/new/client/request_builder_test.exs @@ -0,0 +1,20 @@ +defmodule KafkaEx.New.Client.RequestBuilderTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Client.RequestBuilder + + describe "describe_groups_request/2" do + test "returns request for DescribeGroups API" do + state = %KafkaEx.New.Client.State{api_versions: %{describe_groups: 1}} + group_names = ["group1", "group2"] + + expected_request = %Kayrock.DescribeGroups.V1.Request{ + group_ids: group_names + } + + request = RequestBuilder.describe_groups_request(group_names, state) + + assert expected_request == request + end + end +end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs 
b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs new file mode 100644 index 00000000..18a900c4 --- /dev/null +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/request_test.exs @@ -0,0 +1,29 @@ +defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.RequestTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Protocols.Kayrock.DescribeGroups.Request, + as: DescribeGroupsRequest + + alias Kayrock.DescribeGroups.V0 + alias Kayrock.DescribeGroups.V1 + + describe "build_request/2" do + test "for api version 0 - builds describe group request" do + groups = ["group1", "group2"] + + expected_request = %V0.Request{group_ids: groups} + + assert expected_request == + DescribeGroupsRequest.build_request(%V0.Request{}, groups) + end + + test "for api version 1 - builds describe group request" do + groups = ["group1", "group2"] + + expected_request = %V1.Request{group_ids: groups} + + assert expected_request == + DescribeGroupsRequest.build_request(%V1.Request{}, groups) + end + end +end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs new file mode 100644 index 00000000..717d30d8 --- /dev/null +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups/response_test.exs @@ -0,0 +1,124 @@ +defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroups.ResponseTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Protocols.Kayrock.DescribeGroups.Response, + as: DescribeGroupsResponse + + alias Kayrock.DescribeGroups.V0 + alias Kayrock.DescribeGroups.V1 + + describe "parse_response/1" do + @expected_group %KafkaEx.New.Structs.ConsumerGroup{ + group_id: "succeeded", + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %KafkaEx.New.Structs.ConsumerGroup.Member{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: 
%KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } + } + ] + } + + test "for api version 0 - returns response if all groups succeeded" do + response = %V0.Response{ + groups: [ + %{ + group_id: "succeeded", + error_code: 0, + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + ] + } + + assert {:ok, [@expected_group]} == + DescribeGroupsResponse.parse_response(response) + end + + test "for api version 0 - returns error if any group failed" do + response = %V0.Response{ + groups: [ + %{group_id: "succeeded", error_code: 0}, + %{group_id: "failed", error_code: 1} + ] + } + + assert {:error, [{"failed", :offset_out_of_range}]} == + DescribeGroupsResponse.parse_response(response) + end + + test "for api version 1 - returns response if all groups succeeded" do + response = %V1.Response{ + groups: [ + %{ + group_id: "succeeded", + error_code: 0, + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + ] + } + + assert {:ok, [@expected_group]} == + DescribeGroupsResponse.parse_response(response) + end + + test "for api version 1 - returns error if any group failed" do + response = %V1.Response{ + groups: [ + %{group_id: 
"succeeded", error_code: 0}, + %{group_id: "failed", error_code: 1} + ] + } + + assert {:error, [{"failed", :offset_out_of_range}]} == + DescribeGroupsResponse.parse_response(response) + end + end +end diff --git a/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs new file mode 100644 index 00000000..eb07b7a5 --- /dev/null +++ b/test/kafka_ex/new/protocols/kayrock/describe_groups_test.exs @@ -0,0 +1,80 @@ +defmodule KafkaEx.New.Protocols.Kayrock.DescribeGroupsTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Protocols.Kayrock.DescribeGroups, as: KayrockDescribeGroups + + alias Kayrock.DescribeGroups.V0 + + describe "build_request/2" do + test "builds request for Describe Groups API" do + consumer_group_names = ["test-group"] + expected_request = %V0.Request{group_ids: consumer_group_names} + + assert KayrockDescribeGroups.Request.build_request( + %V0.Request{}, + consumer_group_names + ) == expected_request + end + end + + describe "build_response/1" do + test "for api version 0 - returns response if all groups succeeded" do + response = %V0.Response{ + groups: [ + %{ + group_id: "succeeded", + error_code: 0, + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + ] + } + + assert {:ok, + [ + %KafkaEx.New.Structs.ConsumerGroup{ + group_id: "succeeded", + state: "stable", + protocol_type: "protocol_type", + protocol: "protocol", + members: [ + %KafkaEx.New.Structs.ConsumerGroup.Member{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: + 
%KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } + } + ] + } + ]} == + KayrockDescribeGroups.Response.parse_response(response) + end + end +end diff --git a/test/kafka_ex/new/structs/broker_test.exs b/test/kafka_ex/new/structs/broker_test.exs index 6b9f397a..6b17219c 100644 --- a/test/kafka_ex/new/structs/broker_test.exs +++ b/test/kafka_ex/new/structs/broker_test.exs @@ -1,10 +1,12 @@ defmodule KafkaEx.New.Structs.BrokerTest do use ExUnit.Case, async: false + import KafkaEx.TestHelpers alias KafkaEx.New.Structs.Broker setup do - pid = KafkaEx.TestSupport.Server.start(3040) + port = get_free_port(3040) + pid = KafkaEx.TestSupport.Server.start(port) {:ok, socket} = KafkaEx.Socket.create(~c"localhost", 3040, [:binary, {:packet, 0}], false) @@ -13,7 +15,7 @@ defmodule KafkaEx.New.Structs.BrokerTest do Process.exit(pid, :normal) end) - {:ok, [socket: socket]} + {:ok, [socket: socket, port: port]} end describe "connect_broker/1" do @@ -65,9 +67,9 @@ defmodule KafkaEx.New.Structs.BrokerTest do refute Broker.has_socket?(broker, socket) end - test "returns false if broker has different socket", %{socket: socket_one} do + test "returns false if broker has different socket", %{socket: socket_one, port: port} do {:ok, socket_two} = - KafkaEx.Socket.create(~c"localhost", 3040, [:binary, {:packet, 0}], false) + KafkaEx.Socket.create(~c"localhost", port, [:binary, {:packet, 0}], false) broker = %Broker{socket: nil} |> Broker.put_socket(socket_one) diff --git a/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs b/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs new file mode 100644 index 00000000..e9d2ade8 --- /dev/null +++ b/test/kafka_ex/new/structs/consumer_group/member_assignment_test.exs @@ -0,0 +1,42 @@ +defmodule 
KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignmentTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment + + describe "from_describe_group_response/1" do + test "returns a MemberAssignment struct without partitions" do + response = %{ + version: 0, + user_data: "user_data", + partition_assignments: [] + } + + assert %MemberAssignment{} = MemberAssignment.from_describe_group_response(response) + end + + test "returns a MemberAssignment struct with partitions" do + response = %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{ + topic: "topic", + partitions: [0, 1] + } + ] + } + + result = MemberAssignment.from_describe_group_response(response) + + assert 0 == result.version + assert "user_data" == result.user_data + + assert result.partition_assignments == [ + %MemberAssignment.PartitionAssignment{ + topic: "topic", + partitions: [0, 1] + } + ] + end + end +end diff --git a/test/kafka_ex/new/structs/consumer_group/member_test.exs b/test/kafka_ex/new/structs/consumer_group/member_test.exs new file mode 100644 index 00000000..ab02af3e --- /dev/null +++ b/test/kafka_ex/new/structs/consumer_group/member_test.exs @@ -0,0 +1,86 @@ +defmodule KafkaEx.New.Structs.ConsumerGroup.MemberTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.ConsumerGroup.Member + + describe "from_describe_group_response/1" do + test "returns a Member struct" do + response = %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: nil + } + + result = Member.from_describe_group_response(response) + + assert "member_id" == result.member_id + assert "client_id" == result.client_id + assert "client_host" == result.client_host + assert "member_metadata" == result.member_metadata + assert nil == result.member_assignment + end + + test "returns a Member struct with member_assignment" do + response = %{ + member_id: 
"member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [] + } + } + + result = Member.from_describe_group_response(response) + + assert "member_id" == result.member_id + assert "client_id" == result.client_id + assert "client_host" == result.client_host + assert "member_metadata" == result.member_metadata + + member_assignment = result.member_assignment + assert 0 == member_assignment.version + assert "user_data" == member_assignment.user_data + assert [] == member_assignment.partition_assignments + end + + test "returns a Member struct with member_assignment and partition_assignments" do + response = %{ + member_id: "member_id", + client_id: "client_id", + client_host: "client_host", + member_metadata: "member_metadata", + member_assignment: %{ + version: 0, + user_data: "user_data", + partition_assignments: [ + %{ + topic: "topic", + partitions: [0, 1] + } + ] + } + } + + result = Member.from_describe_group_response(response) + + assert "member_id" == result.member_id + assert "client_id" == result.client_id + assert "client_host" == result.client_host + assert "member_metadata" == result.member_metadata + + member_assignment = result.member_assignment + assert 0 == member_assignment.version + assert "user_data" == member_assignment.user_data + assert 1 == length(member_assignment.partition_assignments) + + partition_assignment = Enum.at(member_assignment.partition_assignments, 0) + assert "topic" == partition_assignment.topic + assert [0, 1] == partition_assignment.partitions + end + end +end diff --git a/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs b/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs new file mode 100644 index 00000000..54e3b38b --- /dev/null +++ b/test/kafka_ex/new/structs/consumer_group/partition_assignment_test.exs @@ -0,0 +1,15 @@ +defmodule 
KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignmentTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.ConsumerGroup.Member.MemberAssignment.PartitionAssignment + + describe "from_describe_group_response/1" do + test "builds a struct from a describe group response" do + response = %{topic: "test", partitions: [0, 1, 2]} + expected = %PartitionAssignment{topic: "test", partitions: [0, 1, 2]} + + assert expected == + PartitionAssignment.from_describe_group_response(response) + end + end +end diff --git a/test/kafka_ex/new/structs/consumer_group_test.exs b/test/kafka_ex/new/structs/consumer_group_test.exs new file mode 100644 index 00000000..cee03acd --- /dev/null +++ b/test/kafka_ex/new/structs/consumer_group_test.exs @@ -0,0 +1,58 @@ +defmodule KafkaEx.New.ConsumerGroupTest do + use ExUnit.Case, async: true + + alias KafkaEx.New.Structs.ConsumerGroup + + describe "from_describe_group_response/1" do + test "returns a consumer group struct" do + response = %{ + group_id: "test-group", + state: "Stable", + protocol_type: "consumer", + protocol: "range", + members: [ + %{ + member_id: "test-member", + client_id: "test-client", + client_host: "test-host", + member_metadata: "test-metadata", + member_assignment: %{ + version: 0, + user_data: "test-user-data", + partition_assignments: [ + %{topic: "test-topic", partitions: [1, 2, 3]} + ] + } + } + ] + } + + expected = %ConsumerGroup{ + group_id: "test-group", + state: "Stable", + protocol_type: "consumer", + protocol: "range", + members: [ + %ConsumerGroup.Member{ + member_id: "test-member", + client_id: "test-client", + client_host: "test-host", + member_metadata: "test-metadata", + member_assignment: %ConsumerGroup.Member.MemberAssignment{ + version: 0, + user_data: "test-user-data", + partition_assignments: [ + %ConsumerGroup.Member.MemberAssignment.PartitionAssignment{ + topic: "test-topic", + partitions: [1, 2, 3] + } + ] + } + } + ] + } + + assert 
ConsumerGroup.from_describe_group_response(response) == expected + end + end +end diff --git a/test/kafka_ex/socket_test.exs b/test/kafka_ex/socket_test.exs index be4e65c6..11ed5094 100644 --- a/test/kafka_ex/socket_test.exs +++ b/test/kafka_ex/socket_test.exs @@ -1,5 +1,6 @@ defmodule KafkaEx.Socket.Test do use ExUnit.Case, async: false + import KafkaEx.TestHelpers setup_all do {:ok, _} = Application.ensure_all_started(:ssl) @@ -7,8 +8,9 @@ defmodule KafkaEx.Socket.Test do describe "without SSL socket" do setup do - KafkaEx.TestSupport.Server.start(3040) - {:ok, [port: 3040]} + port = get_free_port(3040) + KafkaEx.TestSupport.Server.start(port) + {:ok, [port: port]} end test "create a non SSL socket", context do diff --git a/test/support/test_helpers.ex b/test/support/test_helpers.ex new file mode 100644 index 00000000..88cafad0 --- /dev/null +++ b/test/support/test_helpers.ex @@ -0,0 +1,208 @@ +defmodule KafkaEx.TestHelpers do + alias KafkaEx.New.Client + alias KafkaEx.New.Structs.NodeSelector + require Logger + + @doc """ + Returns a random string of length string_length. + """ + def generate_random_string(string_length \\ 20) do + 1..string_length + |> Enum.map(fn _ -> round(:rand.uniform() * 25 + 65) end) + |> to_string + end + + @doc """ + Returns a random port number that is not in use. 
+ """ + def get_free_port(port) do + case :gen_tcp.listen(port, [:binary]) do + {:ok, socket} -> + :ok = :gen_tcp.close(socket) + port + + {:error, :eaddrinuse} -> + get_free_port(port + 1) + end + end + + @doc """ + Wait for the return value of value_getter to pass the predicate condn + ~> If condn does not pass, sleep for dwell msec and try again + ~> If condn does not pass after max_tries attempts, raises an error + """ + def wait_for_value(value_getter, condn, dwell \\ 500, max_tries \\ 200) do + wait_for_value(value_getter, condn, dwell, max_tries, 0) + end + + @doc """ + Wait for condn to return false or nil; passes through to wait_for_value + returns :ok on success + """ + def wait_for(condn, dwell \\ 500, max_tries \\ 200) do + wait_for_value(fn -> :ok end, fn :ok -> condn.() end, dwell, max_tries) + end + + @doc """ + Execute value_getter, which should return a list, and accumulate + the results until the accumulated results are at least min_length long + """ + def wait_for_accum(value_getter, min_length, dwell \\ 500, max_tries \\ 200) do + wait_for_accum(value_getter, [], min_length, dwell, max_tries) + end + + @doc """ + passthrough to wait_for_accum with 1 as the min_length - i.e., + wait for any response + """ + def wait_for_any(value_getter, dwell \\ 500, max_tries \\ 200) do + wait_for_accum(value_getter, 1, dwell, max_tries) + end + + @doc """ + Returns a list of the brokers in the cluster + """ + def uris do + Application.get_env(:kafka_ex, :brokers) + end + + def utc_time do + {x, {a, b, c}} = + :calendar.local_time() + |> :calendar.local_time_to_universal_time_dst() + |> hd + + {x, {a, b, c + 60}} + end + + def latest_offset_number(topic, partition_id, worker \\ :kafka_ex) do + offset = + KafkaEx.latest_offset(topic, partition_id, worker) + |> first_partition_offset + + offset || 0 + end + + def latest_consumer_offset_number( + topic, + partition, + consumer_group, + worker \\ :kafka_ex, + api_version \\ 0 + ) do + request = 
%KafkaEx.Protocol.OffsetFetch.Request{ + topic: topic, + partition: partition, + consumer_group: consumer_group, + api_version: api_version + } + + resp = KafkaEx.offset_fetch(worker, request) + resp |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() + end + + def ensure_append_timestamp_topic(client, topic_name) do + resp = + Client.send_request( + client, + %Kayrock.CreateTopics.V0.Request{ + create_topic_requests: [ + %{ + topic: topic_name, + num_partitions: 4, + replication_factor: 1, + replica_assignment: [], + config_entries: [ + %{ + config_name: "message.timestamp.type", + config_value: "LogAppendTime" + } + ] + } + ], + timeout: 1000 + }, + NodeSelector.controller() + ) + + {:ok, + %Kayrock.CreateTopics.V0.Response{ + topic_errors: [%{error_code: error_code}] + }} = resp + + wait_for_topic_to_appear(client, topic_name) + + if error_code in [0, 36] do + {:ok, topic_name} + else + Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") + {:error, topic_name} + end + end + + defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10) + + defp wait_for_topic_to_appear(_client, _topic_name, attempts) + when attempts <= 0 do + raise "Timeout while waiting for topic to appear" + end + + defp wait_for_topic_to_appear(client, topic_name, attempts) do + {:ok, %{topic_metadata: topic_metadata}} = + Client.send_request( + client, + %Kayrock.Metadata.V0.Request{}, + NodeSelector.topic_partition(topic_name, 0) + ) + + topics = topic_metadata |> Enum.map(& &1.topic) + + unless topic_name in topics do + wait_for_topic_to_appear(client, topic_name, attempts - 1) + end + end + + defp first_partition_offset(:topic_not_found) do + nil + end + + defp first_partition_offset(response) do + [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = response + + first_partition = hd(partition_offsets) + first_partition.offset |> hd + end + + defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n) + when n >= max_tries do + 
raise "too many tries waiting for condition" + end + + defp wait_for_value(value_getter, condn, dwell, max_tries, n) do + value = value_getter.() + + if condn.(value) do + value + else + :timer.sleep(dwell) + wait_for_value(value_getter, condn, dwell, max_tries, n + 1) + end + end + + defp wait_for_accum(_value_getter, acc, min_length, _dwell, _max_tries) + when length(acc) >= min_length do + acc + end + + defp wait_for_accum(value_getter, acc, min_length, dwell, max_tries) do + value = + wait_for_value( + value_getter, + fn v -> length(v) > 0 end, + dwell, + max_tries + ) + + wait_for_accum(value_getter, acc ++ value, min_length, dwell, max_tries) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs index 7b134f62..4059a618 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -12,178 +12,3 @@ ExUnit.configure( server_0_p_8_p_0: true ] ) - -defmodule TestHelper do - alias KafkaEx.New.Client - alias KafkaEx.New.Structs.NodeSelector - require Logger - - def generate_random_string(string_length \\ 20) do - 1..string_length - |> Enum.map(fn _ -> round(:rand.uniform() * 25 + 65) end) - |> to_string - end - - # Wait for the return value of value_getter to pass the predicate condn - # If condn does not pass, sleep for dwell msec and try again - # If condn does not pass after max_tries attempts, raises an error - def wait_for_value(value_getter, condn, dwell \\ 500, max_tries \\ 200) do - wait_for_value(value_getter, condn, dwell, max_tries, 0) - end - - # Wait for condn to return false or nil; passes through to wait_for_value - # returns :ok on success - def wait_for(condn, dwell \\ 500, max_tries \\ 200) do - wait_for_value(fn -> :ok end, fn :ok -> condn.() end, dwell, max_tries) - end - - # execute value_getter, which should return a list, and accumulate - # the results until the accumulated results are at least min_length long - def wait_for_accum(value_getter, min_length, dwell \\ 500, max_tries \\ 200) do - wait_for_accum(value_getter, [], 
min_length, dwell, max_tries) - end - - # passthrough to wait_for_accum with 1 as the min_length - i.e., - # wait for any response - def wait_for_any(value_getter, dwell \\ 500, max_tries \\ 200) do - wait_for_accum(value_getter, 1, dwell, max_tries) - end - - def uris do - Application.get_env(:kafka_ex, :brokers) - end - - def utc_time do - {x, {a, b, c}} = - :calendar.local_time() - |> :calendar.local_time_to_universal_time_dst() - |> hd - - {x, {a, b, c + 60}} - end - - def latest_offset_number(topic, partition_id, worker \\ :kafka_ex) do - offset = - KafkaEx.latest_offset(topic, partition_id, worker) - |> first_partition_offset - - offset || 0 - end - - def latest_consumer_offset_number( - topic, - partition, - consumer_group, - worker \\ :kafka_ex, - api_version \\ 0 - ) do - request = %KafkaEx.Protocol.OffsetFetch.Request{ - topic: topic, - partition: partition, - consumer_group: consumer_group, - api_version: api_version - } - - resp = KafkaEx.offset_fetch(worker, request) - resp |> KafkaEx.Protocol.OffsetFetch.Response.last_offset() - end - - def ensure_append_timestamp_topic(client, topic_name) do - resp = - Client.send_request( - client, - %Kayrock.CreateTopics.V0.Request{ - create_topic_requests: [ - %{ - topic: topic_name, - num_partitions: 4, - replication_factor: 1, - replica_assignment: [], - config_entries: [ - %{ - config_name: "message.timestamp.type", - config_value: "LogAppendTime" - } - ] - } - ], - timeout: 1000 - }, - NodeSelector.controller() - ) - - {:ok, - %Kayrock.CreateTopics.V0.Response{ - topic_errors: [%{error_code: error_code}] - }} = resp - - wait_for_topic_to_appear(client, topic_name) - - if error_code in [0, 36] do - {:ok, topic_name} - else - Logger.error("Unable to create topic #{topic_name}: #{inspect(resp)}") - {:error, topic_name} - end - end - - defp wait_for_topic_to_appear(_client, _topic_name, attempts \\ 10) - - defp wait_for_topic_to_appear(_client, _topic_name, attempts) - when attempts <= 0 do - raise "Timeout while 
waiting for topic to appear" - end - - defp wait_for_topic_to_appear(client, topic_name, attempts) do - {:ok, %{topic_metadata: topic_metadata}} = - Client.send_request( - client, - %Kayrock.Metadata.V0.Request{}, - NodeSelector.topic_partition(topic_name, 0) - ) - - topics = topic_metadata |> Enum.map(& &1.topic) - - unless topic_name in topics do - wait_for_topic_to_appear(client, topic_name, attempts - 1) - end - end - - defp first_partition_offset(:topic_not_found) do - nil - end - - defp first_partition_offset(response) do - [%KafkaEx.Protocol.Offset.Response{partition_offsets: partition_offsets}] = response - - first_partition = hd(partition_offsets) - first_partition.offset |> hd - end - - defp wait_for_value(_value_getter, _condn, _dwell, max_tries, n) - when n >= max_tries do - raise "too many tries waiting for condition" - end - - defp wait_for_value(value_getter, condn, dwell, max_tries, n) do - value = value_getter.() - - if condn.(value) do - value - else - :timer.sleep(dwell) - wait_for_value(value_getter, condn, dwell, max_tries, n + 1) - end - end - - defp wait_for_accum(_value_getter, acc, min_length, _dwell, _max_tries) - when length(acc) >= min_length do - acc - end - - defp wait_for_accum(value_getter, acc, min_length, dwell, max_tries) do - value = wait_for_value(value_getter, fn v -> length(v) > 0 end, dwell, max_tries) - - wait_for_accum(value_getter, acc ++ value, min_length, dwell, max_tries) - end -end