Skip to content

Commit

Permalink
Fix peer being removed when it disconnects (#162)
Browse files Browse the repository at this point in the history
* Fix peer removal

* Purge when peers disconnected

* Add comment
  • Loading branch information
roznawsk authored Mar 5, 2024
1 parent 6de98af commit f9ff413
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 90 deletions.
41 changes: 25 additions & 16 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ defmodule Jellyfish.Room do
try do
GenServer.call(registry_room_id, :get_state)
catch
:exit, {:noproc, {GenServer, :call, [^registry_room_id, :get_state, _timeout]}} ->
:exit, {reason, {GenServer, :call, [^registry_room_id, :get_state, _timeout]}}
when reason in [:noproc, :normal] ->
Logger.warning(
"Cannot get state of #{inspect(room_id)}, the room's process doesn't exist anymore"
)
Expand Down Expand Up @@ -187,7 +188,7 @@ defmodule Jellyfish.Room do
)

with {:ok, peer} <- Peer.new(peer_type, options) do
state = put_in(state, [:peers, peer.id], peer)
state = put_in(state, [:peers, peer.id], peer) |> maybe_schedule_peerless_purge()

Logger.info("Added peer #{inspect(peer.id)}")

Expand Down Expand Up @@ -433,7 +434,9 @@ defmodule Jellyfish.Room do
:ok = Engine.remove_endpoint(state.engine_pid, peer_id)
Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id})
peer = %{peer | status: :disconnected, socket_pid: nil}

put_in(state, [:peers, peer_id], peer)
|> maybe_schedule_peerless_purge()
end

{:noreply, state}
Expand Down Expand Up @@ -475,9 +478,8 @@ defmodule Jellyfish.Room do
@impl true
def handle_info(%EndpointRemoved{endpoint_id: endpoint_id}, state)
when is_map_key(state.peers, endpoint_id) do
{_endpoint, state} = pop_in(state, [:peers, endpoint_id])
Logger.info("Peer #{endpoint_id} removed")
state = maybe_schedule_peerless_purge(state)
# The peer has been either removed, crashed or disconnected
# The changes in state are applied in appropriate callbacks
{:noreply, state}
end

Expand Down Expand Up @@ -674,22 +676,29 @@ defmodule Jellyfish.Room do

defp maybe_schedule_peerless_purge(%{config: %{peerless_purge_timeout: nil}} = state), do: state

defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state)
when map_size(peers) == 0 do
last_peer_left = Klotho.monotonic_time(:millisecond)
Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge)
defp maybe_schedule_peerless_purge(%{config: config, peers: peers} = state) do
if all_peers_disconnected?(peers) do
last_peer_left = Klotho.monotonic_time(:millisecond)

%{state | last_peer_left: last_peer_left}
end
Klotho.send_after(config.peerless_purge_timeout * 1000, self(), :peerless_purge)

defp maybe_schedule_peerless_purge(state), do: state
%{state | last_peer_left: last_peer_left}
else
state
end
end

defp peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left})
when map_size(peers) == 0 do
Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000
defp peerless_long_enough?(%{config: config, peers: peers, last_peer_left: last_peer_left}) do
if all_peers_disconnected?(peers) do
Klotho.monotonic_time(:millisecond) >= last_peer_left + config.peerless_purge_timeout * 1000
else
false
end
end

defp peerless_long_enough?(_state), do: false
defp all_peers_disconnected?(peers) do
peers |> Map.values() |> Enum.all?(&(&1.status == :disconnected))
end

defp handle_remove_component(component_id, state, reason) do
{component, state} = pop_in(state, [:components, component_id])
Expand Down
74 changes: 0 additions & 74 deletions test/jellyfish/room_test.exs

This file was deleted.

162 changes: 162 additions & 0 deletions test/jellyfish_web/controllers/room_controller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,53 @@ defmodule JellyfishWeb.RoomControllerTest do
use JellyfishWeb.ConnCase, async: false

import OpenApiSpex.TestAssertions

alias __MODULE__.Endpoint

alias Jellyfish.PeerMessage.Authenticated
alias Jellyfish.RoomService
alias JellyfishWeb.{PeerSocket, WS}

@schema JellyfishWeb.ApiSpec.spec()

@purge_timeout_s 3
@purge_timeout_ms @purge_timeout_s * 1000

@port 5910
@path "ws://127.0.0.1:#{@port}/socket/peer/websocket"
@auth_response %Authenticated{}

Application.put_env(
:jellyfish,
Endpoint,
https: false,
http: [port: @port],
server: true
)

defmodule Endpoint do
use Phoenix.Endpoint, otp_app: :jellyfish

alias JellyfishWeb.PeerSocket

socket "/socket/peer", PeerSocket,
websocket: true,
longpoll: false
end

setup_all do
delete_all_rooms()
assert {:ok, _pid} = Endpoint.start_link()
:ok
end

setup %{conn: conn} do
server_api_token = Application.fetch_env!(:jellyfish, :server_api_token)
conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token)

Klotho.Mock.reset()
Klotho.Mock.freeze()

on_exit(fn -> delete_all_rooms() end)

[conn: conn]
Expand Down Expand Up @@ -142,6 +177,125 @@ defmodule JellyfishWeb.RoomControllerTest do
end
end

describe "peerless purge" do
setup %{conn: conn} do
conn = post(conn, ~p"/room", peerlessPurgeTimeout: @purge_timeout_s)
assert %{"id" => id} = json_response(conn, :created)["data"]["room"]
%{conn: conn, id: id}
end

test "happens if peers never joined", %{conn: conn, id: id} do
conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")
assert response(conn, :created)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)

conn = get(conn, ~p"/room")
assert Enum.empty?(json_response(conn, :ok)["data"])
end

test "happens if peer joined, then removed", %{conn: conn, id: id} do
conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")

assert %{"token" => token, "peer" => %{"id" => peer_id}} =
json_response(conn, :created)["data"]

connect_peer(token)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)

conn = delete(conn, ~p"/room/#{id}/peer/#{peer_id}")
assert response(conn, :no_content)
Klotho.Mock.warp_by(@purge_timeout_ms + 10)

conn = get(conn, ~p"/room")
assert Enum.empty?(json_response(conn, :ok)["data"])
end

test "happens if peer joined, then disconnected", %{conn: conn, id: id} do
conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")
assert %{"token" => token} = json_response(conn, :created)["data"]

ws = connect_peer(token)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)

GenServer.stop(ws)
Process.sleep(10)

conn = get(conn, ~p"/room/#{id}")

assert %{"status" => "disconnected"} =
json_response(conn, :ok)["data"]["peers"] |> List.first()

Klotho.Mock.warp_by(@purge_timeout_ms + 10)

conn = get(conn, ~p"/room")
assert Enum.empty?(json_response(conn, :ok)["data"])
end

test "does not happen if peers rejoined quickly", %{conn: conn, id: id} do
conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")
assert %{"token" => token} = json_response(conn, :created)["data"]

ws = connect_peer(token)

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)

GenServer.stop(ws)
Process.sleep(10)
conn = get(conn, ~p"/room/#{id}")

assert %{"status" => "disconnected"} =
json_response(conn, :ok)["data"]["peers"] |> List.first()

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2))
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)

connect_peer(token)
Klotho.Mock.warp_by(@purge_timeout_ms + 10)
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)
end

test "timeout is reset if another peer is created and removed", %{conn: conn, id: id} do
conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")
assert %{"peer" => %{"id" => peer_id}} = json_response(conn, :created)["data"]

conn = delete(conn, ~p"/room/#{id}/peer/#{peer_id}")
assert response(conn, :no_content)

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2))

conn = post(conn, ~p"/room/#{id}/peer", type: "webrtc")
assert response(conn, :created)

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2) |> Kernel.+(10))
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)

Klotho.Mock.warp_by(@purge_timeout_ms |> div(2))
conn = get(conn, ~p"/room")
assert Enum.empty?(json_response(conn, :ok)["data"])
end

test "does not happen when not configured", %{conn: conn} do
conn = post(conn, ~p"/room")
assert %{"id" => id} = json_response(conn, :created)["data"]["room"]

Klotho.Mock.warp_by(@purge_timeout_ms + 10)
conn = get(conn, ~p"/room/#{id}")
assert response(conn, :ok)
end
end

describe "delete room" do
setup [:create_room]

Expand Down Expand Up @@ -237,4 +391,12 @@ defmodule JellyfishWeb.RoomControllerTest do
HTTPoison.delete("http://127.0.0.1:4002/room/#{room["id"]}", headers)
end)
end

def connect_peer(token) do
{:ok, ws} = WS.start_link(@path, :peer)
WS.send_auth_request(ws, token)
assert_receive @auth_response

ws
end
end
3 changes: 3 additions & 0 deletions test/jellyfish_web/integration/server_notification_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do
%PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}},
2_000

state = :sys.get_state(room_pid)
assert Map.has_key?(state.peers, peer_id)

delete(conn, ~p"/room/#{room_id}")
end

Expand Down

0 comments on commit f9ff413

Please sign in to comment.