Skip to content

Commit

Permalink
feat: added features and stability fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
gabheadz committed Jan 29, 2025
1 parent 359548f commit 5ba16e7
Show file tree
Hide file tree
Showing 17 changed files with 587 additions and 157 deletions.
4 changes: 2 additions & 2 deletions channel-sender/config/config-local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ channel_sender_ex:

# max time in seconds to wait for the client to send the auth token
# before closing the channel
socket_idle_timeout: 30000
socket_idle_timeout: 90000

# Specifies the maximum time (in milliseconds) that the Elixir supervisor waits
# for child channel processes to terminate after sending it an exit signal
Expand Down Expand Up @@ -78,7 +78,7 @@ channel_sender_ex:
# for more information about the kubernetes configuration with libcluster

logger:
level: info
level: debug



39 changes: 34 additions & 5 deletions channel-sender/docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ paths:
/create:
post:
tags:
- /ext/channel/
- /ext/channel
summary: Create Channel and session
description: |
By passing in the appropriate options, you can regisster a new channel in the system
By passing in the appropriate options, you can register a new channel in the system
operationId: createChannel
requestBody:
description: Channel to create
Expand All @@ -43,7 +43,7 @@ paths:
/deliver_message:
post:
tags:
- /ext/channel/
- /ext/channel
summary: Deliver an event message to a channel or group of channels
description: Deliver an event message to a previously registered channel_ref, or deliver a message to all channels related to a specific app_ref or user_ref
operationId: deliverMessage
Expand Down Expand Up @@ -73,7 +73,7 @@ paths:
/deliver_batch:
post:
tags:
- /ext/channel/
- /ext/channel
summary: Batch deliver up to 10 event messages
description: Deliver event messages to a group of channel_refs
operationId: deliverBatchMessages
Expand All @@ -98,7 +98,36 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/InvalidBodyResponse'

/:
delete:
tags:
- /ext/channel
summary: Perform a graceful shutdown of a channel process
description: Perform a graceful shutdown of a channel process and related socket process, if any.
operationId: stopChannel
parameters:
- name: channel_ref
in: query
description: The channel_ref to be stopped
required: true
schema:
type: string
example: beec634503c238f5b84f737275bfd4ba.855b8193bb6f419381eac6cc087aea3f
responses:
"200":
description: If the operation is performed successfully.
content:
application/json:
schema:
oneOf:
- $ref: '#/components/schemas/SuccessResponse'
"400":
description: Bad request due to missing required parameter
content:
application/json:
schema:
$ref: '#/components/schemas/InvalidBodyResponse'

components:
schemas:
Messages:
Expand Down
106 changes: 91 additions & 15 deletions channel-sender/lib/channel_sender_ex/core/channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ defmodule ChannelSenderEx.Core.Channel do
pending_sending: ChannelSenderEx.Core.Channel.pending_sending(),
stop_cause: atom(),
socket_stop_cause: atom(),
user_ref: String.t()
user_ref: String.t(),
meta: String.t()
}

defstruct channel: "",
Expand All @@ -41,9 +42,10 @@ defmodule ChannelSenderEx.Core.Channel do
pending_sending: BoundedMap.new(),
stop_cause: nil,
socket_stop_cause: nil,
user_ref: ""
user_ref: "",
meta: nil

def new(channel, application, user_ref) do
def new(channel, application, user_ref, meta) do
%Data{
channel: channel,
application: application,
Expand All @@ -52,7 +54,8 @@ defmodule ChannelSenderEx.Core.Channel do
pending_sending: BoundedMap.new(),
stop_cause: nil,
socket_stop_cause: nil,
user_ref: user_ref
user_ref: user_ref,
meta: meta
}
end

Expand All @@ -72,6 +75,13 @@ defmodule ChannelSenderEx.Core.Channel do
GenStateMachine.call(server, {:socket_disconnected_reason, reason}, timeout)
end

@doc """
Synchronously asks the channel process for its current state and data.

In the `:waiting` and `:connected` states the process replies with
`{state_name, data}`. `timeout` is forwarded to `GenStateMachine.call/3`
and defaults to `@on_connected_channel_reply_timeout`.
"""
def info(server, timeout \\ @on_connected_channel_reply_timeout),
  do: GenStateMachine.call(server, :info, timeout)

@doc """
operation to mark a message as acknowledged
"""
Expand All @@ -89,18 +99,23 @@ defmodule ChannelSenderEx.Core.Channel do
)
end

@doc """
Synchronously asks the channel process to stop.

Issues a `:stop` call with the default `GenStateMachine.call/2` timeout.
The `:waiting` and `:connected` states reply with `:ok`.
"""
def stop(server), do: GenStateMachine.call(server, :stop)

@spec start_link(any()) :: :gen_statem.start_ret()
@doc """
Starts the state machine.
"""
def start_link(args = {_channel, _application, _user_ref}, opts \\ []) do
def start_link(args = {_channel, _application, _user_ref, _meta}, opts \\ []) do
GenStateMachine.start_link(__MODULE__, args, opts)
end

@impl GenStateMachine
@doc false
def init({channel, application, user_ref}) do
data = Data.new(channel, application, user_ref)
def init({channel, application, user_ref, meta}) do
data = Data.new(channel, application, user_ref, meta)
Logger.debug("Channel #{channel} created. Data: #{inspect(data)}")
Process.flag(:trap_exit, true)
CustomTelemetry.execute_custom_event([:adf, :channel], %{count: 1})
{:ok, :waiting, data}
Expand All @@ -123,6 +138,21 @@ defmodule ChannelSenderEx.Core.Channel do
end
end

# Answers an :info call with the current state name (:waiting) and the channel
# data, leaving both state and data untouched.
def waiting({:call, caller}, :info, data) do
  {:keep_state_and_data, [{:reply, caller, {:waiting, data}}]}
end

# Handles an explicit :stop call while waiting: acknowledges the caller with
# :ok, records :explicit_close as the stop cause and moves to the :closed state.
def waiting({:call, caller}, :stop, data) do
  Logger.info("Channel #{data.channel} stopping, reason: :explicit_close")
  closing_data = %{data | stop_cause: :explicit_close}
  {:next_state, :closed, closing_data, [{:reply, caller, :ok}]}
end

## stop the process with a timeout cause if the socket is not
## authenticated in the given time
def waiting(:state_timeout, :waiting_timeout, data) do
Expand Down Expand Up @@ -205,6 +235,24 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, [{:state_timeout, refresh_timeout, :refresh_token_timeout}]}
end

# Answers an :info call with the current state name (:connected) and the
# channel data, leaving both state and data untouched.
def connected({:call, caller}, :info, data) do
  {:keep_state_and_data, [{:reply, caller, {:connected, data}}]}
end

# Handles a new socket announcing itself while the channel is already
# connected: the channel switches over to the new socket process.
#
# The monitor on the previous socket (if any) is removed first. Otherwise a
# later :DOWN from the replaced socket would still reach this process and be
# treated as the disconnection of the current socket, sending the channel back
# to :waiting while the new socket is alive.
#
# Replies :ok to the caller and keeps the :connected state.
def connected({:call, from}, {:socket_connected, socket_pid}, data) do
  case data.socket do
    {_old_pid, old_ref} when is_reference(old_ref) ->
      # :flush also drops a :DOWN message that may already be in the mailbox
      Process.demonitor(old_ref, [:flush])

    _ ->
      :ok
  end

  socket_ref = Process.monitor(socket_pid)
  new_data = %{data | socket: {socket_pid, socket_ref}, socket_stop_cause: nil}

  actions = [
    _reply = {:reply, from, :ok}
  ]

  Logger.debug("Channel #{data.channel} overwritting socket pid.")
  {:keep_state, new_data, actions}
end

# this method will be called when the socket is disconnected
# to inform this process about the disconnection reason
# this will be later used to define if this process will go back to the waiting state
Expand Down Expand Up @@ -304,8 +352,8 @@ defmodule ChannelSenderEx.Core.Channel do

## Handle info notification when socket process terminates. This method is called because the socket is monitored
## via Process.monitor(socket_pid) in the waiting/connected state.
def connected(:info, {:DOWN, _ref, :process, _object, _reason}, data) do
new_data = %{data | socket: nil}
def connected(:info, {:DOWN, _ref, :process, _object, reason}, data) do
new_data = %{data | socket: nil, socket_stop_cause: reason}
Logger.warning("Channel #{data.channel} detected socket close/disconnection. Will enter :waiting state")
{:next_state, :waiting, new_data, []}
end
Expand All @@ -321,6 +369,14 @@ defmodule ChannelSenderEx.Core.Channel do
{:stop, :normal, %{data | stop_cause: :name_conflict}}
end

# Capture the shutdown signal: an {:EXIT, from, :shutdown} message delivered
# to this process. The signal is only logged; state and data are kept as they
# are.
def connected(:info, {:EXIT, from_pid, :shutdown}, data) do
  # Process.info/1 raises ArgumentError for a non-local pid or a port, so the
  # lookup is only attempted for local pids. Otherwise the raw sender is
  # logged, which keeps this logging-only handler from crashing the channel.
  source_process =
    if is_pid(from_pid) and node(from_pid) == node() do
      Process.info(from_pid)
    else
      from_pid
    end

  Logger.info("Channel #{inspect(data)} received shutdown signal: #{inspect(source_process)}")
  :keep_state_and_data
end

# capture any other info message
def connected(
:info,
info_payload,
Expand All @@ -330,19 +386,38 @@ defmodule ChannelSenderEx.Core.Channel do
{:keep_state_and_data, :postpone}
end

@impl true
def terminate(reason, state, data) do
CustomTelemetry.execute_custom_event([:adf, :channel], %{count: -1})
level = if reason == :normal, do: :info, else: :warning
Logger.log(level, "Channel #{data.channel} terminating, from state #{inspect(state)} and reason #{inspect(reason)}")
:ok
# Handles an explicit :stop call while connected: acknowledges the caller with
# :ok, records :explicit_close as the stop cause and moves to the :closed state.
def connected({:call, caller}, :stop, data) do
  Logger.debug("Channel #{data.channel} stopping, reason: :explicit_close")
  closing_data = %{data | stop_cause: :explicit_close}
  {:next_state, :closed, closing_data, [{:reply, caller, :ok}]}
end

defp new_token_message(_data = %{application: app, channel: channel, user_ref: user}) do
new_token = ChannelIDGenerator.generate_token(channel, app, user)
ProtocolMessage.of(UUID.uuid4(:hex), ":n_token", new_token)
end

############################################
### CLOSED STATE ####
############################################
# On entering :closed the state machine stops itself with reason :normal,
# handing the current data over unchanged.
def closed(:enter, _previous_state, data), do: {:stop, :normal, data}

# Termination hook: emits a [:adf, :channel] telemetry event with count -1 and
# logs the channel, the state it terminated from, the reason and the full
# data. A :normal reason is logged at :info level, any other reason at
# :warning. Always returns :ok.
@impl true
def terminate(reason, state, data) do
  CustomTelemetry.execute_custom_event([:adf, :channel], %{count: -1})

  level =
    case reason do
      :normal -> :info
      _other -> :warning
    end

  Logger.log(level, """
  Channel #{data.channel} terminating, from state #{inspect(state)}
  and reason #{inspect(reason)}. Data: #{inspect(data)}
  """)

  :ok
end

#########################################
### Support functions ####
#########################################
Expand Down Expand Up @@ -430,6 +505,7 @@ defmodule ChannelSenderEx.Core.Channel do

defp socket_clean_disconnection?(data) do
case data.socket_stop_cause do
:normal -> true
{:remote, 1000, _} -> true
_ -> false
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,22 @@ defmodule ChannelSenderEx.Core.ChannelSupervisor do
@type channel_ref :: String.t()
@type application :: String.t()
@type user_ref :: String.t()
@type channel_init_args :: {channel_ref(), application(), user_ref()}
@type meta :: list()
@type channel_init_args :: {channel_ref(), application(), user_ref(), meta()}

# Builds the child spec for the given channel init args and starts the channel
# as a child of this Horde dynamic supervisor; the result of
# Horde.DynamicSupervisor.start_child/2 is returned as-is.
@spec start_channel(channel_init_args()) :: any()
def start_channel(args) do
  child_spec = channel_child_spec(args)
  Horde.DynamicSupervisor.start_child(__MODULE__, child_spec)
end

@spec channel_child_spec(channel_init_args()) :: any()
@compile {:inline, channel_child_spec: 1}
def channel_child_spec(channel_args = {channel_ref, application, user_ref}) do
def channel_child_spec(channel_args = {channel_ref, application, user_ref, _meta}) do
channel_child_spec(channel_args, via_tuple(channel_ref, application, user_ref))
end

@compile {:inline, channel_child_spec: 2}
def channel_child_spec(channel_args = {channel_ref, _application, _user_ref}, name) do
def channel_child_spec(channel_args = {channel_ref, _application, _user_ref, _meta}, name) do
%{
id: "Channel_#{channel_ref}",
start: {Channel, :start_link, [channel_args, [name: name]]},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do
Delivers a message to a single channel associated with the given channel reference.
If the channel is not found, the message is retried up to @max_retries times with exponential backoff.
"""
@spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: Channel.deliver_response()
@spec deliver_to_channel(channel_ref(), ProtocolMessage.t()) :: any()
def deliver_to_channel(channel_ref, message) do
action_fn = fn _ -> do_deliver_to_channel(channel_ref, message) end
execute(@min_backoff, @max_backoff, @max_retries, action_fn, fn ->
Expand Down Expand Up @@ -65,4 +65,12 @@ defmodule ChannelSenderEx.Core.PubSub.PubSubCore do
end
end

@doc """
Stops the channel process registered for the given channel reference.

Looks the channel up in the registry. When a pid is found, the result of
`Channel.stop/1` is returned; when the lookup yields `:noproc`, `:noproc` is
returned.
"""
def delete_channel(channel_ref) do
  channel_addr = ChannelRegistry.lookup_channel_addr(channel_ref)

  case channel_addr do
    :noproc -> :noproc
    channel_pid when is_pid(channel_pid) -> Channel.stop(channel_pid)
  end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ defmodule ChannelSenderEx.Core.PubSub.ReConnectProcess do
timeout = Application.get_env(:channel_sender_ex,
:on_connected_channel_reply_timeout)
Channel.socket_connected(pid, socket_pid, timeout)
pid
end
catch
_type, _err -> :noproc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ defmodule ChannelSenderEx.Core.Security.ChannelAuthenticator do
@type user_ref() :: String.t()
@type channel_ref() :: String.t()
@type channel_secret() :: String.t()
@type meta() :: list()

@spec create_channel(application(), user_ref()) :: {channel_ref(), channel_secret()}
def create_channel(application, user_ref) do
@spec create_channel(application(), user_ref(), meta()) :: {channel_ref(), channel_secret()}
def create_channel(application, user_ref, meta \\ []) do
{channel_ref, _channel_secret} = credentials = create_channel_data_for(application, user_ref)
{:ok, _pid} = ChannelSupervisor.start_channel({channel_ref, application, user_ref})
{:ok, _pid} = ChannelSupervisor.start_channel({channel_ref, application, user_ref, meta})
credentials
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@ defmodule ChannelSenderEx.Transport.CowboyStarter do
CustomTelemetry.execute_custom_event([:adf, :socket, :switchprotocol],
%{count: 1},
%{request_path: "/ext/socket", status: 101, code: "0"})
_ ->
:ok
end

rescue
e -> Logger.warning("Error in metrics callback: #{inspect(e)}")
end

defp compile_routes(paths) do
Expand Down
Loading

0 comments on commit 5ba16e7

Please sign in to comment.