diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 18be648b..f761353e 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -21,7 +21,7 @@ defmodule Thrift.Binary.Framed.Client do alias Thrift.TApplicationException alias Thrift.Transport.SSL - @immutable_tcp_opts [active: false, packet: 4, mode: :binary] + @immutable_tcp_opts [active: true, packet: 4, mode: :binary] @type error :: {:error, atom} | {:error, {:exception, struct}} @type success :: {:ok, binary} @@ -42,6 +42,7 @@ defmodule Thrift.Binary.Framed.Client do {:tcp_opts, [tcp_option]} | {:ssl_opts, [SSL.option()]} | {:gen_server_opts, [genserver_call_option]} + | {:reconnect, boolean} @type options :: [option] @@ -55,7 +56,8 @@ defmodule Thrift.Binary.Framed.Client do ssl_opts: [SSL.option()], timeout: integer, sock: {:gen_tcp, :gen_tcp.socket()} | {:ssl, :ssl.sslsocket()}, - seq_id: integer + seq_id: integer, + reconnect: boolean } defstruct host: nil, port: nil, @@ -64,7 +66,8 @@ defmodule Thrift.Binary.Framed.Client do ssl_opts: nil, timeout: 5000, sock: nil, - seq_id: 0 + seq_id: 0, + reconnect: false end require Logger @@ -74,6 +77,7 @@ defmodule Thrift.Binary.Framed.Client do def init({host, port, opts}) do tcp_opts = Keyword.get(opts, :tcp_opts, []) ssl_opts = Keyword.get(opts, :ssl_opts, []) + reconnect = Keyword.get(opts, :reconnect, false) {timeout, tcp_opts} = Keyword.pop(tcp_opts, :timeout, 5000) @@ -82,7 +86,8 @@ defmodule Thrift.Binary.Framed.Client do port: port, tcp_opts: tcp_opts, ssl_opts: ssl_opts, - timeout: timeout + timeout: timeout, + reconnect: reconnect } {:connect, :init, s} @@ -124,6 +129,9 @@ defmodule Thrift.Binary.Framed.Client do Additionally, the options `:name`, `:debug`, and `:spawn_opt`, if specified, will be passed to the underlying `GenServer`. See `GenServer.start_link/3` for details on these options. + + The `:reconnect` option if set to `true` forces client to reopen TCP connection whenever + it closed. """ @spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start() def start_link(host, port, opts) do @@ -143,6 +151,9 @@ defmodule Thrift.Binary.Framed.Client do |> Keyword.merge(@immutable_tcp_opts) |> Keyword.put_new(:send_timeout, 1000) + # reset sequence id for newly created connection + s = %{s | seq_id: 0} + case :gen_tcp.connect(host, port, opts, timeout) do {:ok, sock} -> maybe_ssl_handshake(sock, host, port, s) @@ -158,10 +169,13 @@ defmodule Thrift.Binary.Framed.Client do end @impl Connection - def disconnect(info, %{sock: {transport, sock}}) do + def disconnect(info, %{sock: {transport, sock}} = s) do :ok = transport.close(sock) case info do + :reconnect -> + {:connect, info, %{s | sock: nil}} + {:close, from} -> Connection.reply(from, :ok) {:stop, :normal, nil} @@ -245,7 +259,7 @@ defmodule Thrift.Binary.Framed.Client do def handle_call( {:call, rpc_name, serialized_args, tcp_opts}, - _, + _from, %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout} = s ) do s = %{s | seq_id: seq_id + 1} @@ -253,7 +267,7 @@ defmodule Thrift.Binary.Framed.Client do timeout = Keyword.get(tcp_opts, :timeout, default_timeout) with :ok <- transport.send(sock, [message | serialized_args]), - {:ok, message} <- transport.recv(sock, 0, timeout) do + {:ok, message} <- receive_message(transport, sock, timeout) do reply = deserialize_message_reply(message, rpc_name, seq_id) {:reply, reply, s} else @@ -291,10 +305,59 @@ defmodule Thrift.Binary.Framed.Client do end end + @impl Connection + def handle_info({:tcp, sock, _data}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:tcp_error, sock, _error}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:ssl, sock, _data}, %{reconnect: true, sock: {:ssl, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:ssl_error, sock, _error}, %{reconnect: true, sock: {:ssl, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info(_, s) do + {:noreply, s} + end + def deserialize_message_reply(message, rpc_name, seq_id) do handle_message(Binary.deserialize(:message_begin, message), seq_id, rpc_name) end + defp receive_message(:gen_tcp, sock, timeout) do + receive do + {:tcp, ^sock, data} -> {:ok, data} + {:tcp_closed, ^sock} -> {:error, :closed} + {:tcp_error, ^sock, error} -> {:error, error} + after + timeout -> {:error, :timeout} + end + end + + defp receive_message(:ssl, sock, timeout) do + receive do + {:ssl, ^sock, data} -> {:ok, data} + {:ssl_closed, ^sock} -> {:error, :closed} + {:ssl_error, ^sock, error} -> {:error, error} + after + timeout -> {:error, :timeout} + end + end + defp handle_message({:ok, {:reply, seq_id, rpc_name, serialized_response}}, seq_id, rpc_name) do {:ok, serialized_response} end diff --git a/mix.lock b/mix.lock index de28de65..4651a56f 100644 --- a/mix.lock +++ b/mix.lock @@ -14,7 +14,7 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"}, diff --git a/test/thrift/binary/framed/server_test.exs b/test/thrift/binary/framed/server_test.exs index 43b73e5c..2a359894 100644 --- a/test/thrift/binary/framed/server_test.exs +++ b/test/thrift/binary/framed/server_test.exs @@ -177,4 +177,40 @@ defmodule Servers.Binary.Framed.IntegrationTest do thrift_test "client methods can be called by name instead of pid", %{client_name: name} do assert {:ok, true} == Client.ping(name) end + + @ping_reply <<128, 1, 0, 2, 0, 0, 0, 4, 112, 105, 110, 103, 0, 0, 0, 0, 2, 0, 0, 1, 0>> + thrift_test "client can reconnect when connection closed by server", ctx do + {:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false]) + {:ok, port} = :inet.port(sock) + test_pid = self() + + # in the first connection we emulate disconnection by server + first_conn = + Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + :ok = :gen_tcp.close(conn) + end) + + name = String.to_atom("#{ctx.client_name}_1") + {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true) + + # in the second connection we emulate reconnection to the same server port + second_conn = + Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + send(test_pid, :connected) + {:ok, _} = :gen_tcp.recv(conn, 0) + :ok = :gen_tcp.send(conn, @ping_reply) + end) + + Task.await(first_conn) + # wait for sucessfull connection to re-opened port + :ok = + receive do + :connected -> :ok + end + + assert {:ok, true} == Client.ping(client) + Task.await(second_conn) + end end diff --git a/test/thrift/generator/utils_test.exs b/test/thrift/generator/utils_test.exs index d1461bdb..e533ee69 100644 --- a/test/thrift/generator/utils_test.exs +++ b/test/thrift/generator/utils_test.exs @@ -9,15 +9,15 @@ defmodule Thrift.Generator.UtilsTest do end test "optimize_iolist" do - check <<0>>, <<0>> - check [<<0>>], <<0>> - check [<<1>>, <<2>>], <<1, 2>> - check [<<1>>, [<<2>>]], <<1, 2>> - check [[<<1>>], <<2>>], <<1, 2>> - check [[[[<<1>>]], [<<2>>]]], <<1, 2>> - check [<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y] - check [x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y] - check [<<1, 2>>, <<0>>], <<1, 2, 0>> - check [<<1, 2>>, "foo"], <<1, 2, "foo">> + check(<<0>>, <<0>>) + check([<<0>>], <<0>>) + check([<<1>>, <<2>>], <<1, 2>>) + check([<<1>>, [<<2>>]], <<1, 2>>) + check([[<<1>>], <<2>>], <<1, 2>>) + check([[[[<<1>>]], [<<2>>]]], <<1, 2>>) + check([<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y]) + check([x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y]) + check([<<1, 2>>, <<0>>], <<1, 2, 0>>) + check([<<1, 2>>, "foo"], <<1, 2, "foo">>) end end