Merge pull request #290 from kafkaex/iolist_cleanup
Iolist cleanup
joshuawscott authored May 23, 2018
2 parents cdbeb27 + 10a04d9 commit 808d9e2
Showing 2 changed files with 87 additions and 56 deletions.
134 changes: 82 additions & 52 deletions lib/kafka_ex/protocol/produce.ex
@@ -9,16 +9,40 @@ defmodule KafkaEx.Protocol.Produce do

defmodule Request do
@moduledoc """
- require_acks: indicates how many acknowledgements the servers should receive before responding to the request. If it is 0 the server will not send any response (this is the only case where the server will not reply to a request). If it is 1, the server will wait the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas), default is 0
- timeout: provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks, default is 100 milliseconds
- require_acks: indicates how many acknowledgements the servers should
receive before responding to the request. If it is 0 the server will not
send any response (this is the only case where the server will not reply
to a request). If it is 1, the server will wait until the data is written to the
local log before sending a response. If it is -1 the server will block until
the message is committed by all in sync replicas before sending a response.
For any number > 1 the server will block waiting for this number of
acknowledgements to occur (but the server will never wait for more
acknowledgements than there are in-sync replicas), default is 0
- timeout: provides a maximum time in milliseconds the server can await the
receipt of the number of acknowledgements in RequiredAcks, default is 100
milliseconds
"""
defstruct topic: nil, partition: nil, required_acks: 0, timeout: 0, compression: :none, messages: []
@type t :: %Request{topic: binary, partition: integer, required_acks: integer, timeout: integer, compression: atom, messages: list}
defstruct topic: nil,
partition: nil,
required_acks: 0,
timeout: 0,
compression: :none,
messages: []

@type t :: %Request{
topic: binary,
partition: integer,
required_acks: integer,
timeout: integer,
compression: atom,
messages: list
}
end
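
For reference, a minimal sketch of building such a request (illustrative only, not part of this commit; the topic, message, and ack settings are arbitrary examples):

    alias KafkaEx.Protocol.Produce.{Request, Message}

    # Wait for the leader's local write (required_acks: 1), allow up to 100 ms.
    request = %Request{
      topic: "example_topic",
      partition: 0,
      required_acks: 1,
      timeout: 100,
      compression: :none,
      messages: [%Message{key: nil, value: "hello"}]
    }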

defmodule Message do
@moduledoc """
- key: is used for partition assignment, can be nil, when none is provided it is defaulted to nil
- key: is used for partition assignment; it may be nil, and defaults to nil
  when none is provided
- value: is the message to be written to kafka logs.
"""
defstruct key: nil, value: nil
@@ -31,73 +55,79 @@ defmodule KafkaEx.Protocol.Produce do
@type t :: %Response{topic: binary, partitions: list}
end

def create_request(
correlation_id,
client_id,
%Request{compression: compression, messages: messages} = request
) do
message_set = create_message_set(messages, compression)
header = produce_header(correlation_id, client_id, request, message_set)
header <> message_set
def create_request(correlation_id, client_id, %Request{
topic: topic,
partition: partition,
required_acks: required_acks,
timeout: timeout,
compression: compression,
messages: messages
}) do
{message_set, mssize} = create_message_set(messages, compression)

[
KafkaEx.Protocol.create_request(:produce, correlation_id, client_id),
<<required_acks::16-signed, timeout::32-signed, 1::32-signed>>,
<<byte_size(topic)::16-signed, topic::binary, 1::32-signed,
partition::32-signed, mssize::32-signed>>,
message_set
]
end
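
With this change create_request/3 returns iodata rather than one concatenated binary, so a caller can hand the result straight to the socket (:gen_tcp.send/2 accepts iodata) or flatten it only when a contiguous binary is actually needed, as the updated tests below do. A hedged sketch (the correlation id, client id, and field values are arbitrary):

    alias KafkaEx.Protocol.Produce.{Request, Message}

    iodata =
      KafkaEx.Protocol.Produce.create_request(1, "example_client", %Request{
        topic: "food",
        partition: 0,
        required_acks: 1,
        timeout: 10,
        compression: :none,
        messages: [%Message{value: "hey"}]
      })

    # Flatten only where a single binary is required (e.g. byte-for-byte assertions).
    binary_request = :erlang.iolist_to_binary(iodata)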

def parse_response(<< _correlation_id :: 32-signed, num_topics :: 32-signed, rest :: binary >>), do: parse_topics(num_topics, rest, __MODULE__)
def parse_response(
<<_correlation_id::32-signed, num_topics::32-signed, rest::binary>>
),
do: parse_topics(num_topics, rest, __MODULE__)

def parse_response(unknown), do: unknown

defp create_message_set([], _compression_type), do: ""
defp create_message_set([], _compression_type), do: {"", 0}
defp create_message_set([%Message{key: key, value: value}|messages], :none) do
message = create_message(value, key)
message_set = message_set_header(message) <> message
message_set <> create_message_set(messages, :none)
{message, msize} = create_message(value, key)
message_set = [<< 0 :: 64-signed >>, << msize :: 32-signed >>, message]
{message_set2, ms2size} = create_message_set(messages, :none)
{[message_set, message_set2], 8 + 4 + msize + ms2size}
end
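
Each helper in this pipeline now returns an {iodata, size} pair, so byte lengths are accumulated as the request is assembled instead of flattening intermediate binaries just to call byte_size/1 on them. The 8 + 4 above is the 64-bit offset plus the 32-bit message-size field that precede every message-set entry. A quick, illustrative sanity check of that arithmetic:

    msize = 20    # pretend size of one encoded message
    entry = [<<0::64-signed>>, <<msize::32-signed>>, :binary.copy(<<0>>, msize)]
    true = :erlang.iolist_size(entry) == 8 + 4 + msize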
defp create_message_set(messages, compression_type) do
message_set = create_message_set(messages, :none)
{message_set, _} = create_message_set(messages, :none)
{compressed_message_set, attribute} =
Compression.compress(compression_type, message_set)
message = create_message(compressed_message_set, nil, attribute)

message_set_header(message) <> message
end
{message, msize} = create_message(compressed_message_set, nil, attribute)

defp produce_header(
correlation_id,
client_id,
%Request{
topic: topic,
partition: partition,
required_acks: required_acks,
timeout: timeout
},
message_set
) do
KafkaEx.Protocol.create_request(:produce, correlation_id, client_id) <>
<< required_acks :: 16-signed, timeout :: 32-signed, 1 :: 32-signed >> <>
<< byte_size(topic) :: 16-signed, topic :: binary >> <>
<< 1 :: 32-signed, partition :: 32-signed >> <>
<< byte_size(message_set) :: 32-signed >>
end

defp message_set_header(message) do
<< 0 :: 64-signed, byte_size(message) :: 32-signed >>
{[<< 0 :: 64-signed >>, << msize :: 32-signed >>, message], 8 + 4 + msize}
end

defp create_message(value, key, attributes \\ 0) do
sub = << 0 :: 8, attributes :: 8-signed >> <> bytes(key) <> bytes(value)
{bkey, skey} = bytes(key)
{bvalue, svalue} = bytes(value)
sub = [<< 0 :: 8, attributes :: 8-signed >>, bkey, bvalue]
crc = :erlang.crc32(sub)
<< crc :: 32 >> <> sub
{[<< crc :: 32 >>, sub], 4 + 2 + skey + svalue}
end

defp bytes(nil), do: << -1 :: 32-signed >>
defp bytes(nil), do: {<< -1 :: 32-signed >>, 4}
defp bytes(data) do
case byte_size(data) do
0 -> << 0 :: 32 >>
size -> << size :: 32, data :: binary >>
case :erlang.iolist_size(data) do
0 -> {<< 0 :: 32 >>, 4}
size -> {[<< size :: 32>> , data], 4 + size}
end
end
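
bytes/1 follows Kafka's nullable-bytes encoding, a 32-bit length prefix with -1 standing in for null, and now returns the number of bytes it contributes alongside the iodata. A small illustration of what that looks like on the wire (not part of the diff):

    # The null marker is four 0xFF bytes:
    <<255, 255, 255, 255>> = <<-1::32-signed>>

    # A non-empty value is a 4-byte length prefix followed by the payload:
    {encoded, 7} = {[<<3::32>>, "hey"], 4 + byte_size("hey")}
    7 = :erlang.iolist_size(encoded)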

def parse_partitions(0, rest, partitions), do: {partitions, rest}
def parse_partitions(partitions_size, << partition :: 32-signed, error_code :: 16-signed, offset :: 64, rest :: binary >>, partitions) do
parse_partitions(partitions_size - 1, rest, [%{partition: partition, error_code: Protocol.error(error_code), offset: offset} | partitions])
end

def parse_partitions(
partitions_size,
<<partition::32-signed, error_code::16-signed, offset::64,
rest::binary>>,
partitions
) do
parse_partitions(partitions_size - 1, rest, [
%{
partition: partition,
error_code: Protocol.error(error_code),
offset: offset
}
| partitions
])
end
end
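
parse_partitions/3 peels one <<partition, error_code, offset>> record per pass and accumulates a map for each. A hedged sketch of parsing a single-partition response fragment (this assumes Protocol.error/1 maps error code 0 to :no_error, as elsewhere in KafkaEx):

    # partition 0, error code 0, offset 42, nothing left over
    record = <<0::32-signed, 0::16-signed, 42::64>>

    {[%{partition: 0, error_code: :no_error, offset: 42}], <<>>} =
      KafkaEx.Protocol.Produce.parse_partitions(1, record, [])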
9 changes: 5 additions & 4 deletions test/protocol/produce_test.exs
@@ -10,7 +10,7 @@ defmodule KafkaEx.Protocol.Produce.Test do
]
})

assert expected_request == request
assert expected_request == :erlang.iolist_to_binary(request)
end

test "create_request correctly batches multiple request messages" do
@@ -24,7 +24,7 @@ defmodule KafkaEx.Protocol.Produce.Test do
]
})

assert expected_request == request
assert expected_request == :erlang.iolist_to_binary(request)
end

test "create_request correctly encodes messages with gzip" do
@@ -46,7 +46,8 @@ defmodule KafkaEx.Protocol.Produce.Test do
messages: messages
}

request = KafkaEx.Protocol.Produce.create_request(1, client_id, produce)
iolist_request = KafkaEx.Protocol.Produce.create_request(1, client_id, produce)
request = :erlang.iolist_to_binary(iolist_request)

# The exact binary contents of the message can change as zlib changes,
# but they should remain compatible. We test this by splitting the binary
@@ -100,7 +101,7 @@ defmodule KafkaEx.Protocol.Produce.Test do
"compression_client_test",
produce)

assert expected_request == request
assert expected_request == :erlang.iolist_to_binary(request)
end
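
As the test comment above notes, the exact gzip bytes can change as zlib changes, so the compression tests flatten the iolist and avoid asserting on the raw compressed payload. A rough illustration of why the comparison can still be made safely (not the test's literal code):

    payload = "some message bytes"
    compressed = :zlib.gzip(payload)

    # Compressed bytes may vary across zlib versions, but the round trip must not.
    ^payload = :zlib.gunzip(compressed)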

test "parse_response correctly parses a valid response with single topic and partition" do
