Skip to content

Commit

Permalink
[RTC-543] Request routing first iteration (#212)
Browse files Browse the repository at this point in the history
* [RTC-543] Request routing first iteration
* Add integration tests, review fixes
* fix types
* fix more types
* Increase timeout
* Review fixes vol.2
* Review fixes vol.3
* Review fixes vol.4
  • Loading branch information
sgfn authored Jun 14, 2024
1 parent c8b4fdd commit 09dd6a1
Show file tree
Hide file tree
Showing 56 changed files with 2,089 additions and 1,421 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ jobs:
- checkout
- run: mix deps.get
- run: mix coveralls.json --warnings-as-errors
- run: FJ_FEATURE_FLAG_REQUEST_ROUTING_ENABLED=true mix coveralls.json --warnings-as-errors
- codecov/upload

check_api_update:
Expand Down
8 changes: 6 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ ENV PATH="/root/.cargo/bin:${PATH}"
RUN mix local.hex --force && \
mix local.rebar --force

ENV MIX_ENV=prod
ARG MIX_ENV=prod
ENV MIX_ENV=$MIX_ENV

# The order of the following commands is important.
# It ensures that:
Expand Down Expand Up @@ -51,6 +52,9 @@ RUN mix release

FROM alpine:3.17 AS app

ARG MIX_ENV=prod
ENV MIX_ENV=$MIX_ENV

ARG FJ_GIT_COMMIT
ENV FJ_GIT_COMMIT=$FJ_GIT_COMMIT

Expand Down Expand Up @@ -122,7 +126,7 @@ RUN mkdir ${FJ_RESOURCES_BASE_PATH} && chown fishjam:fishjam ${FJ_RESOURCES_BASE
RUN mkdir ${FJ_RESOURCES_BASE_PATH}/file_component_sources \
&& chown fishjam:fishjam ${FJ_RESOURCES_BASE_PATH}/file_component_sources

COPY --from=build /app/_build/prod/rel/fishjam ./
COPY --from=build /app/_build/${MIX_ENV}/rel/fishjam ./

COPY docker-entrypoint.sh ./docker-entrypoint.sh
RUN chmod +x docker-entrypoint.sh
Expand Down
4 changes: 2 additions & 2 deletions config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ end

config :fishjam,
feature_flags: [
custom_room_name_disabled:
ConfigReader.read_boolean("FJ_FEATURE_FLAG_CUSTOM_ROOM_NAME_DISABLED") || false
request_routing_enabled?:
ConfigReader.read_boolean("FJ_FEATURE_FLAG_REQUEST_ROUTING_ENABLED") || false
]

check_origin = ConfigReader.read_check_origin("FJ_CHECK_ORIGIN")
Expand Down
3 changes: 2 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ config :fishjam,
port: 4002,
server_api_token: "development",
webrtc_metrics_scrape_interval: 50,
room_metrics_scrape_interval: 1
room_metrics_scrape_interval: 1,
test_routes: true

config :fishjam, FishjamWeb.Endpoint,
http: [ip: {127, 0, 0, 1}, port: 4002],
Expand Down
35 changes: 24 additions & 11 deletions docker-compose-dns.yaml
Original file line number Diff line number Diff line change
@@ -1,31 +1,44 @@
version: "3"

x-fishjam-template: &fishjam-template
build: .
build:
context: .
args:
- MIX_ENV=test
environment: &fishjam-environment
FJ_SERVER_API_TOKEN: "development"
FJ_DIST_ENABLED: "true"
FJ_DIST_MODE: "name"
FJ_DIST_STRATEGY_NAME: "DNS"
MIX_ENV: "test"
FJ_FEATURE_FLAG_REQUEST_ROUTING_ENABLED: "true"
FJ_COMPONENTS_USED: "hls file"
volumes:
- ./test/fixtures:/app/fishjam_resources/file_component_sources
restart: on-failure
healthcheck:
interval: 1s
timeout: 8s
retries: 16

services:
test:
image: membraneframeworklabs/docker_membrane:latest
build:
context: .
target: build
args:
- MIX_ENV=test_cluster
command:
- sh
- -c
- |
cd app/
mix deps.get
MIX_ENV=test_cluster mix test --only cluster
- mix test --only cluster
volumes:
- .:/app
- /app/_build
- /app/deps
- ./test:/app/test
depends_on:
- app1
- app2
app1:
condition: service_healthy
app2:
condition: service_healthy

app1:
<<: *fishjam-template
Expand Down
35 changes: 24 additions & 11 deletions docker-compose-epmd.yaml
Original file line number Diff line number Diff line change
@@ -1,30 +1,43 @@
version: "3"

x-fishjam-template: &fishjam-template
build: .
build:
context: .
args:
- MIX_ENV=test
environment: &fishjam-environment
FJ_SERVER_API_TOKEN: "development"
FJ_DIST_ENABLED: "true"
FJ_DIST_NODES: "app@app1 app@app2"
MIX_ENV: "test"
FJ_FEATURE_FLAG_REQUEST_ROUTING_ENABLED: "true"
FJ_COMPONENTS_USED: "hls file"
volumes:
- ./test/fixtures:/app/fishjam_resources/file_component_sources
restart: on-failure
healthcheck:
interval: 1s
timeout: 8s
retries: 16

services:
test:
image: membraneframeworklabs/docker_membrane:latest
build:
context: .
target: build
args:
- MIX_ENV=test_cluster
command:
- sh
- -c
- |
cd app/
mix deps.get
MIX_ENV=test_cluster mix test --only cluster
- mix test --only cluster
volumes:
- .:/app
- /app/_build
- /app/deps
- ./test:/app/test
depends_on:
- app1
- app2
app1:
condition: service_healthy
app2:
condition: service_healthy

app1:
<<: *fishjam-template
Expand Down
3 changes: 2 additions & 1 deletion lib/fishjam/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule Fishjam.Application do
Logger.info("Distribution config: #{inspect(Keyword.delete(dist_config, :cookie))}")
Logger.info("WebRTC config: #{inspect(webrtc_config)}")
Logger.info("Allowed components: #{inspect(components_used)}")
Logger.info(Fishjam.FeatureFlags.info())

children =
[
Expand All @@ -38,7 +39,7 @@ defmodule Fishjam.Application do
{Fishjam.MetricsScraper, scrape_interval},
FishjamWeb.Endpoint,
# Start the RoomService
Fishjam.RoomService,
Fishjam.Local.RoomService,
# Start the ResourceManager, responsible for cleaning old recordings
{Fishjam.ResourceManager, @resource_manager_opts},
Fishjam.WebhookNotifier,
Expand Down
58 changes: 58 additions & 0 deletions lib/fishjam/cluster/room.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
defmodule Fishjam.Cluster.Room do
@moduledoc """
Module responsible for managing a room present anywhere in the cluster.
"""

@behaviour Fishjam.Room

alias Fishjam.{Room, RPCClient}

@local_module Fishjam.Local.Room

@impl true
def add_peer(room_id, peer_type, options \\ %{}),
do: route_request(room_id, :add_peer, [room_id, peer_type, options])

@impl true
def set_peer_connected(room_id, peer_id),
do: route_request(room_id, :set_peer_connected, [room_id, peer_id])

@impl true
def get_peer_connection_status(room_id, peer_id),
do: route_request(room_id, :get_peer_connection_status, [room_id, peer_id])

@impl true
def remove_peer(room_id, peer_id),
do: route_request(room_id, :remove_peer, [room_id, peer_id])

@impl true
def add_component(room_id, component_type, options \\ %{}),
do: route_request(room_id, :add_component, [room_id, component_type, options])

@impl true
def remove_component(room_id, component_id),
do: route_request(room_id, :remove_component, [room_id, component_id])

@impl true
def subscribe(room_id, component_id, origins),
do: route_request(room_id, :subscribe, [room_id, component_id, origins])

@impl true
def dial(room_id, component_id, phone_number),
do: route_request(room_id, :dial, [room_id, component_id, phone_number])

@impl true
def end_call(room_id, component_id),
do: route_request(room_id, :end_call, [room_id, component_id])

@impl true
def receive_media_event(room_id, peer_id, event),
do: route_request(room_id, :receive_media_event, [room_id, peer_id, event])

defp route_request(room_id, fun, args) do
with {:ok, node} <- Room.ID.determine_node(room_id),
{:ok, result} <- RPCClient.call(node, @local_module, fun, args) do
result
end
end
end
69 changes: 69 additions & 0 deletions lib/fishjam/cluster/room_service.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
defmodule Fishjam.Cluster.RoomService do
@moduledoc """
Module responsible for managing rooms in the entire cluster.
"""

@behaviour Fishjam.RoomService

require Logger

alias Fishjam.{FeatureFlags, Room, RPCClient}

@local_module Fishjam.Local.RoomService

@impl true
def create_room(config) do
node_resources = RPCClient.multicall(@local_module, :get_resource_usage)
min_node = find_best_node(node_resources)

if length(node_resources) > 1,
do: Logger.info("Node with least used resources is #{inspect(min_node)}")

with {:ok, result} <- RPCClient.call(min_node, @local_module, :create_room, [config]) do
result
end
end

@impl true
def list_rooms() do
if FeatureFlags.request_routing_enabled?() do
@local_module |> RPCClient.multicall(:list_rooms) |> List.flatten()
else
apply(@local_module, :list_rooms, [])
end
end

@impl true
def find_room(room_id), do: route_request(room_id, :find_room, [room_id])

@impl true
def get_room(room_id), do: route_request(room_id, :get_room, [room_id])

@impl true
def delete_room(room_id), do: route_request(room_id, :delete_room, [room_id])

defp find_best_node(node_resources) do
%{node: min_node} =
Enum.min(
node_resources,
fn
%{forwarded_tracks_number: forwarded_tracks, rooms_number: rooms_num1},
%{forwarded_tracks_number: forwarded_tracks, rooms_number: rooms_num2} ->
rooms_num1 < rooms_num2

%{forwarded_tracks_number: forwarded_tracks1},
%{forwarded_tracks_number: forwarded_tracks2} ->
forwarded_tracks1 < forwarded_tracks2
end
)

min_node
end

defp route_request(room_id, fun, args) do
with {:ok, node} <- Room.ID.determine_node(room_id),
{:ok, result} <- RPCClient.call(node, @local_module, fun, args) do
result
end
end
end
2 changes: 1 addition & 1 deletion lib/fishjam/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule Fishjam.Component do

use Bunch.Access

alias Fishjam.Room
alias Fishjam.Local.Room
alias Fishjam.Component.{File, HLS, Recording, RTSP, SIP}
alias Fishjam.Track

Expand Down
7 changes: 4 additions & 3 deletions lib/fishjam/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ defmodule Fishjam.Component.HLS do
LLStorage,
Manager,
Recording,
RequestHandler,
Storage
}

alias Fishjam.Room
alias Fishjam.Component.HLS.Local.RequestHandler

alias Fishjam.Room.ID

alias FishjamWeb.ApiSpec.Component.HLS.Options

Expand Down Expand Up @@ -91,7 +92,7 @@ defmodule Fishjam.Component.HLS do
if low_latency, do: remove_request_handler(room_id)
end

@spec output_dir(Room.id(), persistent: boolean()) :: String.t()
@spec output_dir(ID.id(), persistent: boolean()) :: String.t()
def output_dir(room_id, persistent: true) do
Recording.directory(room_id)
end
Expand Down
39 changes: 39 additions & 0 deletions lib/fishjam/component/hls/cluster/request_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Fishjam.Component.HLS.Cluster.RequestHandler do
@moduledoc """
Module responsible for handling HLS Retrieve Content requests in the cluster.
"""

@behaviour Fishjam.Component.HLS.RequestHandler

alias Fishjam.{Room, RPCClient}

@local_module Fishjam.Component.HLS.Local.RequestHandler

@impl true
def handle_file_request(room_id, filename),
do: route_request(room_id, :handle_file_request, [room_id, filename])

@impl true
def handle_partial_request(room_id, filename),
do: route_request(room_id, :handle_partial_request, [room_id, filename])

@impl true
def handle_manifest_request(room_id, partial),
do: route_request(room_id, :handle_manifest_request, [room_id, partial])

@impl true
def handle_delta_manifest_request(room_id, partial),
do: route_request(room_id, :handle_delta_manifest_request, [room_id, partial])

defp route_request(room_id, fun, args) do
with {:ok, node} <- Room.ID.determine_node(room_id),
{:here?, false} <- {:here?, node == Node.self()},
# FIXME: Fishjam addresses could easily be cached
{:ok, address} <- RPCClient.call(node, Fishjam, :address) do
{:redirect, address}
else
{:here?, true} -> apply(@local_module, fun, args)
{:error, _reason} = error -> error
end
end
end
Loading

0 comments on commit 09dd6a1

Please sign in to comment.