From 5ba16e741e16e7b9a8148d41bfd5f8fb6146be0b Mon Sep 17 00:00:00 2001 From: Gabriel Martinez Date: Tue, 28 Jan 2025 19:12:32 -0500 Subject: [PATCH] feat: added features and stability fixes --- channel-sender/config/config-local.yaml | 4 +- channel-sender/docs/swagger.yaml | 39 +- .../lib/channel_sender_ex/core/channel.ex | 106 ++++- .../core/channel_supervisor.ex | 8 +- .../core/pubsub/pub_sub_core.ex | 10 +- .../core/pubsub/re_connect_process.ex | 1 + .../core/security/channel_authenticator.ex | 7 +- .../transport/cowboy_starter.ex | 5 + .../transport/rest/rest_controller.ex | 46 ++- .../lib/channel_sender_ex/transport/socket.ex | 370 ++++++++++++------ channel-sender/mix.exs | 2 +- .../core/channel_integration_test.exs | 57 ++- .../channel_sender_ex/core/channel_test.exs | 6 +- .../core/pubsub/pub_sub_core_test.exs | 17 + .../core/pubsub/re_connect_process_test.exs | 2 +- .../transport/rest/rest_controller_test.exs | 60 ++- .../transport/socket_integration_test.exs | 4 +- 17 files changed, 587 insertions(+), 157 deletions(-) diff --git a/channel-sender/config/config-local.yaml b/channel-sender/config/config-local.yaml index 6aff23d..aca8ec8 100644 --- a/channel-sender/config/config-local.yaml +++ b/channel-sender/config/config-local.yaml @@ -14,7 +14,7 @@ channel_sender_ex: # max time in seconds to wait the client to send the auth token # before closing the channel - socket_idle_timeout: 30000 + socket_idle_timeout: 90000 # Specifies the maximum time (in milliseconds) that the Elixir supervisor waits # for child channel processes to terminate after sending it an exit signal @@ -78,7 +78,7 @@ channel_sender_ex: # for more information about the kubernetes configuration with libcluser logger: - level: info + level: debug diff --git a/channel-sender/docs/swagger.yaml b/channel-sender/docs/swagger.yaml index f5030a2..c023c72 100644 --- a/channel-sender/docs/swagger.yaml +++ b/channel-sender/docs/swagger.yaml @@ -15,10 +15,10 @@ paths: /create: post: tags: - - /ext/channel/ + - /ext/channel summary: Create Channel and session description: | - By passing in the appropriate options, you can regisster a new channel in the system + By passing in the appropriate options, you can register a new channel in the system operationId: createChannel requestBody: description: Channel to create @@ -43,7 +43,7 @@ paths: /deliver_message: post: tags: - - /ext/channel/ + - /ext/channel summary: Deliver an event message to a channel or group of channels description: Deliver an event message to a previusly registered channel_ref, or deliver a message to all channels related to an specific app_ref or user_ref operationId: deliverMessage @@ -73,7 +73,7 @@ paths: /deliver_batch: post: tags: - - /ext/channel/ + - /ext/channel summary: Batch deliver up to 10 event messages description: Deliver event messages to a group of channel_refs operationId: deliverBatchMessages @@ -98,7 +98,36 @@ paths: application/json: schema: $ref: '#/components/schemas/InvalidBodyResponse' - + /: + delete: + tags: + - /ext/channel + summary: Perform a graceful shutdown of a channel processes + description: Perform a graceful shutdown of a channel process and related socket process, if any. + operationId: stopChannel + parameters: + - name: channel_ref + in: query + description: The channel_ref to be stopped + required: true + schema: + type: string + example: beec634503c238f5b84f737275bfd4ba.855b8193bb6f419381eac6cc087aea3f + responses: + "200": + description: If the operation is performed succesfully. + content: + application/json: + schema: + oneOf: + - $ref: '#/components/schemas/SuccessResponse' + "400": + description: Bad request due to missing required parameter + content: + application/json: + schema: + $ref: '#/components/schemas/InvalidBodyResponse' + components: schemas: Messages: diff --git a/channel-sender/lib/channel_sender_ex/core/channel.ex b/channel-sender/lib/channel_sender_ex/core/channel.ex index 12dc72d..94d26e2 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel.ex @@ -31,7 +31,8 @@ defmodule ChannelSenderEx.Core.Channel do pending_sending: ChannelSenderEx.Core.Channel.pending_sending(), stop_cause: atom(), socket_stop_cause: atom(), - user_ref: String.t() + user_ref: String.t(), + meta: String.t() } defstruct channel: "", @@ -41,9 +42,10 @@ defmodule ChannelSenderEx.Core.Channel do pending_sending: BoundedMap.new(), stop_cause: nil, socket_stop_cause: nil, - user_ref: "" + user_ref: "", + meta: nil - def new(channel, application, user_ref) do + def new(channel, application, user_ref, meta) do %Data{ channel: channel, application: application, @@ -52,7 +54,8 @@ defmodule ChannelSenderEx.Core.Channel do pending_sending: BoundedMap.new(), stop_cause: nil, socket_stop_cause: nil, - user_ref: user_ref + user_ref: user_ref, + meta: meta } end @@ -72,6 +75,13 @@ defmodule ChannelSenderEx.Core.Channel do GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout) end + @doc """ + get information about this channel + """ + def info(server, timeout \\ @on_connected_channel_reply_timeout) do + GenStateMachine.call(server, :info, timeout) + end + @doc """ operation to mark a message as acknowledged """ @@ -89,18 +99,23 @@ defmodule ChannelSenderEx.Core.Channel do ) end + def stop(server) do + GenStateMachine.call(server, :stop) + end + @spec start_link(any()) :: :gen_statem.start_ret() @doc """ Starts the state machine. """ - def start_link(args = {_channel, _application, _user_ref}, opts \\ []) do + def start_link(args = {_channel, _application, _user_ref, _meta}, opts \\ []) do GenStateMachine.start_link(__MODULE__, args, opts) end @impl GenStateMachine @doc false - def init({channel, application, user_ref}) do - data = Data.new(channel, application, user_ref) + def init({channel, application, user_ref, meta}) do + data = Data.new(channel, application, user_ref, meta) + Logger.debug("Channel #{channel} created. Data: #{inspect(data)}") Process.flag(:trap_exit, true) CustomTelemetry.execute_custom_event([:adf, :channel], %{count: 1}) {:ok, :waiting, data} @@ -123,6 +138,21 @@ defmodule ChannelSenderEx.Core.Channel do end end + def waiting({:call, from}, :info, data) do + actions = [ + _reply = {:reply, from, {:waiting, data}} + ] + {:keep_state_and_data, actions} + end + + def waiting({:call, from}, :stop, data) do + actions = [ + _reply = {:reply, from, :ok} + ] + Logger.info("Channel #{data.channel} stopping, reason: :explicit_close") + {:next_state, :closed, %{data | stop_cause: :explicit_close}, actions} + end + ## stop the process with a timeout cause if the socket is not ## authenticated in the given time def waiting(:state_timeout, :waiting_timeout, data) do @@ -205,6 +235,24 @@ defmodule ChannelSenderEx.Core.Channel do {:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]} end + def connected({:call, from}, :info, data) do + actions = [ + _reply = {:reply, from, {:connected, data}} + ] + {:keep_state_and_data, actions} + end + + def connected({:call, from}, {:socket_connected, socket_pid}, data) do + socket_ref = Process.monitor(socket_pid) + new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil} + + actions = [ + _reply = {:reply, from, :ok} + ] + Logger.debug("Channel #{data.channel} overwritting socket pid.") + {:keep_state, new_data, actions} + end + # this method will be called when the socket is disconnected # to inform this process about the disconnection reason # this will be later used to define if this process will go back to the waiting state @@ -304,8 +352,8 @@ defmodule ChannelSenderEx.Core.Channel do ## Handle info notification when socket process terminates. This method is called because the socket is monitored. ## via Process.monitor(socket_pid) in the waited/connected state. - def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do - new_data = %{data | socket: nil} + def connected(:info, {:DOWN, _ref, :process, _object, reason}, data) do + new_data = %{data | socket: nil, socket_stop_cause: reason} Logger.warning("Channel #{data.channel} detected socket close/disconnection. Will enter :waiting state") {:next_state, :waiting, new_data, []} end @@ -321,6 +369,14 @@ defmodule ChannelSenderEx.Core.Channel do {:stop, :normal, %{data | stop_cause: :name_conflict}} end + # capture shutdown signal + def connected(:info, {:EXIT, from_pid, :shutdown}, data) do + source_process = Process.info(from_pid) + Logger.info("Channel #{inspect(data)} received shutdown signal: #{inspect(source_process)}") + :keep_state_and_data + end + + # capture any other info message def connected( :info, info_payload, @@ -330,12 +386,12 @@ defmodule ChannelSenderEx.Core.Channel do {:keep_state_and_data, :postpone} end - @impl true - def terminate(reason, state, data) do - CustomTelemetry.execute_custom_event([:adf, :channel], %{count: -1}) - level = if reason == :normal, do: :info, else: :warning - Logger.log(level, "Channel #{data.channel} terminating, from state #{inspect(state)} and reason #{inspect(reason)}") - :ok + def connected({:call, from}, :stop, data) do + actions = [ + _reply = {:reply, from, :ok} + ] + Logger.debug("Channel #{data.channel} stopping, reason: :explicit_close") + {:next_state, :closed, %{data | stop_cause: :explicit_close}, actions} end defp new_token_message(_data = %{application: app, channel: channel, user_ref: user}) do @@ -343,6 +399,25 @@ defmodule ChannelSenderEx.Core.Channel do ProtocolMessage.of(UUID.uuid4(:hex), ":n_token", new_token) end + ############################################ + ### CLOSED STATE #### + ############################################ + def closed(:enter, _old_state, data) do + {:stop, :normal, data} + end + + @impl true + def terminate(reason, state, data) do + CustomTelemetry.execute_custom_event([:adf, :channel], %{count: -1}) + level = if reason == :normal, do: :info, else: :warning + Logger.log(level, + """ + Channel #{data.channel} terminating, from state #{inspect(state)} + and reason #{inspect(reason)}. Data: #{inspect(data)} + """) + :ok + end + ######################################### ### Support functions #### ######################################### @@ -430,6 +505,7 @@ defmodule ChannelSenderEx.Core.Channel do defp socket_clean_disconnection?(data) do case data.socket_stop_cause do + :normal -> true {:remote, 1000, _} -> true _ -> false end diff --git a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex index 5747a4c..805a50f 100644 --- a/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex +++ b/channel-sender/lib/channel_sender_ex/core/channel_supervisor.ex @@ -27,7 +27,9 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do @type channel_ref :: String.t() @type application :: String.t() @type user_ref :: String.t() - @type channel_init_args :: {channel_ref(), application(), user_ref()} + @type meta :: list() + @type channel_init_args :: {channel_ref(), application(), user_ref(), meta()} + @spec start_channel(channel_init_args()) :: any() def start_channel(args) do Horde.DynamicSupervisor.start_child(__MODULE__, channel_child_spec(args)) @@ -35,12 +37,12 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do @spec channel_child_spec(channel_init_args()) :: any() @compile {:inline, channel_child_spec: 1} - def channel_child_spec(channel_args = {channel_ref, application, user_ref}) do + def channel_child_spec(channel_args = {channel_ref, application, user_ref, _meta}) do channel_child_spec(channel_args, via_tuple(channel_ref, application, user_ref)) end @compile {:inline, channel_child_spec: 2} - def channel_child_spec(channel_args = {channel_ref, _application, _user_ref}, name) do + def channel_child_spec(channel_args = {channel_ref, _application, _user_ref, _meta}, name) do %{ id: "Channel_#{channel_ref}", start: {Channel, :start_link, [channel_args, [name: name]]}, diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex index 948a2d7..65899de 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/pub_sub_core.ex @@ -22,7 +22,7 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do Delivers a message to a single channel associated with the given channel reference. If the channel is not found, the message is retried up to @max_retries times with exponential backoff. """ - @spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: Channel.deliver_response() + @spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: any() def deliver_to_channel(channel_ref, message) do action_fn = fn _ -> do_deliver_to_channel(channel_ref, message) end execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn -> @@ -65,4 +65,12 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do end end + def delete_channel(channel_ref) do + case ChannelRegistry.lookup_channel_addr(channel_ref) do + pid when is_pid(pid) -> + Channel.stop(pid) + :noproc -> + :noproc + end + end end diff --git a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex index 1fa4200..b89f91a 100644 --- a/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex +++ b/channel-sender/lib/channel_sender_ex/core/pubsub/re_connect_process.ex @@ -37,6 +37,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do timeout = Application.get_env(:channel_sender_ex, :on_connected_channel_reply_timeout) Channel.socket_connected(pid, socket_pid, timeout) + pid end catch _type, _err -> :noproc diff --git a/channel-sender/lib/channel_sender_ex/core/security/channel_authenticator.ex b/channel-sender/lib/channel_sender_ex/core/security/channel_authenticator.ex index 5d3240c..96b203d 100644 --- a/channel-sender/lib/channel_sender_ex/core/security/channel_authenticator.ex +++ b/channel-sender/lib/channel_sender_ex/core/security/channel_authenticator.ex @@ -9,11 +9,12 @@ defmodule ChannelSenderEx.Core.Security.ChannelAuthenticator do @type user_ref() :: String.t() @type channel_ref() :: String.t() @type channel_secret() :: String.t() + @type meta() :: list() - @spec create_channel(application(), user_ref()) :: {channel_ref(), channel_secret()} - def create_channel(application, user_ref) do + @spec create_channel(application(), user_ref(), meta()) :: {channel_ref(), channel_secret()} + def create_channel(application, user_ref, meta \\ []) do {channel_ref, _channel_secret} = credentials = create_channel_data_for(application, user_ref) - {:ok, _pid} = ChannelSupervisor.start_channel({channel_ref, application, user_ref}) + {:ok, _pid} = ChannelSupervisor.start_channel({channel_ref, application, user_ref, meta}) credentials end diff --git a/channel-sender/lib/channel_sender_ex/transport/cowboy_starter.ex b/channel-sender/lib/channel_sender_ex/transport/cowboy_starter.ex index f95f72b..0c26ae8 100644 --- a/channel-sender/lib/channel_sender_ex/transport/cowboy_starter.ex +++ b/channel-sender/lib/channel_sender_ex/transport/cowboy_starter.ex @@ -33,7 +33,12 @@ defmodule ChannelSenderEx.Transport.CowboyStarter do CustomTelemetry.execute_custom_event([:adf, :socket, :switchprotocol], %{count: 1}, %{request_path: "/ext/socket", status: 101, code: "0"}) + _ -> + :ok end + + rescue + e -> Logger.warning("Error in metrics callback: #{inspect(e)}") end defp compile_routes(paths) do diff --git a/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex b/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex index a1da639..0219404 100644 --- a/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex +++ b/channel-sender/lib/channel_sender_ex/transport/rest/rest_controller.ex @@ -5,10 +5,14 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.PubSub.PubSubCore alias ChannelSenderEx.Core.Security.ChannelAuthenticator + alias Plug.Conn.Query use Plug.Router require Logger + @metadata_headers_max 3 + @metadata_headers_prefix "x-meta-" + plug(Plug.Telemetry, event_prefix: [:channel_sender_ex, :plug]) plug(CORSPlug) plug(:match) @@ -24,16 +28,23 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do post("/ext/channel/create", do: create_channel(conn)) post("/ext/channel/deliver_message", do: deliver_message(conn)) post("/ext/channel/deliver_batch", do: deliver_message(conn)) + delete("/ext/channel", do: close_channel(conn)) match(_, do: send_resp(conn, 404, "Route not found.")) defp create_channel(conn) do - route_create(conn.body_params, conn) + # collect metadata from headers, up to 3 metadata fields + metadata = conn.req_headers + |> Enum.filter(fn {key, _} -> String.starts_with?(key, @metadata_headers_prefix) end) + |> Enum.map(fn {key, value} -> {String.replace(key, @metadata_headers_prefix, ""), String.slice(value, 0, 50)} end) + |> Enum.take(@metadata_headers_max) + route_create(conn.body_params, metadata, conn) end + @spec route_create(map(), list(), Plug.Conn.t()) :: Plug.Conn.t() defp route_create(message = %{ application_ref: application_ref, user_ref: user_ref - }, conn + }, metadata, conn ) do is_valid = message @@ -41,7 +52,7 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do case is_valid do true -> - {channel_ref, channel_secret} = ChannelAuthenticator.create_channel(application_ref, user_ref) + {channel_ref, channel_secret} = ChannelAuthenticator.create_channel(application_ref, user_ref, metadata) response = %{channel_ref: channel_ref, channel_secret: channel_secret} conn @@ -53,10 +64,37 @@ defmodule ChannelSenderEx.Transport.Rest.RestController do end end - defp route_create(_body, conn) do + defp route_create(_body, _metadata, conn) do invalid_body(conn) end + defp close_channel(conn) do + channel = conn.query_string + |> Query.decode + |> Map.get("channel_ref", nil) + case channel do + nil -> + invalid_body(conn) + + _ -> + route_close(channel, conn) + end + end + + defp route_close(channel, conn) do + case PubSubCore.delete_channel(channel) do + :ok -> + conn + |> put_resp_header("content-type", "application/json") + |> send_resp(200, Jason.encode!(%{result: "Ok"})) + + :noproc -> + conn + |> put_resp_header("content-type", "application/json") + |> send_resp(410, Jason.encode!(%{error: "Channel not found"})) + end + end + defp deliver_message(conn) do route_deliver(conn.body_params, conn) end diff --git a/channel-sender/lib/channel_sender_ex/transport/socket.ex b/channel-sender/lib/channel_sender_ex/transport/socket.ex index b767764..9f9a851 100644 --- a/channel-sender/lib/channel_sender_ex/transport/socket.ex +++ b/channel-sender/lib/channel_sender_ex/transport/socket.ex @@ -5,17 +5,35 @@ defmodule ChannelSenderEx.Transport.Socket do @behaviour :cowboy_websocket @channel_key "channel" - # Error code to indicate a generic bad request - @invalid_request_code "1006" + @normal_close_code "3000" - # Error code to indicate bad request, specifically when - # not providing a valid channel reference - @invalid_channel_code "1007" + ## --------------------- + ## Non-Retryable errors + ## --------------------- + + # Error code to indicate a generic bad request. + @invalid_request_code "3006" + + # Error to indicate the shared secret for the channel is invalid + @invalid_secret_code "3008" + + # Error code to indicate that the channel is already connected + # and a new socket process is trying to connect to it. + @invalid_already_stablished "3009" - @invalid_secret_code 1008 + ## ----------------- + ## Retryable errors + ## ----------------- + + # Error code to indicate bad request, specifically when + # not providing a valid channel reference, or when clustered + # the reference its yet visible among all replicas. + # This error code and up, may be retried by the client. + @invalid_channel_code "3050" require Logger + alias ChannelSenderEx.Core.Channel alias ChannelSenderEx.Core.ChannelRegistry alias ChannelSenderEx.Core.ProtocolMessage alias ChannelSenderEx.Core.PubSub.ReConnectProcess @@ -42,7 +60,6 @@ defmodule ChannelSenderEx.Transport.Socket do @impl :cowboy_websocket def init(req = %{method: "GET"}, opts) do init_result = get_relevant_request_info(req) - |> lookup_channel_addr |> process_subprotocol_selection(req) case init_result do @@ -54,75 +71,26 @@ defmodule ChannelSenderEx.Transport.Socket do end end - defp get_relevant_request_info(req) do - # extracts the channel key from the request query string - case :lists.keyfind(@channel_key, 1, :cowboy_req.parse_qs(req)) do - {@channel_key, channel} = resp when byte_size(channel) > 10 -> - Logger.debug("Socket starting, parameters: #{inspect(resp)}") - resp - _ -> - Logger.error("Socket unable to start. channel_ref not found in query string request.") - {:error, @invalid_request_code} - end - end - - defp lookup_channel_addr(channel_ref) do - action_fn = fn _ -> check_channel_registered(channel_ref) end - # retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes) - # with a backoff strategy of 100ms initial delay and max of 500ms delay. - execute(50, 300, 3, action_fn, fn -> - Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}") - {:error, @invalid_channel_code} - end) - end - - defp check_channel_registered(res = {@channel_key, channel_ref}) do - case ChannelRegistry.lookup_channel_addr(channel_ref) do - :noproc -> - Logger.warning("Channel #{channel_ref} not found, retrying query...") - :retry - _ -> res - end - end - - defp check_channel_registered(res = {:error, _desc}) do - res - end - - defp process_subprotocol_selection({@channel_key, channel}, req) do - case :cowboy_req.parse_header("sec-websocket-protocol", req) do - :undefined -> - {:cowboy_websocket, req, _state = {channel, :pre_auth, Application.get_env( - :channel_sender_ex, - :message_encoder, - ChannelSenderEx.Transport.Encoders.JsonEncoder - )}, ws_opts()} - - sub_protocols -> - {encoder, req} = - case Enum.member?(sub_protocols, "binary_flow") do - true -> - {BinaryEncoder, - :cowboy_req.set_resp_header("sec-websocket-protocol", "binary_flow", req)} - - false -> - {JsonEncoder, - :cowboy_req.set_resp_header("sec-websocket-protocol", "json_flow", req)} - end - - {:cowboy_websocket, req, _state = {channel, :pre_auth, encoder}, ws_opts()} - end - end - - defp process_subprotocol_selection(err = {:error, _}, _req) do - Logger.error("Socket unable to start. Error: #{inspect(err)}") - err - end - @impl :cowboy_websocket def websocket_init(state) do - Logger.debug("Socket init #{inspect(state)}") - {_commands = [], state} + Logger.debug("Socket init with pid: #{inspect(self())} starting... #{inspect(state)}") + {ref, _, _} = state + proc = lookup_channel_addr({"channel", ref}) + case proc do + {:ok, pid} -> + case validate_channel_is_waiting(pid) do + {:error, desc, data} -> + Logger.warning(""" + Socket init with pid: #{inspect(self())} will not continue for #{ref}. + Error: #{desc}. There is a socket already connected = #{inspect(data.socket)} + """) + {_commands = [{:close, 1001, desc}], state} + _ -> + {_commands = [], state} + end + {:error, desc} -> + {_commands = [{:close, 1001, desc}], state} + end end @impl :cowboy_websocket @@ -143,7 +111,7 @@ defmodule ChannelSenderEx.Transport.Socket do %{request_path: "/ext/socket", status: 101, code: @invalid_secret_code}) Logger.error("Socket unable to authorize connection. Error: #{@invalid_secret_code}-invalid token for channel #{channel}") - {_commands = [{:close, @invalid_secret_code, "Invalid token for channel"}], + {_commands = [{:close, 1001, <<@invalid_secret_code>>}], {channel, :unauthorized}} end end @@ -176,26 +144,6 @@ defmodule ChannelSenderEx.Transport.Socket do {_commands = [], state} end - @compile {:inline, notify_connected: 1} - defp notify_connected(channel) do - Logger.debug("Socket for channel #{channel} connected") - - socket_event_bus = get_param(:socket_event_bus, nil) - ch_pid = socket_event_bus.notify_event({:connected, channel}, self()) - Process.monitor(ch_pid) - end - - @compile {:inline, set_pending: 2} - defp set_pending(state, new_pending), do: :erlang.setelement(5, state, new_pending) - - @compile {:inline, remove_pending: 2} - defp remove_pending(state = {_, :connected, _, _, pending}, message_id) do - case Map.pop(pending, message_id) do - {nil, _} -> {nil, state} - {elem, new_pending} -> {elem, set_pending(state, new_pending)} - end - end - @impl :cowboy_websocket def websocket_info( {:deliver_msg, from = {pid, ref}, message}, @@ -214,11 +162,40 @@ defmodule ChannelSenderEx.Transport.Socket do end end + # @impl :cowboy_websocket + # def websocket_info({:DOWN, ref, :process, _pid, cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do + # case cause do + # :normal -> + # Logger.info("Socket for channel #{channel_ref}. Related process #{inspect(ref)} down normally") + # {_commands = [{:close, @normal_close_code, "close"}], state} + # _ -> + # Logger.warning(""" + # Socket for channel #{channel_ref}. Related Process #{inspect(ref)} + # down with cause #{inspect(cause)}. Spawning process for re-conection + # """) + # spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) + # {_commands = [], state} + # end + # end + @impl :cowboy_websocket - def websocket_info({:DOWN, ref, :process, _pid, _cause}, state = {channel_ref, :connected, _, {_, _, ref}, _}) do - Logger.warning("Socket for channel #{channel_ref} : spawning process for re-conection") - spawn_monitor(ReConnectProcess, :start, [self(), channel_ref]) - {_commands = [], state} + def websocket_info({:DOWN, ref, proc, pid, cause}, state = {channel_ref, _, _, _, _}) do + case cause do + :normal -> + Logger.info("Socket for channel #{channel_ref}. Related process #{inspect(ref)} down normally.") + {_commands = [{:close, 1000, <<@normal_close_code>>}], state} + _ -> + Logger.warning(""" + Socket for channel #{channel_ref}. Related Process #{inspect(ref)} + received DOWN message: #{inspect({ref, proc, pid, cause})}. Spawning process for re-conection + """) + + new_pid = ReConnectProcess.start(self(), channel_ref) + Logger.debug("Socket for channel #{channel_ref} : channel process found for re-conection: #{inspect(new_pid)}") + Process.monitor(new_pid) + + {_commands = [], state} + end end @impl :cowboy_websocket @@ -229,28 +206,195 @@ defmodule ChannelSenderEx.Transport.Socket do end @impl :cowboy_websocket - def websocket_info(_message, state) do + def websocket_info(message, state) do + Logger.warning("Socket received socket info message: #{inspect(message)}, state: #{inspect(state)}") {_commands = [], state} end @impl :cowboy_websocket - def terminate(reason, _partial_req, state) do + def terminate(reason, partial_req, state) do + + Logger.debug("Socket terminate with pid: #{inspect(self())}. REASON: #{inspect(reason)}. REQ: #{inspect(partial_req)}, STATE: #{inspect(state)}") + CustomTelemetry.execute_custom_event([:adf, :socket, :disconnection], %{count: 1}) - case state do - {channel_ref, _, _, _, _} -> - Logger.warning("Socket for channel #{channel_ref} terminated with reason: #{inspect(reason)}") - socket_event_bus = get_param(:socket_event_bus, :noop) - case socket_event_bus do - :noop -> :ok - _ -> - socket_event_bus.notify_event({:socket_down_reason, channel_ref, reason}, self()) - end + + handle_terminate(reason, partial_req, state) + + # level = if reason == :normal, do: :info, else: :warning + # case state do + # { channel_ref, _, _, _, _} -> + # level = if reason == :normal, do: :info, else: :warning + # Logger.log(level, "Socket for channel #{channel_ref} terminated with reason: #{inspect(reason)}") + # post_terminate(reason, channel_ref) + # :ok + # _ -> + # Logger.log(level, "Socket terminated with reason: #{reason}. State: #{inspect(state)}") + # :ok + # end + end + + ############################# + ## + ## SUPPORT FUNCTIONS + ## + ############################# + + defp get_relevant_request_info(req) do + # extracts the channel key from the request query string + case :lists.keyfind(@channel_key, 1, :cowboy_req.parse_qs(req)) do + {@channel_key, channel} = resp when byte_size(channel) > 10 -> + resp _ -> - Logger.warning("Socket terminated with reason: #{reason} and state: #{inspect(state)}") - :ok + Logger.error("Socket unable to start. channel_ref not found in query string request.") + {:error, @invalid_request_code} end end + defp lookup_channel_addr(channel_ref) do + action_fn = fn _ -> check_channel_registered(channel_ref) end + # retries 3 times the lookup of the channel reference (useful when running as a cluster with several nodes) + # with a backoff strategy of 100ms initial delay and max of 500ms delay. + result = execute(50, 300, 3, action_fn, fn -> + Logger.error("Socket unable to start. channel_ref process does not exist yet, ref: #{inspect(channel_ref)}") + {:error, <<@invalid_channel_code>>} + end) + + case result do + {:error, _desc} = e -> + e + {pid, _res} -> + {:ok, pid} + end + end + + defp check_channel_registered(res = {@channel_key, channel_ref}) do + case ChannelRegistry.lookup_channel_addr(channel_ref) do + :noproc -> + Logger.warning("Socket: #{channel_ref} not found, retrying query...") + :retry + pid -> + {pid, res} + end + end + + # defp check_channel_registered({:error, _desc}) do + # :retry + # end + + defp validate_channel_is_waiting(pid) when is_pid(pid) do + {status, data} = Channel.info(pid) + case status do + :waiting -> + # process can continue, and socket process will be linked to the channel process + {:ok, data} + _ -> + # channel is already in a connected state, and a previous socket process + # was already linked to it. + {:error, <<@invalid_already_stablished>>, data} + end + end + + defp process_subprotocol_selection({@channel_key, channel}, req) do + case :cowboy_req.parse_header("sec-websocket-protocol", req) do + :undefined -> + {:cowboy_websocket, req, _state = {channel, :pre_auth, Application.get_env( + :channel_sender_ex, + :message_encoder, + ChannelSenderEx.Transport.Encoders.JsonEncoder + )}, ws_opts()} + + sub_protocols -> + {encoder, req} = + case Enum.member?(sub_protocols, "binary_flow") do + true -> + {BinaryEncoder, + :cowboy_req.set_resp_header("sec-websocket-protocol", "binary_flow", req)} + + false -> + {JsonEncoder, + :cowboy_req.set_resp_header("sec-websocket-protocol", "json_flow", req)} + end + + {:cowboy_websocket, req, _state = {channel, :pre_auth, encoder}, ws_opts()} + end + end + + defp process_subprotocol_selection(err = {:error, _}, req) do + Logger.error("Socket unable to start. Error: #{inspect(err)}. Request: #{inspect(req)}") + err + end + + @compile {:inline, notify_connected: 1} + defp notify_connected(channel) do + socket_event_bus = get_param(:socket_event_bus, nil) + ch_pid = socket_event_bus.notify_event({:connected, channel}, self()) + Process.monitor(ch_pid) + end + + @compile {:inline, set_pending: 2} + defp set_pending(state, new_pending), do: :erlang.setelement(5, state, new_pending) + + @compile {:inline, remove_pending: 2} + defp remove_pending(state = {_, :connected, _, _, pending}, message_id) do + case Map.pop(pending, message_id) do + {nil, _} -> {nil, state} + {elem, new_pending} -> {elem, set_pending(state, new_pending)} + end + end + + defp handle_terminate(:normal, _req, state) do + Logger.info("Socket with pid: #{inspect(self())} terminated with cause :normal. STATE: #{inspect(state)}") + :ok + end + + defp handle_terminate(:remote, _req, state) do + Logger.info("Socket with pid: #{inspect(self())} terminated with cause :remote. STATE: #{inspect(state)}") + :ok + end + + defp handle_terminate(cause = {:remote, _code, _}, _req, state) do + {channel_ref, _, _, _, _} = state + Logger.info("Socket with pid: #{inspect(self())}, for ref #{channel_ref} terminated normally. CAUSE: #{inspect(cause)}") + :ok + end + + defp handle_terminate(:stop, _req, state) do + channel_ref = case state do + {ref, _, _} -> ref + {ref, _unauthorized_atom} -> ref + _ -> state + end + Logger.info("Socket with pid: #{inspect(self())}, for ref #{channel_ref} terminated with :stop. STATE: #{inspect(state)}") + :ok + end + + defp handle_terminate(:timeout, _req, state) do + channel_ref = case state do + {ref, _, _} -> ref + {ref, _, _, _, _} -> ref + _ -> state + end + Logger.info("Socket with pid: #{inspect(self())}, for ref #{inspect(channel_ref)} terminated with :timeout. STATE: #{inspect(state)}") + :ok + end + + defp handle_terminate({:error, :closed}, _req, state) do + channel_ref = case state do + {ref, _, _} -> ref + _ -> state + end + Logger.warning("Socket with pid: #{inspect(self())}, for ref #{inspect(channel_ref)} was closed without receiving closing frame first") + :ok + end + + # handle other possible termination reasons: + # {:crash, Class, Reason} + # {:error, :badencoding | :badframe | :closed | Reason} + defp handle_terminate(reason, _req, state) do + Logger.info("Socket with pid: #{inspect(self())}, terminated with reason: #{inspect(reason)}. STATE: #{inspect(state)}") + :ok + end + @compile {:inline, auth_ok_frame: 1} defp auth_ok_frame(encoder) do encoder.simple_frame("AuthOk") @@ -259,7 +403,7 @@ defmodule ChannelSenderEx.Transport.Socket do end defp ws_opts do - timeout = get_param(:socket_idle_timeout, 900) + timeout = get_param(:socket_idle_timeout, 90_000) %{ idle_timeout: timeout, # active_n: 5, diff --git a/channel-sender/mix.exs b/channel-sender/mix.exs index 940e9f4..f2db13a 100644 --- a/channel-sender/mix.exs +++ b/channel-sender/mix.exs @@ -4,7 +4,7 @@ defmodule ChannelSenderEx.MixProject do def project do [ app: :channel_sender_ex, - version: "0.2.1", + version: "0.2.2", elixir: "~> 1.16", start_permanent: Mix.env() == :prod, deps: deps(), diff --git a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs index ee03e63..70132f2 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_integration_test.exs @@ -82,6 +82,41 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do Process.sleep(100) end + test "Should just connect and allow to close explicit", %{ + port: port, + channel: channel, + secret: secret + } do + + # start socket connection and authenticate + {conn, _stream} = assert_connect_and_authenticate(port, channel, secret) + + # call for stop + channel_pid = ChannelRegistry.lookup_channel_addr(channel) + :ok = Channel.stop(channel_pid) + + Process.sleep(100) + + # validate is no longer alive + refute Process.alive?(channel_pid) + end + + test "Should just connect and allow to call close, even when no socket and channel process waiting", %{ + port: port, + channel: channel, + secret: secret + } do + + # call for stop + channel_pid = ChannelRegistry.lookup_channel_addr(channel) + :ok = Channel.stop(channel_pid) + + Process.sleep(200) + + # validate is no longer alive + refute Process.alive?(channel_pid) + end + test "Should send messages collected while in wait state", %{ port: port, channel: channel, @@ -121,8 +156,10 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do channel: channel, secret: secret } do - Helper.compile(:channel_sender_ex, channel_shutdown_on_clean_close: 0) - Helper.compile(:channel_sender_ex, channel_shutdown_on_disconnection: 0) + Application.put_env(:channel_sender_ex, :channel_shutdown_on_clean_close, 0) + Application.put_env(:channel_sender_ex, :channel_shutdown_on_disconnection, 0) + Helper.compile(:channel_sender_ex) + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) assert {:accepted_connected, _, _} = deliver_message(channel) assert_receive {:gun_ws, ^conn, ^stream, {:text, _data_string}} @@ -159,7 +196,7 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do end test "Should send pending messages to twin process when terminated by supervisor merge (name conflict)" do - channel_args = {"channel_ref", "application", "user_ref"} + channel_args = {"channel_ref", "application", "user_ref", []} {:ok, _} = Horde.DynamicSupervisor.start_link(name: :sup1, strategy: :one_for_one) {:ok, _} = Horde.DynamicSupervisor.start_link(name: :sup2, strategy: :one_for_one) {:ok, _} = Horde.Registry.start_link(name: :reg1, keys: :unique) @@ -186,6 +223,20 @@ defmodule ChannelSenderEx.Core.ChannelIntegrationTest do assert Map.get(pending_msg, "82") == msg2 end + test "Should not allow multiple socket to one channel process", %{ + port: port, + channel: channel, + secret: secret + } do + {conn, stream} = assert_connect_and_authenticate(port, channel, secret) + + # try to open a new socket connection and link it to the same channel + conn2 = connect(port, channel) + assert_receive {:gun_ws, _, _, {:close, 1001, "3009"}}, 500 + + :gun.close(conn2) + end + defp deliver_message(channel, message_id \\ "42") do {data, message} = build_message(message_id) channel_response = PubSubCore.deliver_to_channel(channel, message) diff --git a/channel-sender/test/channel_sender_ex/core/channel_test.exs b/channel-sender/test/channel_sender_ex/core/channel_test.exs index 7aeb8c5..bf369fb 100644 --- a/channel-sender/test/channel_sender_ex/core/channel_test.exs +++ b/channel-sender/test/channel_sender_ex/core/channel_test.exs @@ -48,7 +48,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do end) {:ok, - init_args: {channel_ref, app, user_ref}, + init_args: {channel_ref, app, user_ref, []}, message: %{ message_id: "32452", correlation_id: "1111", @@ -64,7 +64,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do :accepted_connected = Channel.deliver_message(pid, message_to_send) assert_receive {:deliver_msg, _from = {^pid, _ref}, ^message_to_send} - {_, app, user} = init_args + {_, app, user, []} = init_args data = %Data{application: app, user_ref: user} assert data.application == app @@ -133,7 +133,7 @@ defmodule ChannelSenderEx.Core.ChannelTest do end) end - test "Should send new token in correct interval", %{init_args: init_args = {channel, _, _}} do + test "Should send new token in correct interval", %{init_args: init_args = {channel, _, _, _}} do Helper.compile(:channel_sender_ex, max_age: 2) {:ok, pid} = start_channel_safe(init_args) :sys.trace(pid, true) diff --git a/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs b/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs index 7210af8..58f0c82 100644 --- a/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs +++ b/channel-sender/test/channel_sender_ex/core/pubsub/pub_sub_core_test.exs @@ -54,4 +54,21 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCoreTest do end end + test "should handle call to delete (end) non-existent channel" do + with_mock( + ChannelRegistry, [lookup_channel_addr: fn(_) -> :noproc end] + ) do + assert :noproc == PubSubCore.delete_channel("channel_ref") + end + end + + test "should handle call to delete (end) existent channel" do + with_mocks([ + {ChannelRegistry, [], [lookup_channel_addr: fn(_) -> :c.pid(0, 255, 0) end]}, + {Channel, [], [stop: fn(_) -> :ok end]} + ]) do + assert :ok == PubSubCore.delete_channel("channel_ref") + end + end + end diff --git a/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs b/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs index c6e7b20..9c3547f 100644 --- a/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs +++ b/channel-sender/test/channel_sender_ex/core/pubsub/re_connect_process_test.exs @@ -39,7 +39,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcessTest do {ChannelRegistry, [], [lookup_channel_addr: fn(_) -> :c.pid(0, 200, 0) end]}, {Channel, [], [socket_connected: fn(_, _, _) -> :ok end]}, ]) do - assert ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0)) == :ok + assert is_pid(ReConnectProcess.connect_socket_to_channel("channel_ref", :c.pid(0, 250, 0))) end end diff --git a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs index ea2bdbd..2b408a6 100644 --- a/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/rest/rest_controller_test.exs @@ -29,10 +29,11 @@ defmodule ChannelSenderEx.Transport.Rest.RestControllerTest do test "Should create channel on request" do body = Jason.encode!(%{application_ref: "some_application", user_ref: "user_ref_00117ALM"}) - with_mock ChannelAuthenticator, [create_channel: fn(_, _) -> {"xxxx", "yyyy"} end] do + with_mock ChannelAuthenticator, [create_channel: fn(_, _, _) -> {"xxxx", "yyyy"} end] do conn = conn(:post, "/ext/channel/create", body) |> put_req_header("content-type", "application/json") + |> put_req_header("x-meta-foo", "bar") conn = RestController.call(conn, @options) @@ -350,4 +351,61 @@ defmodule ChannelSenderEx.Transport.Rest.RestControllerTest do assert %{"error" => "Invalid request" <> _rest} = Jason.decode!(conn.resp_body) end + test "Should be able to call for close channel" do + body = Jason.encode!(%{application_ref: "some_application", user_ref: "user_ref_00117ALM"}) + + with_mocks([ + {PubSubCore, [], [delete_channel: fn(_) -> :ok end]} + ]) do + + conn2 = conn(:delete, "/ext/channel?channel_ref=xxxx") + |> put_req_header("accept", "application/json") + + conn2 = RestController.call(conn2, @options) + + assert conn2.status == 200 + + assert %{"result" => "Ok"} = + Jason.decode!(conn2.resp_body) + end + end + + test "Should be able to handle call for close unexistent channel" do + with_mocks([ + {PubSubCore, [], [delete_channel: fn(_) -> :noproc end]} + ]) do + + # then call for close + + conn2 = conn(:delete, "/ext/channel?channel_ref=xxxx") + |> put_req_header("accept", "application/json") + + conn2 = RestController.call(conn2, @options) + + assert conn2.status == 410 + + assert %{"error" => "Channel not found"} = + Jason.decode!(conn2.resp_body) + + end + end + + test "Should be able to handle invalid requesr for close channel" do + with_mocks([ + {PubSubCore, [], [delete_channel: fn(_) -> :noproc end]} + ]) do + + conn2 = conn(:delete, "/ext/channel") + |> put_req_header("accept", "application/json") + + conn2 = RestController.call(conn2, @options) + + assert conn2.status == 400 + + assert %{"error" => "Invalid request", "request" => %{}} = + Jason.decode!(conn2.resp_body) + + end + end + end diff --git a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs index a77ea23..9bb200d 100644 --- a/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs +++ b/channel-sender/test/channel_sender_ex/transport/socket_integration_test.exs @@ -102,7 +102,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do conn = connect(port, channel) assert_receive {:gun_upgrade, ^conn, stream, ["websocket"], _headers} :gun.ws_send(conn, {:text, "Auth::#{secret}Invalid"}) - assert_receive {:gun_ws, ^conn, ^stream, {:close, 1008, "Invalid token for channel"}} + assert_receive {:gun_ws, ^conn, ^stream, {:close, 1001, "3008"}} assert_receive {:gun_down, ^conn, :ws, :closed, [], []} refute_receive {:gun_up, _conn, _} :gun.close(conn) @@ -346,7 +346,7 @@ defmodule ChannelSenderEx.Transport.SocketIntegrationTest do sub_protocol -> connect(port, channel, sub_protocol) end - assert_receive {:gun_response, ^conn, stream, :fin, 400, _headers}, 1000 + assert_receive {:gun_ws, ^conn, stream, {:close, 1001, "3050"}}, 1000 {conn, stream} end