Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix clock selection #626

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane/core/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ defmodule Membrane.Core.Bin do
parent_clock: options.parent_clock,
timers: %{},
clock: clock,
clock_provider: %{clock: nil, provider: nil, choice: :auto},
clock_provider: %{clock: nil, provider: nil},
clock_proxy: clock_proxy,
# This is a sync for siblings. This is not yet allowed.
stream_sync: Membrane.Sync.no_sync(),
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/bin/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ defmodule Membrane.Core.Bin.State do
clock_proxy: Clock.t(),
clock_provider: %{
clock: Clock.t() | nil,
provider: Child.name() | nil,
choice: :auto | :manual
provider: Child.name() | nil
}
},
children_log_metadata: Keyword.t(),
Expand Down
31 changes: 8 additions & 23 deletions lib/membrane/core/parent/clock_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ defmodule Membrane.Core.Parent.ClockHandler do
Core.Parent.state()
) ::
Core.Parent.state() | no_return
def choose_clock(_children, nil, state) do
state
end

def choose_clock(children, provider, state) do
%{synchronization: synchronization} = state

Expand All @@ -20,17 +24,13 @@ defmodule Membrane.Core.Parent.ClockHandler do
end

components = components ++ children

cond do
provider != nil -> set_clock_provider(get_clock_from_provider(components, provider), state)
synchronization.clock_provider.choice == :manual -> state
true -> choose_clock_provider(components) |> set_clock_provider(state)
end
clock = get_clock_from_provider(components, provider)
set_clock_provider(clock, state)
Comment on lines +27 to +28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
clock = get_clock_from_provider(components, provider)
set_clock_provider(clock, state)
get_clock_from_provider(components, provider)
|> set_clock_provider(clock, state)

end

@spec reset_clock(Core.Parent.state()) :: Core.Parent.state()
def reset_clock(state),
do: set_clock_provider(%{clock: nil, provider: nil, choice: :auto}, state)
do: set_clock_provider(%{clock: nil, provider: nil}, state)

defp set_clock_provider(clock_provider, state) do
Clock.proxy_for(state.synchronization.clock_proxy, clock_provider.clock)
Expand All @@ -48,22 +48,7 @@ defmodule Membrane.Core.Parent.ClockHandler do
raise ParentError, "#{inspect(provider)} is not a clock provider"

%{clock: clock} ->
%{clock: clock, provider: provider, choice: :manual}
end
end

defp choose_clock_provider(components) do
case components |> Enum.filter(& &1.clock) do
[] ->
%{clock: nil, provider: nil, choice: :auto}

[%{name: name, clock: clock}] ->
%{clock: clock, provider: name, choice: :auto}

components ->
raise ParentError, """
Cannot choose clock for the parent, as multiple components provide one, namely: #{Enum.map_join(components, ", ", & &1.name)}. Please explicitly select the clock by setting `ChildrenSpec.clock_provider` parameter.
"""
%{clock: clock, provider: provider}
end
end
end
2 changes: 1 addition & 1 deletion lib/membrane/core/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ defmodule Membrane.Core.Pipeline do
module: params.module,
synchronization: %{
clock_proxy: clock_proxy,
clock_provider: %{clock: nil, provider: nil, choice: :auto},
clock_provider: %{clock: nil, provider: nil},
timers: %{}
},
subprocess_supervisor: subprocess_supervisor,
Expand Down
3 changes: 1 addition & 2 deletions lib/membrane/core/pipeline/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ defmodule Membrane.Core.Pipeline.State do
timers: %{Timer.id() => Timer.t()},
clock_provider: %{
clock: Membrane.Clock.t() | nil,
provider: Child.name() | nil,
choice: :auto | :manual
provider: Child.name() | nil
},
clock_proxy: Membrane.Clock.t()
},
Expand Down
37 changes: 6 additions & 31 deletions test/membrane/clock_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ defmodule Membrane.ClockTest do
synchronization: %{clock_proxy: result_proxy, clock_provider: result_provider}
} = ClockHandler.choose_clock(children, :el2, dummy_pipeline_state)

assert %{choice: :manual, clock: ^clock, provider: :el2} = result_provider
assert %{clock: ^clock, provider: :el2} = result_provider
assert ^proxy_clock = result_proxy
end

Expand All @@ -188,15 +188,15 @@ defmodule Membrane.ClockTest do
dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{}}
)

assert_raise Membrane.ParentError, ~r/.*el1.*clock provider/, fn ->
ClockHandler.choose_clock(children, :el1, dummy_pipeline_state)
end
end

test "when provider is not specified and there are multiple clock providers among children" do
test "when provider is not specified" do
{:ok, clock} = Clock.start_link()
{:ok, clock2} = Clock.start_link()
{:ok, proxy_clock} = Clock.start_link(proxy: true)
Expand All @@ -210,36 +210,11 @@ defmodule Membrane.ClockTest do
dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{}}
)

assert_raise Membrane.ParentError, ~r/.*multiple components.*/, fn ->
ClockHandler.choose_clock(children, nil, dummy_pipeline_state)
end
end

test "when there is no clock provider and there is exactly one clock provider among children" do
{:ok, clock} = Clock.start_link()
{:ok, proxy_clock} = Clock.start_link(proxy: true)

children = [
%ChildEntry{name: :el1, pid: :c.pid(0, 1, 0)},
%ChildEntry{name: :el2, clock: clock, pid: :c.pid(0, 2, 0)},
%ChildEntry{name: :el3, pid: :c.pid(0, 3, 0)}
]

dummy_pipeline_state =
struct(State,
module: __MODULE__,
synchronization: %{clock_proxy: proxy_clock, clock_provider: %{choice: :auto}}
)

assert %State{
synchronization: %{clock_proxy: result_proxy, clock_provider: result_provider}
} = ClockHandler.choose_clock(children, nil, dummy_pipeline_state)

assert %{choice: :auto, clock: ^clock, provider: :el2} = result_provider
assert ^proxy_clock = result_proxy
assert dummy_pipeline_state ==
ClockHandler.choose_clock(children, nil, dummy_pipeline_state)
end
end

Expand Down
4 changes: 2 additions & 2 deletions test/membrane/integration/bin_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -241,15 +241,15 @@ defmodule Membrane.Core.BinTest do
%Membrane.Core.Pipeline.State{synchronization: %{clock_provider: pipeline_clock_provider}} =
state = :sys.get_state(pid)

assert %{choice: :manual, clock: clock1, provider: :bin_child} = pipeline_clock_provider
assert %{clock: clock1, provider: :bin_child} = pipeline_clock_provider
refute is_nil(clock1)

%{pid: bin_pid} = state.children[:bin_child]

%Membrane.Core.Bin.State{synchronization: %{clock_provider: bin_clock_provider}} =
:sys.get_state(bin_pid)

assert %{choice: :manual, clock: clock2, provider: :element_child} = bin_clock_provider
assert %{clock: clock2, provider: :element_child} = bin_clock_provider
refute is_nil(clock2)

assert proxy_for?(clock1, clock2)
Expand Down
9 changes: 5 additions & 4 deletions test/membrane/integration/sync_test/ticking_pace.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ defmodule Membrane.Integration.SyncTest.TickingPace do
actual_report_interval = 100
reported_interval = 300

links = [
spec = {
child(:source, %Sync.Source{
tick_interval: tick_interval |> Time.milliseconds(),
test_process: self()
})
|> child(:sink, Sync.Sink)
]
|> child(:sink, Sync.Sink),
clock_provider: :sink
}

pipeline = Testing.Pipeline.start_link_supervised!(spec: links)
pipeline = Testing.Pipeline.start_link_supervised!(spec: spec)

%{synchronization: %{clock_provider: %{clock: original_clock, provider: :sink}}} =
:sys.get_state(pipeline)
Expand Down
Loading