Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

allow client to reconnect if other side closes connection #478

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
77 changes: 70 additions & 7 deletions lib/thrift/binary/framed/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Thrift.Binary.Framed.Client do
alias Thrift.TApplicationException
alias Thrift.Transport.SSL

@immutable_tcp_opts [active: false, packet: 4, mode: :binary]
@immutable_tcp_opts [active: true, packet: 4, mode: :binary]

@type error :: {:error, atom} | {:error, {:exception, struct}}
@type success :: {:ok, binary}
Expand All @@ -42,6 +42,7 @@ defmodule Thrift.Binary.Framed.Client do
{:tcp_opts, [tcp_option]}
| {:ssl_opts, [SSL.option()]}
| {:gen_server_opts, [genserver_call_option]}
| {:reconnect, boolean}

@type options :: [option]

Expand All @@ -55,7 +56,8 @@ defmodule Thrift.Binary.Framed.Client do
ssl_opts: [SSL.option()],
timeout: integer,
sock: {:gen_tcp, :gen_tcp.socket()} | {:ssl, :ssl.sslsocket()},
seq_id: integer
seq_id: integer,
reconnect: boolean
}
defstruct host: nil,
port: nil,
Expand All @@ -64,7 +66,8 @@ defmodule Thrift.Binary.Framed.Client do
ssl_opts: nil,
timeout: 5000,
sock: nil,
seq_id: 0
seq_id: 0,
reconnect: false
end

require Logger
Expand All @@ -74,6 +77,7 @@ defmodule Thrift.Binary.Framed.Client do
def init({host, port, opts}) do
tcp_opts = Keyword.get(opts, :tcp_opts, [])
ssl_opts = Keyword.get(opts, :ssl_opts, [])
reconnect = Keyword.get(opts, :reconnect, false)

{timeout, tcp_opts} = Keyword.pop(tcp_opts, :timeout, 5000)

Expand All @@ -82,7 +86,8 @@ defmodule Thrift.Binary.Framed.Client do
port: port,
tcp_opts: tcp_opts,
ssl_opts: ssl_opts,
timeout: timeout
timeout: timeout,
reconnect: reconnect
}

{:connect, :init, s}
Expand Down Expand Up @@ -124,6 +129,9 @@ defmodule Thrift.Binary.Framed.Client do
Additionally, the options `:name`, `:debug`, and `:spawn_opt`, if specified,
will be passed to the underlying `GenServer`. See `GenServer.start_link/3`
for details on these options.

The `:reconnect` option, if set to `true`, forces the client to reopen the TCP connection
whenever it is closed.
"""
@spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start()
def start_link(host, port, opts) do
Expand All @@ -143,6 +151,9 @@ defmodule Thrift.Binary.Framed.Client do
|> Keyword.merge(@immutable_tcp_opts)
|> Keyword.put_new(:send_timeout, 1000)

# reset sequence id for newly created connection
s = %{s | seq_id: 0}

case :gen_tcp.connect(host, port, opts, timeout) do
{:ok, sock} ->
maybe_ssl_handshake(sock, host, port, s)
Expand All @@ -158,10 +169,13 @@ defmodule Thrift.Binary.Framed.Client do
end

@impl Connection
def disconnect(info, %{sock: {transport, sock}}) do
def disconnect(info, %{sock: {transport, sock}} = s) do
:ok = transport.close(sock)

case info do
:reconnect ->
{:connect, info, %{s | sock: nil}}

{:close, from} ->
Connection.reply(from, :ok)
{:stop, :normal, nil}
Expand Down Expand Up @@ -245,15 +259,15 @@ defmodule Thrift.Binary.Framed.Client do

def handle_call(
{:call, rpc_name, serialized_args, tcp_opts},
_,
_from,
%{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout} = s
) do
s = %{s | seq_id: seq_id + 1}
message = Binary.serialize(:message_begin, {:call, seq_id, rpc_name})
timeout = Keyword.get(tcp_opts, :timeout, default_timeout)

with :ok <- transport.send(sock, [message | serialized_args]),
{:ok, message} <- transport.recv(sock, 0, timeout) do
{:ok, message} <- receive_message(transport, sock, timeout) do
reply = deserialize_message_reply(message, rpc_name, seq_id)
{:reply, reply, s}
else
Expand Down Expand Up @@ -291,10 +305,59 @@ defmodule Thrift.Binary.Framed.Client do
end
end

# Socket messages received while idle, for clients started with `reconnect: true`.
# Each clause only matches when the socket in the message is the very socket held
# in the state (`sock` is bound twice in the head), and each one answers with
# `{:disconnect, :reconnect, state}`.
@impl Connection
def handle_info({:tcp, sock, _data}, %{reconnect: true, sock: {:gen_tcp, sock}} = state),
  do: {:disconnect, :reconnect, state}

def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = state),
  do: {:disconnect, :reconnect, state}

def handle_info({:tcp_error, sock, _error}, %{reconnect: true, sock: {:gen_tcp, sock}} = state),
  do: {:disconnect, :reconnect, state}

def handle_info({:ssl, sock, _data}, %{reconnect: true, sock: {:ssl, sock}} = state),
  do: {:disconnect, :reconnect, state}

def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = state),
  do: {:disconnect, :reconnect, state}

def handle_info({:ssl_error, sock, _error}, %{reconnect: true, sock: {:ssl, sock}} = s) do
{:disconnect, :reconnect, s}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should always disconnect and make the decision on reconnect there. If reconnect: false we want to disconnect too.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case many tests fail. If I disconnect on any TCP or SSL message with reconnect: false, the following output is produced by mix test:

19:52 $ mix test
Compiling 1 file (.ex)
Excluding tags: [pending: true]

............................................................................................................................................

  1) test clients exit if they try to use a closed client (Thrift.Generator.ServiceTest)
     test/thrift/generator/service_test.exs:387
     ** (exit) exited in: :gen_server.call(#PID<0.1755.0>, {:call, "friend_ids_of", [<<10, 0, 1, 0, 0, 0, 0, 0, 0, 4, 210>> | <<0>>], []}, 5000)
         ** (EXIT) {:error, :closed}
     code: assert Client.friend_ids_of(client, 1234) == {:error, :closed}
     stacktrace:
       (stdlib) gen_server.erl:223: :gen_server.call/3
       (thrift) lib/thrift/binary/framed/client.ex:221: Thrift.Binary.Framed.Client.call/5
       test/thrift/generator/service_test.exs:392: (test)

     The following output was logged:
     
     19:52:36.114 [error] Connection closed
     
     19:52:36.115 [error] GenServer #PID<0.1755.0> terminating
     ** (stop) {:error, :closed}
     Last message: {:tcp_closed, #Port<0.19>}
     State: nil
     
..............

  2) test clients exit on connection timeout (Thrift.Generator.ServiceTest)
     test/thrift/generator/service_test.exs:373
     ** (exit) exited in: :gen_server.call(#PID<0.1958.0>, {:call, "friend_ids_of", [<<10, 0, 1, 0, 0, 0, 0, 0, 0, 4, 210>> | <<0>>], []}, 5000)
         ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
     code: assert {:error, closed} = Client.friend_ids_of(client, 1234)
     stacktrace:
       (stdlib) gen_server.erl:223: :gen_server.call/3
       (thrift) lib/thrift/binary/framed/client.ex:221: Thrift.Binary.Framed.Client.call/5
       test/thrift/generator/service_test.exs:382: (test)

     The following output was logged:
     
     19:52:36.155 [error] Connection closed
     
     19:52:36.155 [error] GenServer #PID<0.1958.0> terminating
     ** (stop) {:error, :closed}
     Last message: {:tcp_closed, #Port<0.59>}
     State: nil
     
..

  3) test it handles void oneway functions (Thrift.Generator.ServiceTest)
     test/thrift/generator/service_test.exs:309
     ** (EXIT from #PID<0.1990.0>) {:error, :closed}

     The following output was logged:
     
     19:52:36.188 [error] Connection closed
     
     19:52:36.188 [error] GenServer #PID<0.1996.0> terminating
     ** (stop) {:error, :closed}
     Last message: {:tcp, #Port<0.68>, <<0>>}
     State: nil
     
     19:52:36.189 [error] GenServer #PID<0.1991.0> terminating
     ** (stop) {:error, :closed}
     Last message: {:EXIT, #PID<0.1990.0>, {:error, :closed}}
     State: {:state, {#PID<0.1991.0>, Supervisor.Default}, :one_for_one, {[ranch_listener_sup: Thrift.Generator.ServiceTest.ServerSpy], %{{:ranch_listener_sup, Thrift.Generator.ServiceTest.ServerSpy} => {:child, #PID<0.1992.0>, {:ranch_listener_sup, Thrift.Generator.ServiceTest.ServerSpy}, {:ranch_listener_sup, :start_link, [Thrift.Generator.ServiceTest.ServerSpy, :ranch_tcp, %{num_acceptors: 1, socket_opts: [port: 0]}, Thrift.Binary.Framed.ProtocolHandler, {Thrift.Generator.ServiceTest.SimpleService.Binary.Framed.Server, Thrift.Generator.ServiceTest.ServerSpy, [], []}]}, :permanent, :infinity, :supervisor, [:ranch_listener_sup]}}}, :undefined, 0, 5, [], 0, Supervisor.Default, {:ok, {%{intensity: 0, period: 5, strategy: :one_for_one}, [{{:ranch_listener_sup, Thrift.Generator.ServiceTest.ServerSpy}, {:ranch_listener_sup, :start_link, [Thrift.Generator.ServiceTest.ServerSpy, :ranch_tcp, %{num_acceptors: 1, socket_opts: [port: 0]}, Thrift.Binary.Framed.ProtocolHandler, {Thrift.Generator.ServiceTest.SimpleService.Binary.Framed.Server, Thrift.Generator.ServiceTest.ServerSpy, [], []}]}, :permanent, :infinity, :supervisor, [:ranch_listener_sup]}]}}}
     
.................

  4) test it can handle oneway messages (Servers.Binary.Framed.IntegrationTest)
     test/thrift/binary/framed/server_test.exs:158
     ** (EXIT from #PID<0.2192.0>) {:error, :closed}

     The following output was logged:
     
     19:52:36.324 [error] Connection closed
     
     19:52:36.324 [error] GenServer :"test it can handle oneway messages" terminating
     ** (stop) {:error, :closed}
     Last message: {:tcp, #Port<0.114>, <<0>>}
     State: nil
     
..................................................

Finished in 6.3 seconds
227 tests, 4 failures

Randomized with seed 350387

Copy link
Author

@cybernetlab cybernetlab Oct 15, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure - should I correct these test cases?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, any comments about my last question?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about how we handle these cases, and handle them.

For example we could do

try do
  GenServer.call(...)
catch
  :exit, {{:error, _} = error, _} ->
    error
  :exit, {:noproc, _} ->
    {:error, :closed}
end

Copy link
Author

@cybernetlab cybernetlab Oct 23, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion is not to handle these cases and to let users of the library handle them. But in this case some tests should be rewritten.

end

def handle_info(_, s) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should handle the {:tcp_error, sock, _}, {:ssl_error, sock, _}, {:tcp, sock, _}, {:ssl, sock, _} here but ignore messages from old sockets (or clean them up in disconnect/2).

{:noreply, s}
end

# Decodes the header of `message` via `Binary.deserialize(:message_begin, ...)` and
# passes the outcome to `handle_message/3` together with the sequence id and RPC
# name the caller expects.
def deserialize_message_reply(message, rpc_name, seq_id) do
  :message_begin
  |> Binary.deserialize(message)
  |> handle_message(seq_id, rpc_name)
end

# Blocks until the next message for `sock` shows up in the process mailbox, or
# until `timeout` elapses.
#
# `transport` selects which message tags are awaited: `:tcp`, `:tcp_closed` and
# `:tcp_error` for `:gen_tcp`; `:ssl`, `:ssl_closed` and `:ssl_error` for `:ssl`.
#
# Returns `{:ok, data}` for a data message, `{:error, :closed}` for a close
# notification, `{:error, reason}` for an error message and `{:error, :timeout}`
# when nothing matching arrives in time.
defp receive_message(transport, sock, timeout) when transport in [:gen_tcp, :ssl] do
  {data_tag, closed_tag, error_tag} =
    case transport do
      :gen_tcp -> {:tcp, :tcp_closed, :tcp_error}
      :ssl -> {:ssl, :ssl_closed, :ssl_error}
    end

  # Tags and socket are pinned so messages for other sockets are left in the mailbox.
  receive do
    {^data_tag, ^sock, payload} -> {:ok, payload}
    {^closed_tag, ^sock} -> {:error, :closed}
    {^error_tag, ^sock, reason} -> {:error, reason}
  after
    timeout -> {:error, :timeout}
  end
end
cybernetlab marked this conversation as resolved.
Show resolved Hide resolved

# Successful reply: accepted only when the sequence id and RPC name carried by the
# reply are identical to the ones that were expected; yields the still-serialized
# response.
defp handle_message({:ok, {:reply, reply_seq_id, reply_name, payload}}, seq_id, rpc_name)
     when reply_seq_id === seq_id and reply_name === rpc_name do
  {:ok, payload}
end
Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [], [], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"},
"parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"},
Expand Down
36 changes: 36 additions & 0 deletions test/thrift/binary/framed/server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,40 @@ defmodule Servers.Binary.Framed.IntegrationTest do
thrift_test "client methods can be called by name instead of pid", %{client_name: name} do
assert {:ok, true} == Client.ping(name)
end

@ping_reply <<128, 1, 0, 2, 0, 0, 0, 4, 112, 105, 110, 103, 0, 0, 0, 0, 2, 0, 0, 1, 0>>
thrift_test "client can reconnect when connection closed by server", ctx do
  # Listen on an OS-assigned port so the test controls both accepted connections.
  {:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false])
  {:ok, port} = :inet.port(sock)
  test_pid = self()

  # in the first connection we emulate disconnection by server
  first_conn =
    Task.async(fn ->
      {:ok, conn} = :gen_tcp.accept(sock)
      :ok = :gen_tcp.close(conn)
    end)

  name = String.to_atom("#{ctx.client_name}_1")
  {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true)

  # in the second connection we emulate reconnection to the same server port
  second_conn =
    Task.async(fn ->
      {:ok, conn} = :gen_tcp.accept(sock)
      send(test_pid, :connected)
      {:ok, _} = :gen_tcp.recv(conn, 0)
      :ok = :gen_tcp.send(conn, @ping_reply)
    end)

  Task.await(first_conn)

  # Wait for a successful connection to the re-opened port. The wait is bounded so
  # that a client which never reconnects fails this test with a clear assertion
  # error instead of blocking forever in a bare `receive`.
  assert_receive :connected, 5_000

  assert {:ok, true} == Client.ping(client)
  Task.await(second_conn)
end
end
20 changes: 10 additions & 10 deletions test/thrift/generator/utils_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ defmodule Thrift.Generator.UtilsTest do
end

test "optimize_iolist" do
check <<0>>, <<0>>
check [<<0>>], <<0>>
check [<<1>>, <<2>>], <<1, 2>>
check [<<1>>, [<<2>>]], <<1, 2>>
check [[<<1>>], <<2>>], <<1, 2>>
check [[[[<<1>>]], [<<2>>]]], <<1, 2>>
check [<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y]
check [x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y]
check [<<1, 2>>, <<0>>], <<1, 2, 0>>
check [<<1, 2>>, "foo"], <<1, 2, "foo">>
check(<<0>>, <<0>>)
check([<<0>>], <<0>>)
check([<<1>>, <<2>>], <<1, 2>>)
check([<<1>>, [<<2>>]], <<1, 2>>)
check([[<<1>>], <<2>>], <<1, 2>>)
check([[[[<<1>>]], [<<2>>]]], <<1, 2>>)
check([<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y])
check([x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y])
check([<<1, 2>>, <<0>>], <<1, 2, 0>>)
check([<<1, 2>>, "foo"], <<1, 2, "foo">>)
end
end