Skip to content

Commit

Permalink
Merge pull request #102 from qzhuyan/dev/william/export-async-connect
Browse files Browse the repository at this point in the history
Export async connect and also retry connect
  • Loading branch information
qzhuyan authored Jun 23, 2022
2 parents 81806d4 + 0a4ebe4 commit 44ce3de
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
strategy:
matrix:
otp:
[22.3.4.9, 23.3.4.5, 24.3.3]
[23.3.4.5, 24.3.3]
build_type:
- RelWithDebInfo
- Debug
Expand Down
1 change: 1 addition & 0 deletions src/quicer.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
, "Makefile"
, "get-msquic.sh"
, "c_build"
, "build.sh"
]},
{exclude_regexps, ["priv/.*.so"]}

Expand Down
44 changes: 36 additions & 8 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
-export([ listen/2
, close_listener/1
, connect/4
, async_connect/3
, handshake/1
, handshake/2
, async_handshake/1
Expand Down Expand Up @@ -181,9 +182,10 @@ close_listener(Listener) ->
quicer_nif:close_listener(Listener).

%% @doc
%% Initial New Connection (Client)
%% Initiate New Connection (Client)
%%
%% Initial new connection to remote endpoint with connection opts specified.
%% Initiate new connection to remote endpoint with connection opts specified.
%% @see async_connect/3
%% @end
-spec connect(inet:hostname() | inet:ip_address(),
inet:port_number(), conn_opts(), timeout()) ->
Expand All @@ -195,26 +197,52 @@ connect(Host, Port, Opts, Timeout) when is_list(Opts) ->
connect(Host, Port, Opts, Timeout) when is_tuple(Host) ->
connect(inet:ntoa(Host), Port, Opts, Timeout);
connect(Host, Port, Opts, Timeout) when is_map(Opts) ->
NewOpts = maps:merge(default_conn_opts(), Opts),
do_connect(Host, Port, Opts, Timeout, 1).
do_connect(_Host, _Port, _Opts, Timeout, _Retries) when Timeout =< 0 ->
{error, timeout};
do_connect(Host, Port, Opts, Timeout, Retries) ->
HandshakeTOut = maps:get(handshake_idle_timeout_ms, Opts, 200),
RetryAfter = HandshakeTOut*Retries,
HandshakeTOut2 = min(Timeout, RetryAfter),
NewOpts = maps:merge(default_conn_opts(), Opts#{handshake_idle_timeout_ms => HandshakeTOut2}),
case quicer_nif:async_connect(Host, Port, NewOpts) of
{ok, H} ->
receive
{quic, connected, Ctx} ->
{ok, Ctx};
{quic, transport_shutdown, C, Reason} when Reason == connection_timeout
orelse Reason == connection_idle ->
%% We must close the old one
quicer:shutdown_connection(C),
do_connect(Host, Port, Opts, Timeout - HandshakeTOut2, Retries * 2);
{quic, transport_shutdown, _, Reason} ->
{error, transport_down, Reason}
after Timeout ->
%% @TODO caller should provide the method to handle timeout
async_shutdown_connection(H, ?QUIC_CONNECTION_SHUTDOWN_FLAG_SILENT, 0),
{error, timeout}
end;
{error, _} = Err ->
Err
end.

%% @doc
%% Initiate New Connection (Client)
%%
%% Async variant of connect/4
%% @see connect/4
%% @end
-spec async_connect(inet:hostname() | inet:ip_address(),
inet:port_number(), conn_opts()) ->
{ok, connection_handler()} |
{error, conn_open_error | config_error | conn_start_error}.
async_connect(Host, Port, Opts) when is_list(Opts) ->
async_connect(Host, Port, maps:from_list(Opts));
async_connect(Host, Port, Opts) when is_tuple(Host) ->
async_connect(inet:ntoa(Host), Port, Opts);
async_connect(Host, Port, Opts) when is_map(Opts) ->
NewOpts = maps:merge(default_conn_opts(), Opts),
quicer_nif:async_connect(Host, Port, NewOpts).


%% @doc Complete TLS handshake after accepted a Connection
%% with 5s timeout
%% with 5s timeout (Server)
%% @end
%% @see accept/3
%% @see handshake/2
Expand Down
74 changes: 74 additions & 0 deletions test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ all() ->
, tc_conn_gc
, tc_conn_resume_old
, tc_conn_resume_nst
, tc_conn_resume_nst_async
].

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -1069,6 +1070,79 @@ tc_conn_resume_nst(Config) ->
end),
ok.


%%% Non-blocking connection resume, client could send app data without waiting for handshake done.
tc_conn_resume_nst_async(Config) ->
Port = select_port(),
ListenerOpts = [{conn_acceptors, 32} | default_listen_opts(Config)],
ConnectionOpts = [ {conn_callback, quicer_server_conn_callback}
, {stream_acceptors, 32}
| default_conn_opts()],
StreamOpts = [ {stream_callback, quicer_echo_server_stream_callback}
| default_stream_opts() ],
Options = {ListenerOpts, ConnectionOpts, StreamOpts},
ct:pal("Listener Options: ~p", [Options]),
?check_trace(#{timetrap => 10000},
begin
{ok, _QuicApp} = quicer_start_listener(mqtt, Port, Options),
{ok, Conn} = quicer:connect("localhost", Port, [{quic_event_mask, ?QUICER_CONNECTION_EVENT_MASK_NST} | default_conn_opts()], 5000),
{ok, Stm} = quicer:start_stream(Conn, [{active, false}]),
{ok, 4} = quicer:async_send(Stm, <<"ping">>),
{ok, <<"ping">>} = quicer:recv(Stm, 4),
NST = receive
{quic, nst_received, Conn, Ticket} ->
Ticket
after 1000 ->
ct:fail("No ticket received")
end,
quicer:close_connection(Conn, ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 111),

{ok, ConnResumed} = quicer:async_connect("localhost", Port, [{nst, NST} | default_conn_opts()]),
{ok, Stm2} = quicer:start_stream(ConnResumed, [{active, false}]),
{ok, 5} = quicer:async_send(Stm2, <<"ping3">>),
{ok, <<"ping3">>} = quicer:recv(Stm2, 5),
ct:pal("stop listener"),
ok = quicer:stop_listener(mqtt)
end,
fun(Result, Trace) ->
ct:pal("Trace is ~p", [Trace]),
?assertEqual(ok, Result),
%% 1. verify that for each success connect we send a resumption ticket
?assert(?strict_causality(#{ ?snk_kind := debug
, context := "callback"
, function := "ClientConnectionCallback"
, mark := ?QUIC_CONNECTION_EVENT_CONNECTED
, tag := "event"
, resource_id := _CRid1
},
#{ ?snk_kind := debug
, context := "callback"
, function := "ClientConnectionCallback"
, tag := "event"
, mark := ?QUIC_CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED
, resource_id := _CRid1
},
Trace)),
%% 2. verify that resumption ticket is received on client side
%% and client use it to resume success
?assert(?causality(#{ ?snk_kind := debug
, context := "callback"
, function := "ClientConnectionCallback"
, mark := ?QUIC_CONNECTION_EVENT_RESUMPTION_TICKET_RECEIVED
, tag := "event"
, resource_id := _CRid1
},
#{ ?snk_kind := debug
, context := "callback"
, function := "ServerConnectionCallback"
, tag := "event"
, mark := ?QUIC_CONNECTION_EVENT_RESUMED
, resource_id := _SRid1
},
Trace))
end),
ok.

%%% Internal Helpers
default_stream_opts() ->
[].
Expand Down

0 comments on commit 44ce3de

Please sign in to comment.