Skip to content

Commit

Permalink
RTC-377 SIP Component (#147)
Browse files Browse the repository at this point in the history
* WiP

* WiP x2

* WIP x3

* Improve SIP configuration

* Generate new api.spec

* Fix dialyzer issues

* Rename SIP properties

* Little fixes

* Fixes after review

* Generate new OpenAPI spec

* Add default implementation of Jellyfish.Component behaviour

* Fix ApiSpec.Component.SIP

* Fix OpenAPISpex

* Update lib/jellyfish_web/router.ex

Co-authored-by: Jakub Pisarek <[email protected]>

* Add missing unauthorized response

* Fix SIP resource

---------

Co-authored-by: Jakub Pisarek <[email protected]>
  • Loading branch information
Rados13 and sgfn authored Feb 19, 2024
1 parent 090a338 commit 9dff586
Show file tree
Hide file tree
Showing 25 changed files with 930 additions and 102 deletions.
3 changes: 2 additions & 1 deletion config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ config :jellyfish,
metrics_ip: ConfigReader.read_ip("JF_METRICS_IP") || {127, 0, 0, 1},
metrics_port: ConfigReader.read_port("JF_METRICS_PORT") || 9568,
dist_config: ConfigReader.read_dist_config(),
webrtc_config: ConfigReader.read_webrtc_config()
webrtc_config: ConfigReader.read_webrtc_config(),
sip_config: ConfigReader.read_sip_config()

case System.get_env("JF_SERVER_API_TOKEN") do
nil when prod? == true ->
Expand Down
59 changes: 56 additions & 3 deletions lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ defmodule Jellyfish.Component do

use Bunch.Access

alias Jellyfish.Component.{File, HLS, RTSP}
alias Jellyfish.Room
alias Jellyfish.Component.{File, HLS, RTSP, SIP}
alias Jellyfish.Track

@enforce_keys [
Expand All @@ -22,8 +23,8 @@ defmodule Jellyfish.Component do
defstruct @enforce_keys ++ [tracks: %{}]

@type id :: String.t()
@type component :: HLS | RTSP | File
@type properties :: HLS.properties() | RTSP.properties() | File.properties()
@type component :: HLS | RTSP | File | SIP
@type properties :: HLS.properties() | RTSP.properties() | File.properties() | SIP.properties()

@typedoc """
This module contains:
Expand All @@ -40,12 +41,64 @@ defmodule Jellyfish.Component do
tracks: %{Track.id() => Track.t()}
}

@doc """
This callback is run after initialization of the component.
In it some additional work can be done, which can't be run inside Engine endpoint.
"""
@callback after_init(
room_state :: Room.t(),
component :: __MODULE__.t(),
component_options :: map()
) :: :ok

@doc """
This callback is run after scheduling removing of component.
In it some additional cleanup can be done.
"""
@callback on_remove(
room_state :: Room.t(),
component :: __MODULE__.t()
) :: :ok

defmacro __using__(_opts) do
quote location: :keep do
@behaviour Jellyfish.Component

@impl true
def after_init(_room_state, _component, _component_options), do: :ok

@impl true
def on_remove(_room_state, _component), do: :ok

defoverridable after_init: 3, on_remove: 2

def serialize_options(opts, opts_schema) do
with {:ok, valid_opts} <- OpenApiSpex.Cast.cast(opts_schema, opts) do
valid_opts =
valid_opts
|> Map.from_struct()
|> Map.new(fn {k, v} -> {underscore(k), serialize(v)} end)

{:ok, valid_opts}
end
end

defp serialize(v) when is_struct(v),
do: v |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end)

defp serialize(v), do: v

defp underscore(k), do: k |> Atom.to_string() |> Macro.underscore() |> String.to_atom()
end
end

@spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type}
def parse_type(type) do
case type do
"hls" -> {:ok, HLS}
"rtsp" -> {:ok, RTSP}
"file" -> {:ok, File}
"sip" -> {:ok, SIP}
_other -> {:error, :invalid_type}
end
end
Expand Down
1 change: 1 addition & 0 deletions lib/jellyfish/component/file.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Jellyfish.Component.File do
"""

@behaviour Jellyfish.Endpoint.Config
use Jellyfish.Component

alias ExSDP.Attribute.FMTP
alias Membrane.RTC.Engine.Endpoint.File, as: FileEndpoint
Expand Down
77 changes: 54 additions & 23 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,17 @@ defmodule Jellyfish.Component.HLS do
"""

@behaviour Jellyfish.Endpoint.Config
use Jellyfish.Component

alias Jellyfish.Component.HLS.{
EtsHelper,
LLStorage,
Manager,
Recording,
RequestHandler,
Storage
}

alias Jellyfish.Component.HLS.{LLStorage, Recording, Storage}
alias Jellyfish.Room

alias JellyfishWeb.ApiSpec.Component.HLS.Options
Expand All @@ -25,11 +34,14 @@ defmodule Jellyfish.Component.HLS do

@impl true
def config(options) do
with {:ok, valid_opts} <- serialize_options(options) do
hls_config = create_hls_config(options.room_id, valid_opts)
options = Map.delete(options, "s3")

with {:ok, serialized_opts} <- serialize_options(options, Options.schema()),
result_opts <- Map.update!(serialized_opts, :subscribe_mode, &String.to_atom/1) do
hls_config = create_hls_config(options.room_id, result_opts)

properties =
valid_opts
result_opts
|> Map.put(:playable, false)
|> Enum.into(%{})

Expand All @@ -51,7 +63,7 @@ defmodule Jellyfish.Component.HLS do
}
},
hls_config: hls_config,
subscribe_mode: valid_opts.subscribe_mode
subscribe_mode: result_opts.subscribe_mode
},
properties: properties
}}
Expand All @@ -60,6 +72,25 @@ defmodule Jellyfish.Component.HLS do
end
end

@impl true
def after_init(room_state, component, options) do
on_hls_startup(room_state.id, component.properties)

spawn_hls_manager(options)
:ok
end

@impl true
def on_remove(room_state, component) do
room_id = room_state.id

%{low_latency: low_latency} = component.properties

EtsHelper.delete_hls_folder_path(room_id)

if low_latency, do: remove_request_handler(room_id)
end

@spec output_dir(Room.id(), persistent: boolean()) :: String.t()
def output_dir(room_id, persistent: true) do
Recording.directory(room_id)
Expand All @@ -70,20 +101,27 @@ defmodule Jellyfish.Component.HLS do
Path.join([base_path, "temporary_hls", "#{room_id}"])
end

def serialize_options(options) do
with {:ok, valid_opts} <- OpenApiSpex.Cast.cast(Options.schema(), options) do
valid_opts =
valid_opts
|> Map.from_struct()
|> Map.new(fn {k, v} -> {underscore(k), serialize(v)} end)
|> Map.update!(:subscribe_mode, &String.to_atom/1)
defp on_hls_startup(room_id, %{low_latency: low_latency, persistent: persistent}) do
room_id
|> output_dir(persistent: persistent)
|> then(&EtsHelper.add_hls_folder_path(room_id, &1))

{:ok, valid_opts}
else
{:error, _reason} = error -> error
end
if low_latency, do: spawn_request_handler(room_id)
end

defp spawn_hls_manager(%{engine_pid: engine_pid, room_id: room_id} = options) do
{:ok, hls_dir} = EtsHelper.get_hls_folder_path(room_id)
{:ok, valid_opts} = serialize_options(options, Options.schema())

{:ok, _pid} = Manager.start(room_id, engine_pid, hls_dir, valid_opts)
end

defp spawn_request_handler(room_id),
do: RequestHandler.start(room_id)

defp remove_request_handler(room_id),
do: RequestHandler.stop(room_id)

defp create_hls_config(
room_id,
%{
Expand Down Expand Up @@ -113,11 +151,4 @@ defmodule Jellyfish.Component.HLS do
defp setup_hls_storage(_room_id, low_latency: false) do
fn directory -> %Storage{directory: directory} end
end

defp underscore(k), do: k |> Atom.to_string() |> Macro.underscore() |> String.to_atom()

defp serialize(v) when is_struct(v),
do: v |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end)

defp serialize(v), do: v
end
1 change: 1 addition & 0 deletions lib/jellyfish/component/hls/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ defmodule Jellyfish.Component.HLS.Manager do

defp remove_hls(hls_dir, room_id) do
File.rm_rf!(hls_dir)

Logger.info("Remove hls from a disk, room: #{room_id}")
end

Expand Down
11 changes: 4 additions & 7 deletions lib/jellyfish/component/rtsp.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ defmodule Jellyfish.Component.RTSP do
"""

@behaviour Jellyfish.Endpoint.Config
use Jellyfish.Component

alias Membrane.RTC.Engine.Endpoint.RTSP

Expand All @@ -21,18 +22,14 @@ defmodule Jellyfish.Component.RTSP do
def config(%{engine_pid: engine} = options) do
options = Map.drop(options, [:engine_pid, :room_id])

with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, Options.schema()) do
with {:ok, serialized_opts} <- serialize_options(options, Options.schema()) do
endpoint_spec =
Map.from_struct(valid_opts)
# OpenApiSpex will remove invalid options, so the following conversion, while ugly, is memory-safe
|> Map.new(fn {k, v} ->
{Atom.to_string(k) |> Macro.underscore() |> String.to_atom(), v}
end)
serialized_opts
|> Map.put(:rtc_engine, engine)
|> Map.put(:max_reconnect_attempts, :infinity)
|> then(&struct(RTSP, &1))

properties = valid_opts |> Map.from_struct()
properties = serialized_opts

{:ok, %{endpoint: endpoint_spec, properties: properties}}
else
Expand Down
60 changes: 60 additions & 0 deletions lib/jellyfish/component/sip.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Jellyfish.Component.SIP do
@moduledoc """
Module representing the SIP component.
"""

@behaviour Jellyfish.Endpoint.Config
use Jellyfish.Component

alias Membrane.RTC.Engine.Endpoint.SIP
alias Membrane.RTC.Engine.Endpoint.SIP.RegistrarCredentials

alias JellyfishWeb.ApiSpec.Component.SIP.Options

@type properties :: %{
registrar_credentials: %{
address: String.t(),
username: String.t(),
password: String.t()
}
}

@impl true
def config(%{engine_pid: engine} = options) do
sip_config = Application.fetch_env!(:jellyfish, :sip_config)

external_ip =
if sip_config[:sip_used?] do
Application.fetch_env!(:jellyfish, :sip_config)[:sip_external_ip]
else
raise """
SIP components can only be used if JF_SIP_USED environmental variable is set to \"true\"
"""
end

with {:ok, serialized_opts} <- serialize_options(options, Options.schema()) do
endpoint_spec = %SIP{
rtc_engine: engine,
external_ip: external_ip,
registrar_credentials: create_register_credentials(serialized_opts.registrar_credentials)
}

properties = serialized_opts

{:ok, %{endpoint: endpoint_spec, properties: properties}}
else
{:error, [%OpenApiSpex.Cast.Error{reason: :missing_field, name: name} | _rest]} ->
{:error, {:missing_parameter, name}}

{:error, _reason} = error ->
error
end
end

defp create_register_credentials(credentials) do
credentials
|> Map.to_list()
|> Keyword.new()
|> RegistrarCredentials.new()
end
end
32 changes: 28 additions & 4 deletions lib/jellyfish/config_reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,19 +98,19 @@ defmodule Jellyfish.ConfigReader do
end

def read_webrtc_config() do
webrtc_used = read_boolean("JF_WEBRTC_USED")
webrtc_used? = read_boolean("JF_WEBRTC_USED")

if webrtc_used != false do
if webrtc_used? != false do
[
webrtc_used: true,
webrtc_used?: true,
turn_ip: read_ip("JF_WEBRTC_TURN_IP") || {127, 0, 0, 1},
turn_listen_ip: read_ip("JF_WEBRTC_TURN_LISTEN_IP") || {127, 0, 0, 1},
turn_port_range: read_port_range("JF_WEBRTC_TURN_PORT_RANGE") || {50_000, 59_999},
turn_tcp_port: read_port("JF_WEBRTC_TURN_TCP_PORT")
]
else
[
webrtc_used: false,
webrtc_used?: false,
turn_ip: nil,
turn_listen_ip: nil,
turn_port_range: nil,
Expand All @@ -119,6 +119,30 @@ defmodule Jellyfish.ConfigReader do
end
end

def read_sip_config() do
sip_used? = read_boolean("JF_SIP_USED")
sip_ip = System.get_env("JF_SIP_IP") || ""

cond do
sip_used? != true ->
[
sip_used?: false,
sip_external_ip: nil
]

ip_address?(sip_ip) ->
[
sip_used?: true,
sip_external_ip: sip_ip
]

true ->
raise """
JF_SIP_USED has been set to true but incorrect IP address was provided as `JF_SIP_IP`
"""
end
end

def read_dist_config() do
dist_enabled? = read_boolean("JF_DIST_ENABLED")
dist_strategy = System.get_env("JF_DIST_STRATEGY_NAME")
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/peer/webrtc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ defmodule Jellyfish.Peer.WebRTC do

@impl true
def config(options) do
if not Application.fetch_env!(:jellyfish, :webrtc_config)[:webrtc_used] do
if not Application.fetch_env!(:jellyfish, :webrtc_config)[:webrtc_used?] do
raise(
"WebRTC peers can only be used if JF_WEBRTC_USED environmental variable is not set to \"false\""
)
Expand Down
Loading

0 comments on commit 9dff586

Please sign in to comment.