Skip to content

Commit

Permalink
add [:phoenix, :socket_drain] telemetry event
Browse files Browse the repository at this point in the history
and also use it for logging drain events
  • Loading branch information
SteffenDE committed Feb 4, 2025
1 parent 7252caa commit 20c9278
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 13 deletions.
1 change: 1 addition & 0 deletions installer/templates/phx_web/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule <%= @web_namespace %>.Telemetry do
summary("phoenix.socket_connected.duration",
unit: {:native, :millisecond}
),
sum("phoenix.socket_drain.count"),
summary("phoenix.channel_joined.duration",
unit: {:native, :millisecond}
),
Expand Down
2 changes: 2 additions & 0 deletions lib/phoenix/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,8 @@ defmodule Phoenix.Endpoint do
batch to terminate. Defaults to 2000ms.
* `:shutdown` - The maximum amount of time in milliseconds allowed
to drain all batches. Defaults to 30000ms.
* `:log` - the log level for drain actions. Defaults the `:log` option
passed to `use Phoenix.Socket` or `:info`. Set it to `false` to disable logging.
For example, if you have 150k connections, the default values will
split them into 15 batches of 10k connections. Each batch takes
Expand Down
22 changes: 22 additions & 0 deletions lib/phoenix/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ defmodule Phoenix.Logger do
* Metadata: `%{endpoint: atom, transport: atom, params: term, connect_info: map, vsn: binary, user_socket: atom, result: :ok | :error, serializer: atom, log: Logger.level | false}`
* Disable logging: `use Phoenix.Socket, log: false` or `socket "/foo", MySocket, websocket: [log: false]` in your endpoint
* `[:phoenix, :socket_drain]` - dispatched by `Phoenix.Socket` when using the `:drainer` option
* Measurement: `%{count: integer, total: integer, index: integer, rounds: integer}`
* Metadata: `%{endpoint: atom, socket: atom, intervasl: integer, log: Logger.level | false}`
* Disable logging: `use Phoenix.Socket, log: false` in your endpoint or pass `:log` option in the `:drainer` option
* `[:phoenix, :channel_joined]` - dispatched at the end of a channel join
* Measurement: `%{duration: native_time}`
* Metadata: `%{result: :ok | :error, params: term, socket: Phoenix.Socket.t}`
Expand Down Expand Up @@ -134,6 +139,7 @@ defmodule Phoenix.Logger do
[:phoenix, :router_dispatch, :start] => &__MODULE__.phoenix_router_dispatch_start/4,
[:phoenix, :error_rendered] => &__MODULE__.phoenix_error_rendered/4,
[:phoenix, :socket_connected] => &__MODULE__.phoenix_socket_connected/4,
[:phoenix, :socket_drain] => &__MODULE__.phoenix_socket_drain/4,
[:phoenix, :channel_joined] => &__MODULE__.phoenix_channel_joined/4,
[:phoenix, :channel_handled_in] => &__MODULE__.phoenix_channel_handled_in/4
}
Expand Down Expand Up @@ -339,6 +345,22 @@ defmodule Phoenix.Logger do
defp connect_result(:ok), do: "CONNECTED TO "
defp connect_result(:error), do: "REFUSED CONNECTION TO "

@doc false
def phoenix_socket_drain(_, _, %{log: false}, _), do: :ok

def phoenix_socket_drain(_, %{count: count, total: total, index: index, rounds: rounds}, %{log: level} = meta, _) do
Logger.log(level, fn ->
%{socket: socket, interval: interval} = meta

[
"DRAINING #{count} of #{total} total connection(s) for socket ",
inspect(socket),
" every #{interval}ms - ",
"round #{index} of #{rounds}"
]
end)
end

## Event: [:phoenix, :channel_joined]

@doc false
Expand Down
6 changes: 5 additions & 1 deletion lib/phoenix/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -464,10 +464,14 @@ defmodule Phoenix.Socket do
case drainer do
{module, function, arguments} ->
apply(module, function, arguments)

_ ->
drainer
end
{Phoenix.Socket.PoolDrainer, {endpoint, handler, drainer}}

opts = Keyword.merge(opts, drainer: drainer)

{Phoenix.Socket.PoolDrainer, {endpoint, handler, opts}}
else
:ignore
end
Expand Down
28 changes: 19 additions & 9 deletions lib/phoenix/socket/pool_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ defmodule Phoenix.Socket.PoolDrainer do
%{
id: {:terminator, name},
start: {__MODULE__, :start_link, [tuple]},
shutdown: Keyword.get(opts, :shutdown, 30_000)
shutdown: Keyword.get(opts[:drainer], :shutdown, 30_000)
}
end

Expand All @@ -86,13 +86,14 @@ defmodule Phoenix.Socket.PoolDrainer do
@impl true
def init({endpoint, name, opts}) do
Process.flag(:trap_exit, true)
size = Keyword.get(opts, :batch_size, 10_000)
interval = Keyword.get(opts, :batch_interval, 2_000)
{:ok, {endpoint, name, size, interval}}
size = Keyword.get(opts[:drainer], :batch_size, 10_000)
interval = Keyword.get(opts[:drainer], :batch_interval, 2_000)
log_level = Keyword.get(opts[:drainer], :log, opts[:log] || :info)
{:ok, {endpoint, name, size, interval, log_level}}
end

@impl true
def terminate(_reason, {endpoint, name, size, interval}) do
def terminate(_reason, {endpoint, name, size, interval, log_level}) do
ets = endpoint.config({:socket, name})
partitions = :ets.lookup_element(ets, :partitions, 2)

Expand All @@ -109,12 +110,21 @@ defmodule Phoenix.Socket.PoolDrainer do

rounds = div(total, size) + 1

if total != 0 do
Logger.info("Shutting down #{total} sockets in #{rounds} rounds of #{interval}ms")
end

for {pids, index} <-
collection |> Stream.concat() |> Stream.chunk_every(size) |> Stream.with_index(1) do
count = if index == rounds, do: length(pids), else: size

:telemetry.execute(
[:phoenix, :socket_drain],
%{count: count, total: total, index: index, rounds: rounds},
%{
endpoint: endpoint,
socket: name,
interval: interval,
log: log_level
}
)

spawn(fn ->
for pid <- pids do
send(pid, %Phoenix.Socket.Broadcast{event: "phx_drain"})
Expand Down
8 changes: 5 additions & 3 deletions test/phoenix/socket/socket_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ defmodule Phoenix.SocketTest do
test "merges keyword lists" do
socket = %Phoenix.Socket{}
socket = assign(socket, %{foo: :bar, abc: :def})
socket = assign(socket, [foo: :baz])
socket = assign(socket, foo: :baz)
assert socket.assigns[:foo] == :baz
assert socket.assigns[:abc] == :def
end
Expand Down Expand Up @@ -109,7 +109,8 @@ defmodule Phoenix.SocketTest do
]

assert DrainerSpecSocket.drainer_spec(drainer: drainer_spec, endpoint: Endpoint) ==
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
{Phoenix.Socket.PoolDrainer,
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
end

test "loads dynamic drainer config" do
Expand All @@ -119,7 +120,8 @@ defmodule Phoenix.SocketTest do
drainer: {DrainerSpecSocket, :dynamic_drainer_config, []},
endpoint: Endpoint
) ==
{Phoenix.Socket.PoolDrainer, {Endpoint, DrainerSpecSocket, drainer_spec}}
{Phoenix.Socket.PoolDrainer,
{Endpoint, DrainerSpecSocket, [endpoint: Endpoint, drainer: drainer_spec]}}
end

test "returns ignore if drainer is set to false" do
Expand Down

0 comments on commit 20c9278

Please sign in to comment.