Skip to content

Commit

Permalink
[RTC-456] Add peerless room purge (#150)
Browse files Browse the repository at this point in the history
* [RTC-385] Add peerless room purge
* Add change from fix_metrics, update api spec
* Add log and guard
* Remove guard
* Update controversial test name
* Mock timers in tests
* Restore Elixir to 1.14, remove calls to Map.intersect
* lint
  • Loading branch information
sgfn authored Feb 15, 2024
1 parent 6b61fc4 commit 090a338
Show file tree
Hide file tree
Showing 17 changed files with 202 additions and 73 deletions.
13 changes: 6 additions & 7 deletions lib/jellyfish/component/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@ defmodule Jellyfish.Component.File do
{:ok, framerate} <- validate_framerate(valid_opts.framerate),
{:ok, track_config} <-
get_track_config(path, framerate) do
endpoint_spec =
%FileEndpoint{
rtc_engine: engine,
file_path: path,
track_config: track_config,
payload_type: track_config.fmtp.pt
}
endpoint_spec = %FileEndpoint{
rtc_engine: engine,
file_path: path,
track_config: track_config,
payload_type: track_config.fmtp.pt
}

properties = valid_opts |> Map.from_struct()

Expand Down
60 changes: 51 additions & 9 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Jellyfish.Room do
:engine_pid,
:network_options
]
defstruct @enforce_keys ++ [components: %{}, peers: %{}]
defstruct @enforce_keys ++ [components: %{}, peers: %{}, last_peer_left: 0]

@type id :: String.t()

Expand All @@ -46,14 +46,17 @@ defmodule Jellyfish.Room do
* `components` - map of components
* `peers` - map of peers
* `engine` - pid of engine
* `network_options` - network options
* `last_peer_left` - arbitrary timestamp with latest occurence of the room becoming peerless
"""
@type t :: %__MODULE__{
id: id(),
config: Config.t(),
components: %{Component.id() => Component.t()},
peers: %{Peer.id() => Peer.t()},
engine_pid: pid(),
network_options: map()
network_options: map(),
last_peer_left: integer()
}

defguardp endpoint_exists?(state, endpoint_id)
Expand Down Expand Up @@ -227,7 +230,9 @@ defmodule Jellyfish.Room do
def handle_call({:remove_peer, peer_id}, _from, state) do
{reply, state} =
if Map.has_key?(state.peers, peer_id) do
state = handle_remove_peer(peer_id, state, :peer_removed)
state =
handle_remove_peer(peer_id, state, :peer_removed)
|> maybe_schedule_peerless_purge()

{:ok, state}
else
Expand Down Expand Up @@ -434,6 +439,7 @@ defmodule Jellyfish.Room do
def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state) do
{_endpoint, state} = pop_in(state, [:peers, endpoint_id])
Logger.info("Peer #{endpoint_id} removed")
state = maybe_schedule_peerless_purge(state)
{:noreply, state}
end

Expand Down Expand Up @@ -546,6 +552,19 @@ defmodule Jellyfish.Room do
{:noreply, state}
end

@impl true
def handle_info(:peerless_purge, state) do
if peerless_long_enough?(state) do
Logger.info(
"Removing room because it was peerless for #{state.config.peerless_purge_timeout} seconds"
)

{:stop, :normal, state}
else
{:noreply, state}
end
end

@impl true
def handle_info(info, state) do
Logger.warning("Received unexpected info: #{inspect(info)}")
Expand Down Expand Up @@ -600,14 +619,37 @@ defmodule Jellyfish.Room do
TURNManager.ensure_tcp_turn_launched(turn_options, port: tcp_turn_port)
end

%__MODULE__{
id: id,
config: config,
engine_pid: pid,
network_options: [turn_options: turn_options]
}
state =
%__MODULE__{
id: id,
config: config,
engine_pid: pid,
network_options: [turn_options: turn_options]
}
|> maybe_schedule_peerless_purge()

state
end

defp maybe_schedule_peerless_purge(%{config: %{peerless_purge_timeout: nil}} = state), do: state

defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state)
when map_size(peers) == 0 do
last_peer_left = Klotho.monotonic_time(:millisecond)
Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge)

%{state | last_peer_left: last_peer_left}
end

defp maybe_schedule_peerless_purge(state), do: state

defp peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left})
when map_size(peers) == 0 do
Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000
end

defp peerless_long_enough?(_state), do: false

defp handle_remove_component(component_id, state, reason) do
{component, state} = pop_in(state, [:components, component_id])
:ok = Engine.remove_endpoint(state.engine_pid, component_id)
Expand Down
19 changes: 13 additions & 6 deletions lib/jellyfish/room/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ defmodule Jellyfish.Room.Config do
@moduledoc """
Room configuration
"""
@enforce_keys [:room_id, :max_peers, :video_codec, :webhook_url]
@enforce_keys [:room_id, :max_peers, :video_codec, :webhook_url, :peerless_purge_timeout]

defstruct @enforce_keys

@type room_id :: String.t() | nil
@type max_peers :: non_neg_integer() | nil
@type video_codec :: :h264 | :vp8 | nil
@type webhook_url :: String.t()
@type peerless_purge_timeout :: pos_integer() | nil

@type t :: %__MODULE__{
room_id: room_id(),
max_peers: max_peers(),
video_codec: video_codec(),
webhook_url: URI.t()
webhook_url: URI.t(),
peerless_purge_timeout: peerless_purge_timeout()
}

@spec from_params(map()) :: {:ok, __MODULE__.t()} | {:error, atom()}
Expand All @@ -24,20 +26,21 @@ defmodule Jellyfish.Room.Config do
max_peers = Map.get(params, "maxPeers")
video_codec = Map.get(params, "videoCodec")
webhook_url = Map.get(params, "webhookUrl")
peerless_purge_timeout = Map.get(params, "peerlessPurgeTimeout")

with {:ok, room_id} <- parse_room_id(room_id),
:ok <- validate_max_peers(max_peers),
{:ok, video_codec} <- codec_to_atom(video_codec),
:ok <- validate_webhook_url(webhook_url) do
:ok <- validate_webhook_url(webhook_url),
:ok <- validate_purge_timeout(peerless_purge_timeout) do
{:ok,
%__MODULE__{
room_id: room_id,
max_peers: max_peers,
video_codec: video_codec,
webhook_url: webhook_url
webhook_url: webhook_url,
peerless_purge_timeout: peerless_purge_timeout
}}
else
error -> error
end
end

Expand Down Expand Up @@ -67,4 +70,8 @@ defmodule Jellyfish.Room.Config do
defp codec_to_atom("vp8"), do: {:ok, :vp8}
defp codec_to_atom(nil), do: {:ok, nil}
defp codec_to_atom(_codec), do: {:error, :invalid_video_codec}

defp validate_purge_timeout(nil), do: :ok
defp validate_purge_timeout(timeout) when is_integer(timeout) and timeout > 0, do: :ok
defp validate_purge_timeout(_timeout), do: {:error, :invalid_peerless_purge_timeout}
end
8 changes: 8 additions & 0 deletions lib/jellyfish_web/api_spec/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ defmodule JellyfishWeb.ApiSpec.Room do
type: :string,
example: "https://backend.address.com/jellyfish-notifications-endpoint",
nullable: true
},
peerlessPurgeTimeout: %Schema{
description:
"Duration (in seconds) after which the room will be removed if no peers are connected. If not provided, this feature is disabled.",
type: :integer,
minimum: 1,
example: 60,
nullable: true
}
}
})
Expand Down
6 changes: 6 additions & 0 deletions lib/jellyfish_web/controllers/room_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ defmodule JellyfishWeb.RoomController do
webhook_url = Map.get(params, "webhookUrl")
{:error, :bad_request, "Expected webhookUrl to be valid URL, got: #{webhook_url}"}

{:error, :invalid_peerless_purge_timeout} ->
timeout = Map.get(params, "peerlessPurgeTimeout")

{:error, :bad_request,
"Expected peerlessPurgeTimeout to be a positive integer, got: #{timeout}"}

{:error, :room_already_exists} ->
room_id = Map.get(params, "roomId")
{:error, :bad_request, "Cannot add room with id \"#{room_id}\" - room already exists"}
Expand Down
3 changes: 3 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ defmodule Jellyfish.MixProject do
{:libcluster, "~> 3.3"},
{:httpoison, "~> 2.0"},

# Mocking timer in tests
{:klotho, "~> 0.1.0"},

# Test deps
{:websockex, "~> 0.4.3", only: [:test, :ci], runtime: false},
{:excoveralls, "~> 0.15.0", only: :test, runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
"httpoison": {:hex, :httpoison, "2.2.1", "87b7ed6d95db0389f7df02779644171d7319d319178f6680438167d7b69b1f3d", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "51364e6d2f429d80e14fe4b5f8e39719cacd03eb3f9a9286e61e216feac2d2df"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"klotho": {:hex, :klotho, "0.1.2", "3b1f1a569703e0cdce1ba964f41711351a7b06846c38fcbd601faa407e712bf2", [:mix], [], "hexpm", "a6a387982753582e30a5246fe9561721c6b9a4dd27678296cf2cd44faa6f3733"},
"libcluster": {:hex, :libcluster, "3.3.3", "a4f17721a19004cfc4467268e17cff8b1f951befe428975dd4f6f7b84d927fe0", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7c0a2275a0bb83c07acd17dab3c3bfb4897b145106750eeccc62d302e3bdfee5"},
"membrane_aac_fdk_plugin": {:hex, :membrane_aac_fdk_plugin, "0.18.5", "74a0fd9b121a9f18e038573931fa2952b95a977a4e982a844734129e977e0fb9", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.3", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.1", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "560ea01c1fc707770bcdfb30d47be5f77be3e4d86a872bc1e34261a134bf6f98"},
"membrane_aac_format": {:hex, :membrane_aac_format, "0.8.0", "515631eabd6e584e0e9af2cea80471fee6246484dbbefc4726c1d93ece8e0838", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}], "hexpm", "a30176a94491033ed32be45e51d509fc70a5ee6e751f12fd6c0d60bd637013f6"},
Expand Down
6 changes: 6 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,12 @@ components:
minimum: 1
nullable: true
type: integer
peerlessPurgeTimeout:
description: Duration (in seconds) after which the room will be removed if no peers are connected. If not provided, this feature is disabled.
example: 60
minimum: 1
nullable: true
type: integer
roomId:
description: Custom id used for identifying room within Jellyfish. Must be unique across all rooms. If not provided, random UUID is generated.
nullable: true
Expand Down
74 changes: 74 additions & 0 deletions test/jellyfish/room_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
defmodule Jellyfish.RoomTest do
use ExUnit.Case, async: true

alias Jellyfish.{Peer, Room}

@purge_timeout_s 60
@purge_timeout_ms @purge_timeout_s * 1000
@message_timeout_ms 20

setup do
Klotho.Mock.reset()
Klotho.Mock.freeze()
end

describe "peerless purge" do
test "happens if peers never joined" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, _id} = Room.start(config)
Process.monitor(pid)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)

assert_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms
end

test "happens if peers joined, then left" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, id} = Room.start(config)
Process.monitor(pid)

{:ok, peer} = Room.add_peer(id, Peer.WebRTC)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = Room.remove_peer(id, peer.id)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
assert_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms
end

test "does not happen if peers rejoined quickly" do
{:ok, config} = Room.Config.from_params(%{"peerlessPurgeTimeout" => @purge_timeout_s})
{:ok, pid, id} = Room.start(config)
Process.monitor(pid)

{:ok, peer} = Room.add_peer(id, Peer.WebRTC)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = Room.remove_peer(id, peer.id)

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2))
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

{:ok, _peer} = Room.add_peer(id, Peer.WebRTC)
Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = GenServer.stop(pid)
end

test "does not happen when not configured" do
{:ok, config} = Room.Config.from_params(%{})
{:ok, pid, _id} = Room.start(config)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
refute_receive {:DOWN, _ref, :process, ^pid, :normal}, @message_timeout_ms

:ok = GenServer.stop(pid)
end
end
end
12 changes: 4 additions & 8 deletions test/jellyfish_web/controllers/component/file_component_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => @video_source
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")

Expand Down Expand Up @@ -92,8 +91,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"framerate" => 60
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")
end
Expand All @@ -115,8 +113,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => @audio_source
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")

Expand Down Expand Up @@ -164,8 +161,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do
"filePath" => ^video_relative_path
}
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "file")
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ defmodule JellyfishWeb.Component.HlsComponentTest do
"type" => "hls",
"properties" => @hls_properties
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

assert_component_created(conn, room_id, id, "hls")

Expand Down Expand Up @@ -93,8 +92,7 @@ defmodule JellyfishWeb.Component.HlsComponentTest do
"type" => "hls",
"properties" => @hls_properties
}
} =
model_response(conn, :created, "ComponentDetailsResponse")
} = model_response(conn, :created, "ComponentDetailsResponse")

parent = self()
ref = make_ref()
Expand Down
Loading

0 comments on commit 090a338

Please sign in to comment.