diff --git a/.gitignore b/.gitignore index 31760c8..9c04f56 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,3 @@ -.rebar3 _* .eunit *.o @@ -14,7 +13,6 @@ rebar.lock ebin log erl_crash.dump -.rebar _rel _deps _plugins @@ -22,5 +20,6 @@ _tdeps _plt logs _build -rebar3 +rebar *.crashdump +.exrc diff --git a/.travis.yml b/.travis.yml index 3ebe762..6065421 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,10 @@ sudo: false language: erlang otp_release: - - "19.1" - - "19.2" + - "21.0" cache: directories: - - $HOME/otp/19.1 - - $HOME/otp/19.2 + - $HOME/otp/21.0 - $HOME/.cache/rebar3 - _plt install: "true" diff --git a/CHANGELOG.md b/CHANGELOG.md index 15507f0..546e609 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,15 @@ Below is a non-exhaustive list of changes between `gen_rpc` versions. +## 3.0.0 + +- Deprecate support for Erlang < 21.0 +- Support monitoring nodes +- Support EC SSL certificates +- Support cookie per node configuration +- Support external cookie validation mechanism +- Support keepalive gen_server that actively keeps a client connection alive + ## 2.1.0 - Support multiple connections per node using aribtrary keys. diff --git a/README.md b/README.md index c22489a..9e732fa 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ To build this project you need to have the following: -* **Erlang/OTP** >= 19.1 +* **Erlang/OTP** >= 21.0 * **git** >= 1.7 @@ -94,6 +94,8 @@ For more information on what the functions below do, run `erl -man rpc`. - `eval_everywhere(Module, Function, Args)` and `eval_everywhere(NodesOrNodesWithKeys, Module, Function, Args)`: Multi-node version of the `cast` function. +- `monitor_node(Node, Flag)` and `monitor_node(Node, Flag, MessageType)`: Sends messages of node connects and disconnects to the subscribed process. Set `MessageType` to `gen_server` to send a `gen_server:cast` as a state change message, `gen_fsm` to send a `gen_fsm:send_all_state_event` as a state change message or `simple` to send a simple message upon node state change. Please note that in contrast to `erlang:monitor_node`, calling `gen_rpc:monitor_node` multiple times will result to only one registration per process. + ### Per-Key Sharding `gen_rpc` supports multiple outgoing connections per node using a key of arbitrary type to differentiate between connections. @@ -321,3 +323,5 @@ Please see [CONTRIBUTING.md](CONTRIBUTING.md) ### Contributors: - [Edward Tsang](https://github.com/linearregression) +- [getong](https://github.com/getong) +- [JianBo He](https://github.com/HJianBo) diff --git a/TODO.md b/TODO.md index 338b44d..c5aa49e 100644 --- a/TODO.md +++ b/TODO.md @@ -2,4 +2,4 @@ This is a list of pending features or code technical debt for `gen_rpc`: -- Implement `net_kernel:monitor_nodes/2` functionality +- Alternative Distribution Driver that transparently uses gen_rpc diff --git a/include/guards.hrl b/include/guards.hrl index d0783be..dda2c9c 100644 --- a/include/guards.hrl +++ b/include/guards.hrl @@ -7,6 +7,7 @@ -define(is_null(A), A =:= undefined orelse A =:= null). -define(is_true(A), A =:= true orelse A =:= "true" orelse A =:= <<"true">>). -define(is_false(A), A =:= false orelse A =:= "false" orelse A =:= <<"false">>). +-define(is_boolean(A), (?is_true(A)) orelse (?is_false(A))). -define(is_process(A), is_pid(A) orelse is_atom(A)). -define(is_limit(A), (is_integer(A) andalso A >= 0) orelse A =:= infinity). -define(is_timeout(A), (is_integer(A) andalso A >= 0) orelse A =:= infinity). diff --git a/include/ssl.hrl b/include/ssl.hrl index bb6a9e4..789f6e3 100644 --- a/include/ssl.hrl +++ b/include/ssl.hrl @@ -30,6 +30,8 @@ "ECDH-ECDSA-AES256-SHA","ECDH-RSA-AES256-SHA","AES256-SHA","ECDHE-ECDSA-AES128-SHA", "ECDHE-RSA-AES128-SHA","DHE-DSS-AES128-SHA","ECDH-ECDSA-AES128-SHA","ECDH-RSA-AES128-SHA","AES128-SHA"]}, {secure_renegotiate,true}, + {honor_ecc_order,true}, + {honor_cipher_order,true}, {reuse_sessions,true}, {versions,['tlsv1.2','tlsv1.1']}, {verify,verify_peer}, @@ -38,7 +40,6 @@ -define(SSL_DEFAULT_SERVER_OPTS, [{fail_if_no_peer_cert,true}, {log_alert,false}, - {honor_cipher_order,true}, {client_renegotiation,true}]). -define(SSL_DEFAULT_CLIENT_OPTS, [{server_name_indication,disable}, diff --git a/package.exs b/package.exs index 9f32170..c88548d 100644 --- a/package.exs +++ b/package.exs @@ -3,7 +3,7 @@ defmodule GenRPC.Mixfile do def project do [app: :gen_rpc, - version: "2.0.0", + version: "3.0.0", description: "A scalable RPC library for Erlang-VM based languages", package: package] end diff --git a/priv/ec_ssl/ca.cert.pem b/priv/ec_ssl/ca.cert.pem new file mode 100644 index 0000000..46bbd9f --- /dev/null +++ b/priv/ec_ssl/ca.cert.pem @@ -0,0 +1,14 @@ +-----BEGIN CERTIFICATE----- +MIICOTCCAd6gAwIBAgICMwwwCgYIKoZIzj0EAwIwdDELMAkGA1UEBhMCVVMxEzAR +BgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28xEDAOBgNV +BAoMB2dlbl9ycGMxJjAkBgNVBAMMHWdlbl9ycGMgQ2VydGlmaWNhdGUgQXV0aG9y +aXR5MB4XDTE4MTIyMTIzMTUwNVoXDTM4MTIxNjIzMTUwNVowdDELMAkGA1UEBhMC +VVMxEzARBgNVBAgMCkNhbGlmb3JuaWExFjAUBgNVBAcMDVNhbiBGcmFuY2lzY28x +EDAOBgNVBAoMB2dlbl9ycGMxJjAkBgNVBAMMHWdlbl9ycGMgQ2VydGlmaWNhdGUg +QXV0aG9yaXR5MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEpZT2IwQ8QcG0BMeY +9OyzZN6lcTsNFpXv3dhEAsug8zhY5uUz1GVRWDFAtjrlFQ7mOERQHUlFiFXqRoYm +2c/BTKNgMF4wDwYDVR0TAQH/BAUwAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYE +FAF6LCa2P8J2MNb33CWg82MCfAgNMB8GA1UdIwQYMBaAFAF6LCa2P8J2MNb33CWg +82MCfAgNMAoGCCqGSM49BAMCA0kAMEYCIQDGq0LtZ2O7ks0m+wc3a7qJN8WCyeAz +vK8tGQOKLzTlSQIhAIMJcudk09EZt7cqtQUt5Fc6U8P9OsEPXe8WGginL/FU +-----END CERTIFICATE----- diff --git a/priv/ec_ssl/gen_rpc_master@127.0.0.1.cert.pem b/priv/ec_ssl/gen_rpc_master@127.0.0.1.cert.pem new file mode 100644 index 0000000..8f3a141 --- /dev/null +++ b/priv/ec_ssl/gen_rpc_master@127.0.0.1.cert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICTTCCAfOgAwIBAgIDAI7gMAoGCCqGSM49BAMCMHQxCzAJBgNVBAYTAlVTMRMw +EQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMRAwDgYD +VQQKDAdnZW5fcnBjMSYwJAYDVQQDDB1nZW5fcnBjIENlcnRpZmljYXRlIEF1dGhv +cml0eTAeFw0xODEyMjEyMzE1MzNaFw0zODEyMTYyMzE1MzNaMG8xCzAJBgNVBAYT +AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv +MRAwDgYDVQQKDAdnZW5fcnBjMSEwHwYDVQQDDBhnZW5fcnBjX21hc3RlckAxMjcu +MC4wLjEwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAASXE5wPPspbIS4IHn7xs3JJ +hqqVtS710Ys9q2eNzKSkFJ9Av0mXKwhw8gsYbCyCZtEw89QV1Yzg2f8MARzfU6nG +o3kwdzAJBgNVHRMEAjAAMAsGA1UdDwQEAwIF4DAdBgNVHSUEFjAUBggrBgEFBQcD +AQYIKwYBBQUHAwIwHQYDVR0OBBYEFB6s40nETDtvzmXuxeatcNdjTisVMB8GA1Ud +IwQYMBaAFAF6LCa2P8J2MNb33CWg82MCfAgNMAoGCCqGSM49BAMCA0gAMEUCIHHt +aCD+Ihp/ATDdapFL5fbYJPAI0Ah0bFdul5ip4ekQAiEA11BZwECTDQo1/I/58kQi +E8y9hyoxh8FD/SzimQ7qZHY= +-----END CERTIFICATE----- diff --git a/priv/ec_ssl/gen_rpc_master@127.0.0.1.key.pem b/priv/ec_ssl/gen_rpc_master@127.0.0.1.key.pem new file mode 100644 index 0000000..9a8ef1a --- /dev/null +++ b/priv/ec_ssl/gen_rpc_master@127.0.0.1.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIKegvLV7VLEr7dOKdsilVH06/edAt9Mcc+k3raAOtIM1oAoGCCqGSM49 +AwEHoUQDQgAElxOcDz7KWyEuCB5+8bNySYaqlbUu9dGLPatnjcykpBSfQL9JlysI +cPILGGwsgmbRMPPUFdWM4Nn/DAEc31Opxg== +-----END EC PRIVATE KEY----- diff --git a/priv/ec_ssl/gen_rpc_slave@127.0.0.1.cert.pem b/priv/ec_ssl/gen_rpc_slave@127.0.0.1.cert.pem new file mode 100644 index 0000000..69f740d --- /dev/null +++ b/priv/ec_ssl/gen_rpc_slave@127.0.0.1.cert.pem @@ -0,0 +1,15 @@ +-----BEGIN CERTIFICATE----- +MIICTDCCAfKgAwIBAgIDAMEGMAoGCCqGSM49BAMCMHQxCzAJBgNVBAYTAlVTMRMw +EQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2NvMRAwDgYD +VQQKDAdnZW5fcnBjMSYwJAYDVQQDDB1nZW5fcnBjIENlcnRpZmljYXRlIEF1dGhv +cml0eTAeFw0xODEyMjEyMzE1MjJaFw0zODEyMTYyMzE1MjJaMG4xCzAJBgNVBAYT +AlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRYwFAYDVQQHDA1TYW4gRnJhbmNpc2Nv +MRAwDgYDVQQKDAdnZW5fcnBjMSAwHgYDVQQDDBdnZW5fcnBjX3NsYXZlQDEyNy4w +LjAuMTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABBmLXBysYtqQjvxAVxfihlQ5 +OTDCJNNADxa6uuaW4BElOIc4tWEzouN+yCCjxI4AMs3g/7RitHHwYt6bnAuY8Iyj +eTB3MAkGA1UdEwQCMAAwCwYDVR0PBAQDAgXgMB0GA1UdJQQWMBQGCCsGAQUFBwMB +BggrBgEFBQcDAjAdBgNVHQ4EFgQUCt8m+xW2pR3O6vx1zqBWmTH+Z5AwHwYDVR0j +BBgwFoAUAXosJrY/wnYw1vfcJaDzYwJ8CA0wCgYIKoZIzj0EAwIDSAAwRQIgR1PV +dClqz5h5W1RGHO2yffAqlCinWDzzCg2VO/1eD24CIQC7l0vlTGsoBtjhciGlI+ej +S2Ravs4obm9OgS5YEIFPCw== +-----END CERTIFICATE----- diff --git a/priv/ec_ssl/gen_rpc_slave@127.0.0.1.key.pem b/priv/ec_ssl/gen_rpc_slave@127.0.0.1.key.pem new file mode 100644 index 0000000..aa2a191 --- /dev/null +++ b/priv/ec_ssl/gen_rpc_slave@127.0.0.1.key.pem @@ -0,0 +1,8 @@ +-----BEGIN EC PARAMETERS----- +BggqhkjOPQMBBw== +-----END EC PARAMETERS----- +-----BEGIN EC PRIVATE KEY----- +MHcCAQEEIGSRUX8RPDRYPL1RAn3LM2yIW56F2nuPH43PKo/JKKEJoAoGCCqGSM49 +AwEHoUQDQgAEGYtcHKxi2pCO/EBXF+KGVDk5MMIk00APFrq65pbgESU4hzi1YTOi +437IIKPEjgAyzeD/tGK0cfBi3pucC5jwjA== +-----END EC PRIVATE KEY----- diff --git a/rebar.config b/rebar.config index cc1e975..9cbdebd 100644 --- a/rebar.config +++ b/rebar.config @@ -2,8 +2,8 @@ %%% ex: set ft=erlang fenc=utf-8 sts=4 ts=4 sw=4 et: %%% -%%% Require OTP 19.1 at a bare minimum -{minimum_otp_vsn, "19.1"}. +%%% Require OTP 21.0 at a bare minimum +{minimum_otp_vsn, "21.0"}. %% Plugins {plugins, [rebar3_hex]}. @@ -32,8 +32,8 @@ ]}. {deps, [ - {hut, "~> 1.2"}, - {ssl_verify_fun, "~> 1.1"} + {hut, "~> 1.2.1"}, + {ssl_verify_fun, "~> 1.1.4"} ]}. {profiles, [ @@ -44,8 +44,8 @@ warnings_as_errors, export_all, no_inline_list_funcs]}, - {deps, [{lager, "~> 3.0"}, - {eunit_formatters, "~> 0.3"} + {deps, [{lager, "~> 3.6.7"}, + {eunit_formatters, "~> 0.5"} ]} ]}, {dev, [ @@ -53,7 +53,7 @@ {parse_transform, lager_transform}, warnings_as_errors, no_inline_list_funcs]}, - {deps, [{lager, "~> 3.0"}, + {deps, [{lager, "~> 3.6.7"}, {sync, {git, "git://github.com/rustyio/sync.git", {branch, "master"}}} ]} ]} diff --git a/rebar3 b/rebar3 new file mode 100755 index 0000000..8324a5a Binary files /dev/null and b/rebar3 differ diff --git a/src/driver/gen_rpc_driver_ssl.erl b/src/driver/gen_rpc_driver_ssl.erl index 2ed4953..0c0d5e0 100644 --- a/src/driver/gen_rpc_driver_ssl.erl +++ b/src/driver/gen_rpc_driver_ssl.erl @@ -30,12 +30,13 @@ get_peer/1, send/2, activate_socket/1, - authenticate_server/1, + authenticate_to_server/2, authenticate_client/3, copy_sock_opts/2, set_controlling_process/2, set_send_timeout/2, - set_acceptor_opts/1]). + set_acceptor_opts/1, + getstat/2]). %%% =================================================== %%% Public API @@ -62,14 +63,13 @@ listen(Port) when is_integer(Port) -> SslOpts = merge_ssl_options(server, undefined), ssl:listen(Port, SslOpts). --spec accept(ssl:sslsocket()) -> ok | {error, term()}. +-spec accept(ssl:sslsocket()) -> {ok, ssl:sslsocket()} | {error, term()}. accept(Socket) when is_tuple(Socket) -> {ok, TSocket} = ssl:transport_accept(Socket, infinity), - case ssl:ssl_accept(TSocket) of - ok -> - {ok, TSocket}; - Error -> - Error + case ssl:handshake(TSocket) of + {ok, SslSocket} -> + {ok, SslSocket}; + Error -> Error end. -spec send(ssl:sslsocket(), binary()) -> ok | {error, term()}. @@ -92,11 +92,10 @@ activate_socket(Socket) when is_tuple(Socket) -> ok. %% Authenticate to a server --spec authenticate_server(ssl:sslsocket()) -> ok | {error, {badtcp | badrpc, term()}}. -authenticate_server(Socket) -> - Cookie = erlang:get_cookie(), - NodeStr = erlang:atom_to_list(node()), - Packet = erlang:term_to_binary({gen_rpc_authenticate_connection, NodeStr, Cookie}), +-spec authenticate_to_server(atom(), ssl:sslsocket()) -> ok | {error, {badtcp | badrpc, term()}}. +authenticate_to_server(Node, Socket) -> + Cookie = gen_rpc_helper:get_cookie_per_node(Node), + Packet = erlang:term_to_binary({gen_rpc_authenticate_connection, node(), Cookie}), SendTO = gen_rpc_helper:get_send_timeout(undefined), RecvTO = gen_rpc_helper:get_call_receive_timeout(undefined), ok = set_send_timeout(Socket, SendTO), @@ -135,45 +134,49 @@ authenticate_server(Socket) -> %% Authenticate a connected client -spec authenticate_client(ssl:sslsocket(), tuple(), binary()) -> ok | {error, {badtcp | badrpc, term()}}. authenticate_client(Socket, Peer, Data) -> - Cookie = erlang:get_cookie(), try erlang:binary_to_term(Data) of - {gen_rpc_authenticate_connection, Node, Cookie} -> - PeerCert = extract_peer_certificate(Socket), - {SocketResponse, AuthResult} = case ssl_verify_hostname:verify_cert_hostname(PeerCert, Node) of - {fail, AuthReason} -> - ?log(error, "event=node_certificate_mismatch socket=\"~s\" peer=\"~s\" reason=\"~p\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), AuthReason]), - {{gen_rpc_connection_rejected,node_certificate_mismatch}, {error,{badrpc,node_certificate_mismatch}}}; - {valid, _Hostname} -> - ?log(debug, "event=certificate_validated socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), - {gen_rpc_connection_authenticated, ok} - end, - Packet = erlang:term_to_binary(SocketResponse), - case send(Socket, Packet) of - {error, Reason} -> - ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" reason=\"~p\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Reason]), - {error, {badtcp,Reason}}; - ok -> - ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), - ok = activate_socket(Socket), - AuthResult + {gen_rpc_authenticate_connection, Node, Cookie} when is_atom(Node), is_atom(Cookie) -> + ValidCookie = gen_rpc_helper:get_cookie_per_node(Node), + if + ValidCookie == Cookie -> + PeerCert = extract_peer_certificate(Socket), + NodeStr = gen_rpc_helper:to_string(Node), + {SocketResponse, AuthResult} = case ssl_verify_hostname:verify_cert_hostname(PeerCert, NodeStr) of + {fail, AuthReason} -> + ?log(error, "event=node_certificate_mismatch socket=\"~s\" peer=\"~s\" node=\"~s\" reason=\"~p\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node, AuthReason]), + {{gen_rpc_connection_rejected,node_certificate_mismatch}, {error,{badrpc,node_certificate_mismatch}}}; + {valid, _Hostname} -> + ?log(debug, "event=certificate_validated socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]), + {gen_rpc_connection_authenticated, ok} + end, + Packet = erlang:term_to_binary(SocketResponse), + case send(Socket, Packet) of + {error, Reason} -> + ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" node=\"~s\" reason=\"~p\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node, Reason]), + {error, {badtcp,Reason}}; + ok -> + ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]), + ok = activate_socket(Socket), + AuthResult + end; + true -> + ?log(error, "event=invalid_cookie_received socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]), + Packet = erlang:term_to_binary({gen_rpc_connection_rejected, invalid_cookie}), + ok = case send(Socket, Packet) of + {error, Reason} -> + ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" node=\"~s\" reason=\"~p\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node, Reason]); + ok -> + ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]) + end, + {error, {badrpc,invalid_cookie}} end; - {gen_rpc_authenticate_connection, _Node, _IncorrectCookie} -> - ?log(error, "event=invalid_cookie_received socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), - Packet = erlang:term_to_binary({gen_rpc_connection_rejected, invalid_cookie}), - ok = case send(Socket, Packet) of - {error, Reason} -> - ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" reason=\"~p\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Reason]); - ok -> - ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]) - end, - {error, {badrpc,invalid_cookie}}; OtherData -> ?log(debug, "event=erroneous_data_received socket=\"~s\" peer=\"~s\" data=\"~p\"", [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), OtherData]), @@ -207,12 +210,16 @@ set_acceptor_opts(Socket) when is_tuple(Socket) -> ok = ssl:setopts(Socket, [{send_timeout, gen_rpc_helper:get_send_timeout(undefined)}]), ok. +-spec getstat(ssl:sslsocket(), list()) -> ok | {error, any()}. +getstat(Socket, OptNames) -> + ssl:getstat(Socket, OptNames). + %%% =================================================== %%% Private functions %%% =================================================== merge_ssl_options(client, Node) -> {ok, ExtraOpts} = application:get_env(?APP, ssl_client_options), - NodeStr = atom_to_list(Node), + NodeStr = gen_rpc_helper:to_string(Node), DefaultOpts = lists:append(?SSL_DEFAULT_COMMON_OPTS, ?SSL_DEFAULT_CLIENT_OPTS), VerifyOpts = [{verify_fun, {fun ssl_verify_hostname:verify_fun/3,[{check_hostname,NodeStr}]}}|DefaultOpts], gen_rpc_helper:merge_sockopt_lists(ExtraOpts, VerifyOpts); diff --git a/src/driver/gen_rpc_driver_tcp.erl b/src/driver/gen_rpc_driver_tcp.erl index 5c87124..f1b13ac 100644 --- a/src/driver/gen_rpc_driver_tcp.erl +++ b/src/driver/gen_rpc_driver_tcp.erl @@ -28,12 +28,13 @@ get_peer/1, send/2, activate_socket/1, - authenticate_server/1, + authenticate_to_server/2, authenticate_client/3, copy_sock_opts/2, set_controlling_process/2, set_send_timeout/2, - set_acceptor_opts/1]). + set_acceptor_opts/1, + getstat/2]). %%% =================================================== %%% Public API @@ -57,7 +58,7 @@ connect(Node, Port) when is_atom(Node) -> listen(Port) when is_integer(Port) -> gen_tcp:listen(Port, ?TCP_DEFAULT_OPTS). --spec accept(port()) -> ok | {error, term()}. +-spec accept(port()) -> {ok, inet:socket()} | {error, term()}. accept(Socket) when is_port(Socket) -> gen_tcp:accept(Socket, infinity). @@ -81,10 +82,10 @@ send(Socket, Data) when is_port(Socket), is_binary(Data) -> end. %% Authenticate to a server --spec authenticate_server(port()) -> ok | {error, {badtcp | badrpc, term()}}. -authenticate_server(Socket) -> - Cookie = erlang:get_cookie(), - Packet = erlang:term_to_binary({gen_rpc_authenticate_connection, Cookie}), +-spec authenticate_to_server(atom(), port()) -> ok | {error, {badtcp | badrpc, term()}}. +authenticate_to_server(Node, Socket) -> + Cookie = gen_rpc_helper:get_cookie_per_node(Node), + Packet = erlang:term_to_binary({gen_rpc_authenticate_connection, Node, Cookie}), SendTO = gen_rpc_helper:get_send_timeout(undefined), RecvTO = gen_rpc_helper:get_call_receive_timeout(undefined), ok = set_send_timeout(Socket, SendTO), @@ -124,35 +125,38 @@ authenticate_server(Socket) -> %% Authenticate a connected client -spec authenticate_client(port(), tuple(), binary()) -> ok | {error, {badtcp | badrpc, term()}}. authenticate_client(Socket, Peer, Data) -> - Cookie = erlang:get_cookie(), try erlang:binary_to_term(Data) of - {gen_rpc_authenticate_connection, Cookie} -> - Packet = erlang:term_to_binary(gen_rpc_connection_authenticated), - Result = case send(Socket, Packet) of - {error, Reason} -> - ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" reason=\"~p\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Reason]), - {error, {badtcp,Reason}}; - ok -> - ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), - ok = activate_socket(Socket), - ok - end, - Result; - {gen_rpc_authenticate_connection, _IncorrectCookie} -> - ?log(error, "event=invalid_cookie_received socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), - Packet = erlang:term_to_binary({gen_rpc_connection_rejected, invalid_cookie}), - ok = case send(Socket, Packet) of - {error, Reason} -> - ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" reason=\"~p\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Reason]); - ok -> - ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\"", - [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]) - end, - {error, {badrpc,invalid_cookie}}; + {gen_rpc_authenticate_connection, Node, Cookie} when is_atom(Node), is_atom(Cookie) -> + ValidCookie = gen_rpc_helper:get_cookie_per_node(Node), + if + ValidCookie == Cookie -> + Packet = erlang:term_to_binary(gen_rpc_connection_authenticated), + Result = case send(Socket, Packet) of + {error, Reason} -> + ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" node=\"~s\" reason=\"~p\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node, Reason]), + {error, {badtcp,Reason}}; + ok -> + ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]), + ok = activate_socket(Socket), + ok + end, + Result; + true -> + ?log(error, "event=invalid_cookie_received socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]), + Packet = erlang:term_to_binary({gen_rpc_connection_rejected, invalid_cookie}), + ok = case send(Socket, Packet) of + {error, Reason} -> + ?log(error, "event=transmission_failed socket=\"~s\" peer=\"~s\" node=\"~s\" reason=\"~p\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node, Reason]); + ok -> + ?log(debug, "event=transmission_succeeded socket=\"~s\" peer=\"~s\" node=\"~s\"", + [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), Node]) + end, + {error, {badrpc,invalid_cookie}} + end; OtherData -> ?log(debug, "event=erroneous_data_received socket=\"~s\" peer=\"~s\" data=\"~p\"", [gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), OtherData]), @@ -197,6 +201,10 @@ set_acceptor_opts(Socket) when is_port(Socket) -> ok = inet:setopts(Socket, [{send_timeout, gen_rpc_helper:get_send_timeout(undefined)}|?ACCEPTOR_DEFAULT_TCP_OPTS]), ok. +-spec getstat(port(), list()) -> ok | {error, term()}. +getstat(Socket, OptNames) -> + inet:getstat(Socket, OptNames). + %%% =================================================== %%% Private functions %%% =================================================== diff --git a/src/gen_rpc.app.src b/src/gen_rpc.app.src index b60f5f5..0e3fa45 100644 --- a/src/gen_rpc.app.src +++ b/src/gen_rpc.app.src @@ -40,7 +40,7 @@ %% Fine-graned driver/port control %% for each outgoing client connection %% The internal implementation expects - %% {internal, Map} whereh Map is a map of + %% {internal, Map} where Map is a map of %% node_name => {driver, port} or node_name => driver %% which uses the default port for the specified client driver %% If you have an external service that allows discovery @@ -79,7 +79,15 @@ %% Seconds between probes {socket_keepalive_interval, 5}, %% Probes lost to close the connection - {socket_keepalive_count, 2} + {socket_keepalive_count, 2}, + %% Cookie per node configuration + %% Set a separate cookie per node by setting + %% {internal, #{ Node::atom() => Cookie()::atom() }} + %% If you have an external service that can validate an incoming + %% client has provided the correct cookie / send the correct cookie per + %% node to the server, you can change this setting to + %% {external, Module} + {cookie_per_node, {internal, #{}}} ]}, {modules, []}] }. diff --git a/src/gen_rpc.erl b/src/gen_rpc.erl index 5f415dc..6d1fcd3 100644 --- a/src/gen_rpc.erl +++ b/src/gen_rpc.erl @@ -8,6 +8,8 @@ -author("Panagiotis Papadomitsos "). %%% Include helpful guard macros +-include("guards.hrl"). +%%% Include helpful types -include("types.hrl"). %%% Library interface @@ -32,7 +34,7 @@ -export([sbcast/2, sbcast/3]). %% Misc functions --export([nodes/0]). +-export([nodes/0, monitor_node/2]). %%% =================================================== %%% Library interface @@ -113,21 +115,25 @@ multicall(Nodes, M, F, A, Timeout) -> gen_rpc_client:multicall(Nodes, M, F, A, Timeout). -spec abcast(atom(), term()) -> abcast. -abcast(Name, Msg) when is_atom(Name) -> +abcast(Name, Msg) -> gen_rpc_client:abcast(Name, Msg). -spec abcast(list(), atom(), term()) -> abcast. -abcast(Nodes, Name, Msg) when is_list(Nodes), is_atom(Name) -> +abcast(Nodes, Name, Msg) -> gen_rpc_client:abcast(Nodes, Name, Msg). -spec sbcast(atom(), term()) -> {list(), list()}. -sbcast(Name, Msg) when is_atom(Name) -> +sbcast(Name, Msg) -> gen_rpc_client:sbcast(Name, Msg). -spec sbcast(list(), atom(), term()) -> {list(), list()}. -sbcast(Nodes, Name, Msg) when is_list(Nodes), is_atom(Name) -> +sbcast(Nodes, Name, Msg) -> gen_rpc_client:sbcast(Nodes, Name, Msg). -spec nodes() -> list(). nodes() -> gen_rpc_client_sup:nodes(). + +-spec monitor_node(atom(), boolean()) -> true. +monitor_node(Node, Flag) -> + gen_rpc_monitor:monitor_node(Node, Flag). diff --git a/src/gen_rpc_acceptor.erl b/src/gen_rpc_acceptor.erl index 4cc0bd0..36eb451 100644 --- a/src/gen_rpc_acceptor.erl +++ b/src/gen_rpc_acceptor.erl @@ -184,6 +184,11 @@ waiting_for_data(info, {Driver,Socket,Data}, end, ok = DriverMod:activate_socket(Socket), waiting_for_data(info, {sbcast, Caller, Reply}, State); + ping -> + ?log(debug, "event=ping_received driver=~s socket=\"~s\" peer=\"~s\" action=ignore", + [Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer)]), + ok = DriverMod:activate_socket(Socket), + {keep_state_and_data, gen_rpc_helper:get_inactivity_timeout(?MODULE)}; OtherData -> ?log(debug, "event=erroneous_data_received driver=~s socket=\"~s\" peer=\"~s\" data=\"~p\"", [Driver, gen_rpc_helper:socket_to_string(Socket), gen_rpc_helper:peer_to_string(Peer), OtherData]), @@ -267,7 +272,7 @@ call_middleman(M, F, A) -> catch throw:Term -> Term; exit:Reason -> {badrpc, {'EXIT', Reason}}; - error:Reason -> {badrpc, {'EXIT', {Reason, erlang:get_stacktrace()}}} + error:Reason:Stacktrace -> {badrpc, {'EXIT', {Reason, Stacktrace}}} end, erlang:exit({call_middleman_result, Res}), ok. diff --git a/src/gen_rpc_client.erl b/src/gen_rpc_client.erl index a7bdd04..5f146b4 100644 --- a/src/gen_rpc_client.erl +++ b/src/gen_rpc_client.erl @@ -26,7 +26,8 @@ driver :: atom(), driver_mod :: atom(), driver_closed :: atom(), - driver_error :: atom()}). + driver_error :: atom(), + keepalive :: tuple()}). %%% Supervisor functions -export([start_link/1, stop/1]). @@ -243,13 +244,23 @@ init({Node}) -> ?log(info, "event=initializing_client driver=~s node=\"~s\" port=~B", [Driver, Node, Port]), case DriverMod:connect(Node, Port) of {ok, Socket} -> - case DriverMod:authenticate_server(Socket) of + case DriverMod:authenticate_to_server(Node, Socket) of ok -> - {ok, #state{socket=Socket, - driver=Driver, - driver_mod=DriverMod, - driver_closed=DriverClosed, - driver_error=DriverError}, gen_rpc_helper:get_inactivity_timeout(?MODULE)}; + ok = gen_rpc_monitor:register_node(Node, self()), + Interval = application:get_env(?APP, keepalive_interval, 60), % 60s + KeepaliveFun = keepalive_probe(DriverMod, Socket), + case gen_rpc_keepalive:start(KeepaliveFun, Interval, {keepalive, check}) of + {ok, KeepAlive} -> + {ok, #state{socket=Socket, + driver=Driver, + driver_mod=DriverMod, + driver_closed=DriverClosed, + driver_error=DriverError, + keepalive=KeepAlive}, gen_rpc_helper:get_inactivity_timeout(?MODULE)}; + {error, Error} -> + ?log(error, "event=start_keepalive_failed driver=~p, reason=\"~p\"", [Driver, Error]), + {stop, Error} + end; {error, ReasonTuple} -> ?log(error, "event=client_authentication_failed driver=~s reason=\"~p\"", [Driver, ReasonTuple]), {stop, ReasonTuple} @@ -289,6 +300,7 @@ handle_call(Msg, _Caller, #state{socket=Socket, driver=Driver} = State) -> %% This is the actual CAST handler for CAST handle_cast({{cast,_M,_F,_A} = PacketTuple, SendTO}, State) -> send_cast(PacketTuple, State, SendTO, false); + % send_cast(PacketTuple, State, SendTO, true); %% This is the actual CAST handler for ABCAST handle_cast({{abcast,_Name,_Msg} = PacketTuple, undefined}, State) -> @@ -361,6 +373,18 @@ handle_info(timeout, #state{socket=Socket, driver=Driver} = State) -> [Driver, gen_rpc_helper:socket_to_string(Socket)]), {stop, normal, State}; +handle_info({keepalive, check}, #state{driver=Driver, keepalive=KeepAlive} = State) -> + case gen_rpc_keepalive:check(KeepAlive) of + {ok, KeepAlive1} -> + {noreply, State#state{keepalive=KeepAlive1}, gen_rpc_helper:get_inactivity_timeout(?MODULE)}; + {error, timeout} -> + send_ping(State#state{keepalive=gen_rpc_keepalive:resume(KeepAlive)}); + {error, Reason} -> + ?log(error, "event=keepalive_check_failed driver=~p, reason=\"~p\" action=stopping", + [Driver, Reason]), + {stop, Reason, State} + end; + %% Catch-all for info - our protocol is strict so die! handle_info(Msg, #state{socket=Socket, driver=Driver} = State) -> ?log(error, "event=uknown_message_received driver=~s socket=\"~s\" message=\"~p\" action=stopping", @@ -371,12 +395,21 @@ handle_info(Msg, #state{socket=Socket, driver=Driver} = State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(_Reason, _State) -> +terminate(_Reason, #state{keepalive=KeepAlive}) -> + gen_rpc_keepalive:cancel(KeepAlive), ok. %%% =================================================== %%% Private functions %%% =================================================== +keepalive_probe(DriverMod, Socket) -> + fun() -> + case DriverMod:getstat(Socket, [recv_oct]) of + {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct}; + {error, Error} -> {error, Error} + end + end. + send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State, SendTO, Activate) -> Packet = erlang:term_to_binary(PacketTuple), ?log(debug, "event=constructing_cast_term driver=~s socket=\"~s\" cast=\"~p\"", @@ -388,14 +421,31 @@ send_cast(PacketTuple, #state{socket=Socket, driver=Driver, driver_mod=DriverMod [Driver, gen_rpc_helper:socket_to_string(Socket), Reason]), {stop, Reason, State}; ok -> - ok = if Activate =:= true -> DriverMod:activate_socket(Socket); - true -> ok + ok = if + Activate =:= true -> DriverMod:activate_socket(Socket); + true -> ok end, ?log(debug, "message=cast event=transmission_succeeded driver=~s socket=\"~s\"", [Driver, gen_rpc_helper:socket_to_string(Socket)]), {noreply, State, gen_rpc_helper:get_inactivity_timeout(?MODULE)} end. +send_ping(#state{socket=Socket, driver=Driver, driver_mod=DriverMod} = State) -> + Packet = erlang:term_to_binary(ping), + ok = DriverMod:set_send_timeout(Socket, undefined), + case DriverMod:send(Socket, Packet) of + {error, Reason} -> + ?log(error, "message=ping event=transmission_failed driver=~s socket=\"~s\" reason=\"~p\"", + [Driver, gen_rpc_helper:socket_to_string(Socket), Reason]), + {stop, Reason, State}; + ok -> + ?log(debug, "message=ping event=transmission_succeeded driver=~s socket=\"~s\"", + [Driver, gen_rpc_helper:socket_to_string(Socket)]), + %% We should keep this flag same as previous + ok = DriverMod:activate_socket(Socket), + {noreply, State, gen_rpc_helper:get_inactivity_timeout(?MODULE)} + end. + cast_worker(NodeOrTuple, Cast, Ret, SendTO) -> %% Create a unique name for the client because we register as such PidName = gen_rpc_helper:make_process_name("client", NodeOrTuple), diff --git a/src/gen_rpc_driver.erl b/src/gen_rpc_driver.erl index bffc969..aadfd7e 100644 --- a/src/gen_rpc_driver.erl +++ b/src/gen_rpc_driver.erl @@ -10,11 +10,11 @@ -callback listen(inet:port_number()) -> {ok, term()} | {error, term()}. --callback accept(term()) -> ok | {error, term()}. +-callback accept(term()) -> {ok, inet:socket() | ssl:sslsocket()} | {error, term()}. -callback activate_socket(term()) -> ok. --callback authenticate_server(term()) -> ok | {error, {badtcp | badrpc, term()}}. +-callback authenticate_to_server(atom(), term()) -> ok | {error, {badtcp | badrpc, term()}}. -callback authenticate_client(term(), tuple(), binary()) -> ok | {error, {badtcp | badrpc, term()}}. diff --git a/src/gen_rpc_helper.erl b/src/gen_rpc_helper.erl index bb0e87d..ad92063 100644 --- a/src/gen_rpc_helper.erl +++ b/src/gen_rpc_helper.erl @@ -17,7 +17,8 @@ -include("types.hrl"). %%% Public API --export([peer_to_string/1, +-export([to_string/1, + peer_to_string/1, socket_to_string/1, host_from_node/1, set_optimal_process_flags/0, @@ -27,6 +28,7 @@ get_server_driver_options/1, get_client_config_per_node/1, get_client_driver_options/1, + get_cookie_per_node/1, get_connect_timeout/0, get_send_timeout/1, get_rpc_module_control/0, @@ -40,6 +42,14 @@ %%% =================================================== %%% Public API %%% =================================================== +%% Convert any type to atom +to_string(Str) when is_list(Str) -> + Str; +to_string(Atom) when is_atom(Atom) -> + erlang:atom_to_list(Atom); +to_string(Bin) when is_binary(Bin) -> + binary:bin_to_list(Bin). + %% Return the connected peer's IP -spec peer_to_string({inet:ip4_address(), inet:port_number()} | inet:ip4_address()) -> string(). peer_to_string({{A,B,C,D}, Port}) when is_integer(A), is_integer(B), is_integer(C), is_integer(D), is_integer(Port) -> @@ -160,6 +170,20 @@ get_client_config_per_node(Node) when is_atom(Node) -> get_client_config_from_map(Node, NodeMap) end. +-spec get_cookie_per_node(atom()) -> atom(). +get_cookie_per_node(Node) when is_atom(Node) -> + {ok, CookieConfig} = application:get_env(?APP, cookie_per_node), + case CookieConfig of + {external, Module} when is_atom(Module) -> + case Module:get_cookie(Node) of + Cookie when is_atom(Cookie) -> Cookie; + {error, Reason} -> {error, Reason} + end; + {internal, NodeMap} -> + maps:get(Node, NodeMap, erlang:get_cookie()) + end. + + -spec get_connect_timeout() -> timeout(). get_connect_timeout() -> {ok, ConnTO} = application:get_env(?APP, connect_timeout), diff --git a/src/gen_rpc_keepalive.erl b/src/gen_rpc_keepalive.erl new file mode 100644 index 0000000..747af74 --- /dev/null +++ b/src/gen_rpc_keepalive.erl @@ -0,0 +1,81 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2013-2017 EMQ Enterprise, Inc. (http://emqtt.io) +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +%% @doc Client Keepalive + +%% Copied from emqttd +%% -module(emqttd_keepalive). +%% +-module(gen_rpc_keepalive). + +-author("Feng Lee "). + +-export([start/3, check/1, cancel/1, resume/1]). + +-record(keepalive, {statfun :: function() | undefined, + statval :: integer() | undefined, + tsec :: integer() | undefined, + tmsg :: term(), + tref :: reference() | undefined, + repeat :: integer() | undefined}). + +-type(keepalive() :: #keepalive{}). + +-export_type([keepalive/0]). + +%% @doc Start a keepalive +-spec(start(fun(), integer(), any()) -> {ok, keepalive()} | {error, term()}). +start(_, 0, _) -> + {ok, #keepalive{}}; +start(StatFun, TimeoutSec, TimeoutMsg) -> + case StatFun() of + {ok, StatVal} -> + {ok, #keepalive{statfun = StatFun, statval = StatVal, + tsec = TimeoutSec, tmsg = TimeoutMsg, + tref = timer(TimeoutSec, TimeoutMsg), repeat = 0}}; + {error, Error} -> + {error, Error} + end. + +%% @doc Check keepalive, called when timeout. +-spec(check(keepalive()) -> {ok, keepalive()} | {error, term()}). +check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = _Repeat}) -> + case StatFun() of + {ok, NewVal} -> + if NewVal =/= LastVal -> + {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = 0})}; + % Repeat < 1 -> + % {ok, resume(KeepAlive#keepalive{statval = NewVal, repeat = Repeat + 1})}; + true -> + {error, timeout} + end; + {error, Error} -> + {error, Error} + end. + +resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) -> + KeepAlive#keepalive{tref = timer(TimeoutSec, TimeoutMsg)}. + +%% @doc Cancel Keepalive +-spec(cancel(keepalive()) -> ok). +cancel(#keepalive{tref = TRef}) when is_reference(TRef) -> + catch erlang:cancel_timer(TRef), ok; +cancel(_) -> + ok. + +timer(Sec, Msg) -> + erlang:send_after(timer:seconds(Sec), self(), Msg). + diff --git a/src/gen_rpc_monitor.erl b/src/gen_rpc_monitor.erl new file mode 100644 index 0000000..0fe6535 --- /dev/null +++ b/src/gen_rpc_monitor.erl @@ -0,0 +1,206 @@ +%%% -*-mode:erlang;coding:utf-8;tab-width:4;c-basic-offset:4;indent-tabs-mode:()-*- +%%% ex: set ft=erlang fenc=utf-8 sts=4 ts=4 sw=4 et: +%%% +%%% Copyright 2015 Panagiotis Papadomitsos. All Rights Reserved. +%%% + +-module(gen_rpc_monitor). + +%%% Behaviour +-behaviour(gen_server). + +%%% Include the HUT library +-include_lib("hut/include/hut.hrl"). +%%% Include this library's name macro +-include("app.hrl"). +%%% Include helpful guard macros +-include("guards.hrl"). +%%% Include helpful types +-include("types.hrl"). + +%%% Local state +-record(state, {pid_to_node :: map(), % Used to map a crashed PID to a specific node + node_conns :: map(), % Used to count connections to a specific node + node_to_sub :: map(), % Used to map a node to its subscribers + sub_to_node :: map(), % Used to map a subsriber PID to a node + sub_to_mref :: map()}). % Used to map a client PID to its monitor reference + +%%% Supervisor functions +-export([start_link/0, stop/0]). + +%%% Server functions +-export([register_node/2, monitor_node/2]). + +%%% Behaviour callbacks +-export([init/1, handle_call/3, handle_cast/2, + handle_info/2, terminate/2, code_change/3]). + +%%% =================================================== +%%% Public API +%%% =================================================== +-spec start_link() -> gen_server:startlink_ret(). +start_link() -> + gen_server:start_link({local,?MODULE}, ?MODULE, [], []). + +-spec stop() -> ok. +stop() -> + gen_server:stop(?MODULE, normal, infinity). + +-spec register_node(atom(), pid()) -> ok | {error, term()}. +register_node(Node, Pid) when is_atom(Node), is_pid(Pid) -> + gen_server:call(?MODULE, {register_node,Node,Pid}, infinity). + +-spec monitor_node(atom(), boolean()) -> true. +monitor_node(Node, Flag) when is_atom(Node), ?is_boolean(Flag) -> + gen_server:call(?MODULE, {monitor_node,Node,Flag}, infinity). + +%%% =================================================== +%%% Behaviour callbacks +%%% =================================================== +init([]) -> + ?log(info, "event=start"), + {ok, #state{pid_to_node=maps:new(), + node_conns=maps:new(), + node_to_sub=maps:new(), + sub_to_node=maps:new(), + sub_to_mref=maps:new() + }}. + +%% Register a node +handle_call({register_node,Node,Pid}, _Caller, #state{pid_to_node=PTN,node_conns=NodeConns} = State) -> + _MRef = erlang:monitor(process, Pid), + %% Save the client PID to node map + %% so we can lookup the PID to node mapping if + %% the gen_rpc client process for that node dies + NewPTN = maps:put(Pid, Node, PTN), + %% Increase the node connection count + NewNodeConns = case maps:get(Node, NodeConns, 0) of + 0 -> + %% The connection to this node is new + %% Notify the subscribers + ?log(debug, "event=new_node_connection_detected node=\"~s\"", [Node]), + ok = gen_server:cast(self(), {notify_node_up,Node}), + maps:put(Node, 1, NodeConns); + NodeConn when NodeConn > 0 -> + maps:put(Node, NodeConn + 1, NodeConns) + end, + {reply, ok, State#state{pid_to_node=NewPTN,node_conns=NewNodeConns}}; + +%% Register a subscriber +handle_call({monitor_node,Node,true}, {Caller,_Tag}, #state{node_to_sub=NTS, sub_to_node=STN, sub_to_mref=STM} = State) -> + %% Register the subscriber in the list of subscribers for that + %% specific node + NewNTS = case maps:find(Node, NTS) of + {ok, SubSet} -> + %% Only register node once + NewSubSet = sets:add_element(Caller, SubSet), + maps:put(Node, NewSubSet, NTS); + error -> + % This node has no members let's create a new set + maps:put(Node, sets:from_list([Caller]), NTS) + end, + %% Register the subscriber to node assignment + NewSTN = maps:put(Caller, Node, STN), + %% Monitor the new subscriber to remove them in case + %% they die + NewSTM = case maps:find(Caller, STM) of + {ok, _MRef} -> + %% Subscriber already has a monitor setup for them + %% Do nothing + STM; + error -> + MRef = erlang:monitor(process, Caller), + maps:put(Caller, MRef, STM) + end, + {reply, true, State#state{node_to_sub=NewNTS, sub_to_node=NewSTN, sub_to_mref=NewSTM}}; + +%% Unregister a subscriber +handle_call({monitor_node,Node,false}, {Caller,_Tag}, #state{node_to_sub=NTS, sub_to_node=STN, sub_to_mref=STM} = State) -> + NewNTS = case maps:find(Node, NTS) of + {ok, SubSet} -> + NewSubSet = sets:del_element(Caller, SubSet), + maps:put(Node, NewSubSet, NTS); + error -> + NTS + end, + %% Unregister the PID from the subscriber to node map + NewSTN = maps:remove(Caller, STN), + %% Unregister the PID from the subscriber to monitor refs map + NewSTM = case maps:find(Caller, STM) of + {ok, MRef} -> + %% Subscriber exists, let's remove the monitor + _Any = erlang:demonitor(MRef, [flush]), + maps:remove(Caller, STM); + error -> + STM + end, + {reply, true, State#state{node_to_sub=NewNTS, sub_to_node=NewSTN, sub_to_mref=NewSTM}}; + +%% Catch-all for calls - die if we get a message we don't expect +handle_call(Msg, _Caller, State) -> + ?log(error, "event=uknown_call_received message=\"~p\" action=stopping", [Msg]), + {stop, {unknown_call, Msg}, State}. + +handle_cast({notify_node_down,Node}, #state{node_to_sub=NTS} = State) -> + ok = lists:foreach(fun(Pid) -> + _Msg = erlang:send(Pid, {nodedown,Node}) + end, sets:to_list(maps:get(Node, NTS, sets:new()))), + {noreply, State}; + +handle_cast({notify_node_up,Node}, #state{node_to_sub=NTS} = State) -> + ok = lists:foreach(fun(Pid) -> + _Msg = erlang:send(Pid, {nodeup,Node}) + end, sets:to_list(maps:get(Node, NTS, sets:new()))), + {noreply, State}; + +%% Catch-all for casts - die if we get a message we don't expect +handle_cast(Msg, State) -> + ?log(error, "event=uknown_cast_received message=\"~p\" action=stopping", [Msg]), + {stop, {unknown_cast, Msg}, State}. + +%% Handle client node disconnects +handle_info({'DOWN', _MRef, process, Pid, _Info}, #state{pid_to_node=PTN, node_conns=NodeConns, node_to_sub=NTS, + sub_to_node=STN, sub_to_mref=STM} = State) -> + %% Get the pid to node assignment + case maps:find(Pid, PTN) of + {ok, Node} -> + %% This is a node event + %% Remove the crashed PID from the pid to node list + NewPTN = maps:remove(Pid, PTN), + {ok, NodeConn} = maps:find(Node, NodeConns), + NewNodeConns = maps:put(Node, NodeConn - 1, NodeConns), + if + NodeConn =:= 0 -> + %% Just a single connection went down, not a notification event + {noreply, State#state{pid_to_node=NewPTN, node_conns=NewNodeConns}}; + true -> + %% The node truly went down, let's notify the subscribers + %% Optimization: only send when there are subscribers + ok = gen_server:cast(self(), {notify_node_down,Node}), + {noreply, State#state{pid_to_node=NewPTN, node_conns=NewNodeConns}} + end; + error -> + %% This is not a node event, this is a subscriber process event + %% The subscriber is dead, remove it + {ok, Node} = maps:find(Pid, STN), + %% Remove from the subscriber to node map + NewSTN = maps:remove(Pid, STN), + %% Remove from the node subscriber lists + {ok, SubSet} = maps:find(Node, NTS), + NewSubSet = sets:del_element(Pid, SubSet), + NewNTS = maps:put(Node, NewSubSet, NTS), + %% Remove from the subscriber to monitor ref map + NewSTM = maps:remove(Pid, STM), + {noreply, State#state{node_to_sub=NewNTS, sub_to_node=NewSTN, sub_to_mref=NewSTM}} + end; + +%% Catch-all for info - our protocol is strict so die! +handle_info(Msg, State) -> + ?log(error, "event=uknown_message_received message=\"~p\" action=stopping", [Msg]), + {stop, {unknown_info, Msg}, State}. + +code_change(_OldVersion, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. diff --git a/src/supervisor/gen_rpc_sup.erl b/src/supervisor/gen_rpc_sup.erl index 8fb2af1..adef8c3 100644 --- a/src/supervisor/gen_rpc_sup.erl +++ b/src/supervisor/gen_rpc_sup.erl @@ -30,7 +30,8 @@ init([]) -> {ok, {{one_for_one, 100, 1}, [ {gen_rpc_server_tcp, {gen_rpc_server,start_link,[tcp]}, permanent, 5000, worker, [gen_rpc_server]}, {gen_rpc_server_ssl, {gen_rpc_server,start_link,[ssl]}, permanent, 5000, worker, [gen_rpc_server]}, - {gen_rpc_acceptor_sup, {gen_rpc_acceptor_sup,start_link, []}, permanent, 5000, supervisor, [gen_rpc_acceptor_sup]}, - {gen_rpc_dispatcher, {gen_rpc_dispatcher,start_link, []}, permanent, 5000, worker, [gen_rpc_dispatcher]}, - {gen_rpc_client_sup, {gen_rpc_client_sup,start_link, []}, permanent, 5000, supervisor, [gen_rpc_client_sup]} + {gen_rpc_acceptor_sup, {gen_rpc_acceptor_sup,start_link,[]}, permanent, 5000, supervisor, [gen_rpc_acceptor_sup]}, + {gen_rpc_monitor, {gen_rpc_monitor,start_link,[]}, permanent, 5000, worker, [gen_rpc_monitor]}, + {gen_rpc_dispatcher, {gen_rpc_dispatcher,start_link,[]}, permanent, 5000, worker, [gen_rpc_dispatcher]}, + {gen_rpc_client_sup, {gen_rpc_client_sup,start_link,[]}, permanent, 5000, supervisor, [gen_rpc_client_sup]} ]}}. diff --git a/test/ct/remote_SUITE.erl b/test/ct/remote_SUITE.erl index 82aafd5..656a174 100644 --- a/test/ct/remote_SUITE.erl +++ b/test/ct/remote_SUITE.erl @@ -17,13 +17,13 @@ %%% CT callback functions %%% =================================================== all() -> - [{group, tcp}, {group, ssl}]. + [{group, tcp}, {group, ssl}, {group, ec_ssl}]. % [{group, tcp}]. groups() -> Cases = gen_rpc_test_helper:get_test_functions(?MODULE), % [{tcp, [], Cases}]. - [{tcp, [], Cases}, {ssl, [], Cases}]. + [{tcp, [], Cases}, {ssl, [], Cases}, {ec_ssl, [], Cases}]. init_per_group(Group, Config) -> % Our group name is the name of the driver diff --git a/test/gen_rpc.master.config b/test/gen_rpc.master.config index 1cc978f..a3c08c7 100644 --- a/test/gen_rpc.master.config +++ b/test/gen_rpc.master.config @@ -8,21 +8,23 @@ {tcp_client_port, false}, {ssl_server_port, 5370}, {ssl_client_port, 5370}, + {keepalive_interval, 60}, {ssl_server_options, [ - {certfile, "./priv/ssl/gen_rpc_master@127.0.0.1.cert.pem"}, - {keyfile, "./priv/ssl/gen_rpc_master@127.0.0.1.key.pem"}, - {cacertfile, "./priv/ssl/ca.cert.pem"} + {certfile, "./priv/ec_ssl/gen_rpc_master@127.0.0.1.cert.pem"}, + {keyfile, "./priv/ec_ssl/gen_rpc_master@127.0.0.1.key.pem"}, + {cacertfile, "./priv/ec_ssl/ca.cert.pem"} ]}, {ssl_client_options, [ - {certfile, "./priv/ssl/gen_rpc_master@127.0.0.1.cert.pem"}, - {keyfile, "./priv/ssl/gen_rpc_master@127.0.0.1.key.pem"}, - {cacertfile, "./priv/ssl/ca.cert.pem"} + {certfile, "./priv/ec_ssl/gen_rpc_master@127.0.0.1.cert.pem"}, + {keyfile, "./priv/ec_ssl/gen_rpc_master@127.0.0.1.key.pem"}, + {cacertfile, "./priv/ec_ssl/ca.cert.pem"} ]}, {client_config_per_node, {internal, #{ 'gen_rpc_slave@127.0.0.1' => 5372 }}} ]}, {sasl, [ + {sasl_error_logger, false}, {errlog_type, error}, {error_logger_mf_dir, false} ]}, @@ -32,8 +34,10 @@ {crash_log_size, 0}, {colored, true}, {handlers, [ - {lager_console_backend, [debug, - {lager_default_formatter, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, + {lager_console_backend, [ + {level, debug}, + {formatter, lager_default_formatter}, + {formatter_config, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]} ]} ]} diff --git a/test/gen_rpc.slave.config b/test/gen_rpc.slave.config index 3a165e3..fa5dd9a 100644 --- a/test/gen_rpc.slave.config +++ b/test/gen_rpc.slave.config @@ -8,21 +8,23 @@ {tcp_client_port, false}, {ssl_server_port, 5372}, {ssl_client_port, 5372}, + {keepalive_interval, 60}, {ssl_server_options, [ - {certfile, "./priv/ssl/gen_rpc_slave@127.0.0.1.cert.pem"}, - {keyfile, "./priv/ssl/gen_rpc_slave@127.0.0.1.key.pem"}, - {cacertfile, "./priv/ssl/ca.cert.pem"} + {certfile, "./priv/ec_ssl/gen_rpc_slave@127.0.0.1.cert.pem"}, + {keyfile, "./priv/ec_ssl/gen_rpc_slave@127.0.0.1.key.pem"}, + {cacertfile, "./priv/ec_ssl/ca.cert.pem"} ]}, {ssl_client_options, [ - {certfile, "./priv/ssl/gen_rpc_slave@127.0.0.1.cert.pem"}, - {keyfile, "./priv/ssl/gen_rpc_slave@127.0.0.1.key.pem"}, - {cacertfile, "./priv/ssl/ca.cert.pem"} + {certfile, "./priv/ec_ssl/gen_rpc_slave@127.0.0.1.cert.pem"}, + {keyfile, "./priv/ec_ssl/gen_rpc_slave@127.0.0.1.key.pem"}, + {cacertfile, "./priv/ec_ssl/ca.cert.pem"} ]}, {client_config_per_node, {internal, #{ 'gen_rpc_master@127.0.0.1' => 5370 }}} ]}, {sasl, [ + {sasl_error_logger, false}, {errlog_type, error}, {error_logger_mf_dir, false} ]}, @@ -32,8 +34,10 @@ {crash_log_size, 0}, {colored, true}, {handlers, [ - {lager_console_backend, [debug, - {lager_default_formatter, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, + {lager_console_backend, [ + {level, debug}, + {formatter, lager_default_formatter}, + {formatter_config, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]} ]} ]} diff --git a/test/gen_rpc_test_helper.erl b/test/gen_rpc_test_helper.erl index 16fb966..d1a9b22 100644 --- a/test/gen_rpc_test_helper.erl +++ b/test/gen_rpc_test_helper.erl @@ -116,6 +116,38 @@ set_driver_configuration(ssl, ?SLAVE) -> {cacertfile, CaFile}], [{persistent, true}]]), ok; +set_driver_configuration(ec_ssl, ?MASTER) -> + Prefix = filename:join(["..", "..", ".."]), + CertFile = filename:join([Prefix, "priv", "ec_ssl", atom_to_list(?MASTER)]), + CaFile = filename:join([Prefix, "priv", "ec_ssl", "ca.cert.pem"]), + ok = application:set_env(?APP, default_client_driver, ssl, [{persistent, true}]), + ok = application:set_env(?APP, ssl_server_port, ?MASTER_PORT, [{persistent, true}]), + ok = application:set_env(?APP, ssl_server_options, [ + {certfile, CertFile ++ ".cert.pem"}, + {keyfile, CertFile ++ ".key.pem"}, + {cacertfile, CaFile}], [{persistent, true}]), + ok = application:set_env(?APP, ssl_client_options, [ + {certfile, CertFile ++ ".cert.pem"}, + {keyfile, CertFile ++ ".key.pem"}, + {cacertfile, CaFile}], [{persistent, true}]), + ok; + +set_driver_configuration(ec_ssl, ?SLAVE) -> + Prefix = filename:join(["..", "..", ".."]), + CertFile = filename:join([Prefix, "priv", "ec_ssl", atom_to_list(?SLAVE)]), + CaFile = filename:join([Prefix, "priv", "ec_ssl", "ca.cert.pem"]), + ok = rpc:call(?SLAVE, application, set_env, [?APP, default_client_driver, ssl, [{persistent, true}]]), + ok = rpc:call(?SLAVE, application, set_env, [?APP, ssl_server_port, ?SLAVE_PORT, [{persistent, true}]]), + ok = rpc:call(?SLAVE, application, set_env, [?APP, ssl_server_options, [ + {certfile, CertFile ++ ".cert.pem"}, + {keyfile, CertFile ++ ".key.pem"}, + {cacertfile, CaFile}], [{persistent, true}]]), + ok = rpc:call(?SLAVE, application, set_env, [?APP, ssl_client_options, [ + {certfile, CertFile ++ ".cert.pem"}, + {keyfile, CertFile ++ ".key.pem"}, + {cacertfile, CaFile}], [{persistent, true}]]), + ok; + set_driver_configuration(tcp, ?MASTER) -> ok = application:set_env(?APP, default_client_driver, tcp, [{persistent, true}]), ok = application:set_env(?APP, tcp_server_port, ?MASTER_PORT, [{persistent, true}]), diff --git a/test/include/ct.hrl b/test/include/ct.hrl index 7e10620..8f81d52 100644 --- a/test/include/ct.hrl +++ b/test/include/ct.hrl @@ -18,7 +18,9 @@ -define(DEFAULT_DRIVER, tcp). --define(TEST_APPLICATION_ENV, [{sasl, errlog_type, error}, +-define(TEST_APPLICATION_ENV, [ + {sasl, sasl_error_logger, false}, + {sasl, errlog_type, error}, {sasl, error_logger_mf_dir, false}, {?APP, tcp_server_port, false}, {?APP, ssl_server_port, false}, @@ -26,6 +28,7 @@ ?MASTER => ?MASTER_PORT, ?SLAVE => ?SLAVE_PORT }}}, + {?APP, cookie_per_node, {internal, #{}}}, {?APP, connect_timeout, 500}, {?APP, send_timeout, 500}, {lager, log_root, "./log"}, @@ -34,11 +37,21 @@ {lager, colored, false}, {lager, handlers, [ %% Commented out to reduce test output polution, uncomment during development - % {lager_common_test_backend, [debug, - % {lager_default_formatter, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, - % "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]}]}, - {lager_file_backend, [{file, "messages.log"}, {level, debug}, {formatter, lager_default_formatter}, {size, 0}, {date, "$D0"}, {count, 7}, + % {lager_common_test_backend, [ + % {level, debug}, + % {formatter, lager_default_formatter}, + % {formatter_config, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, + % "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]} + % ]}, + {lager_file_backend, [ + {file, "messages.log"}, + {level, debug}, + {formatter, lager_default_formatter}, + {size, 0}, + {date, "$D0"}, + {count, 7}, {formatter_config, ["[", date, " ", time, "] severity=", severity, " node=\"", {node, "undefined"}, "\" pid=\"", pid, - "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]}]} + "\" module=", {module, "gen_rpc"}, " function=", {function, "undefined"}, " ", message, "\n"]} + ]} ]} ]).