Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug in handling actions returned from handle_tick. Deprecate handle_spec_started. #708

Merged
merged 26 commits into from
Feb 26, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3457016
wip
FelonEkonom Jan 17, 2024
69d8a03
Write tests wip
FelonEkonom Jan 18, 2024
ebf574c
Write tests wip
FelonEkonom Jan 18, 2024
ff72c7b
Revert "Fix timer running late (#685)"
FelonEkonom Jan 23, 2024
ed06281
Fix actions handling order bug related to Pipeline.handle_playing
FelonEkonom Jan 29, 2024
0e40638
Add assertion on value passed with :setup action
FelonEkonom Jan 29, 2024
990172b
WIP Fix bug in executing handle_buffer while handling actions from pr…
FelonEkonom Jan 29, 2024
d6a0169
Fix tests wip
FelonEkonom Jan 30, 2024
9c567e6
Fix CI
FelonEkonom Jan 31, 2024
9efb412
Update changelog
FelonEkonom Jan 31, 2024
e323c36
Merge remote-tracking branch 'origin/master' into fix-handling-action…
FelonEkonom Jan 31, 2024
47e97c9
Stopt calling handle_spec_started in between handling actions
FelonEkonom Feb 2, 2024
bbec5ef
Make demands test more strict
FelonEkonom Feb 2, 2024
e6b8acb
Add dots to changelog
FelonEkonom Feb 6, 2024
7ac52c7
Merge remote-tracking branch 'origin/master' into fix-handling-action…
FelonEkonom Feb 6, 2024
989764f
Fix double tick bug
FelonEkonom Feb 8, 2024
1076db2
wip
FelonEkonom Feb 12, 2024
37bc27c
Deprecate handle_spec_started/3
FelonEkonom Feb 13, 2024
f4402ad
Remove unused aliases
FelonEkonom Feb 13, 2024
2fa7aba
Remove unnecessary warning
FelonEkonom Feb 13, 2024
c498f75
Bump version to 1.0.1
FelonEkonom Feb 13, 2024
6acbe47
Merge remote-tracking branch 'origin/master' into fix-handling-action…
FelonEkonom Feb 13, 2024
9438d45
Remove leftovers
FelonEkonom Feb 13, 2024
eda3f64
Fix docs
FelonEkonom Feb 13, 2024
2f78509
Implement suggestions from CR, bump version to 1.1.0-rc
FelonEkonom Feb 14, 2024
02db64b
Merge remote-tracking branch 'origin/master' into fix-handling-action…
FelonEkonom Feb 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

## 1.0.1
* Specify the order in which state fields will be printed in the error logs. [#614](https://github.com/membraneframework/membrane_core/pull/614)
* Fix clock selection [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor [#681](https://github.com/membraneframework/membrane_core/pull/681)
* Improve callback return types and group actions types [#702](https://github.com/membraneframework/membrane_core/pull/702)
* Fix clock selection. [#626](https://github.com/membraneframework/membrane_core/pull/626)
* Log messages in the default handle_info implementation. [#680](https://github.com/membraneframework/membrane_core/pull/680)
* Fix typespecs in Membrane.UtilitySupervisor. [#681](https://github.com/membraneframework/membrane_core/pull/681)
* Improve callback return types and group actions types. [#702](https://github.com/membraneframework/membrane_core/pull/702)
* Fix bug in the order of handling actions from callbacks. [#708](https://github.com/membraneframework/membrane_core/pull/708)
* Deprecate `handle_spec_started/3` callback in Bins and Pipelines. [#708](https://github.com/membraneframework/membrane_core/pull/708)

## 1.0.0
* Introduce `:remove_link` action in pipelines and bins.
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ Apart from plugins, Membrane has stream formats, which live in `membrane_X_forma
The API for creating pipelines (and custom elements too) is provided by [membrane_core](https://github.com/membraneframework/membrane_core). To install it, add the following line to your `deps` in `mix.exs` and run `mix deps.get`

```elixir
{:membrane_core, "~> 1.0.0"}
{:membrane_core, "~> 1.0"}
```

**Standalone libraries**
Expand Down
9 changes: 4 additions & 5 deletions lib/membrane/bin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,11 @@ defmodule Membrane.Bin do
) :: callback_return

@doc """
This callback is deprecated since v1.1.0-rc0

Callback invoked when children of `Membrane.ChildrenSpec` are started.

By default, it does nothing.
It is invoked, only if pipeline module contains its definition. Otherwise, nothing happens.
"""
@callback handle_spec_started(
children :: [Child.name()],
Expand Down Expand Up @@ -309,6 +311,7 @@ defmodule Membrane.Bin do
alias unquote(__MODULE__)
@behaviour unquote(__MODULE__)
@before_compile unquote(__MODULE__)
@after_compile {Membrane.Core.Parent, :check_deprecated_callbacks}

unquote(bring_spec)
unquote(bring_pad)
Expand Down Expand Up @@ -354,9 +357,6 @@ defmodule Membrane.Bin do
{[], state}
end

@impl true
def handle_spec_started(new_children, _ctx, state), do: {[], state}

@impl true
def handle_element_start_of_stream(_element, _pad, _ctx, state), do: {[], state}

Expand All @@ -381,7 +381,6 @@ defmodule Membrane.Bin do
handle_setup: 2,
handle_playing: 2,
handle_info: 3,
handle_spec_started: 3,
handle_element_start_of_stream: 4,
handle_element_end_of_stream: 4,
handle_child_notification: 4,
Expand Down
13 changes: 13 additions & 0 deletions lib/membrane/core/callback_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Membrane.Core.CallbackHandler do
use Bunch

alias Membrane.CallbackError
alias Membrane.Core.Component

require Membrane.Logger

Expand Down Expand Up @@ -191,6 +192,13 @@ defmodule Membrane.Core.CallbackHandler do
was_handling_action? = state.handling_action?
state = %{state | handling_action?: true}

# Updating :supplying_demand? flag value here is a temporal fix.
# Setting it to `true` while handling actions causes postponing calls
# of handle_redemand/2 and supply_demand/2 until a moment, when all
# actions returned from the callback are handled
was_supplying_demand? = Map.get(state, :supplying_demand?, false)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add a comment that it's a temporary fix

state = if Component.is_element?(state), do: %{state | supplying_demand?: true}, else: state

state =
Enum.reduce(actions, state, fn action, state ->
try do
Expand All @@ -210,6 +218,11 @@ defmodule Membrane.Core.CallbackHandler do
do: state,
else: %{state | handling_action?: false}

state =
if Component.is_element?(state) and not was_supplying_demand?,
do: %{state | supplying_demand?: false},
else: state

handler_module.handle_end_of_actions(state)
end
end
13 changes: 0 additions & 13 deletions lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,7 @@ defmodule Membrane.Core.Element do
end

defp do_handle_info(Message.new(:timer_tick, timer_id), state) do
# Guarding the `TimerController.handle_tick/2` invocation is
# required since there might be a case in which `handle_tick`
# callback's implementation returns demand action.
# In this scenario, without this guard, there would a possibility that
# the `handle_buffer` would be called immediately, returning
# some action that would affect the timer and the original state
# of the timer, set with actions returned from `handle_tick`,
# would be overwritten with that action.
#
# For more information see: https://github.com/membraneframework/membrane_core/issues/670
state = %{state | supplying_demand?: true}
state = TimerController.handle_tick(timer_id, state)
state = %{state | supplying_demand?: false}
state = Membrane.Core.Element.DemandHandler.handle_delayed_demands(state)
{:noreply, state}
end

Expand Down
67 changes: 35 additions & 32 deletions lib/membrane/core/element/action_handler.ex
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,47 @@ defmodule Membrane.Core.Element.ActionHandler do
require Membrane.Logger

@impl CallbackHandler
def transform_actions(actions, callback, _handler_params, state) do
def transform_actions(actions, _callback, _handler_params, state) do
actions = join_buffers(actions)
ensure_nothing_after_redemand(actions, callback, state)
{actions, state}
end

defguardp is_demand_size(size) when is_integer(size) or is_function(size)

@impl CallbackHandler
def handle_end_of_actions(state) when not state.handling_action? do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
def handle_end_of_actions(state) do
# Fixed order of handling demand of manual and auto pads would lead to
# favoring manual pads over auto pads (or vice versa), especially after
# introducting auto flow queues.
manual_demands_first? = Enum.random([1, 2]) == 1

state =
if manual_demands_first?,
do: maybe_handle_delayed_demands(state),
else: state

state = maybe_handle_pads_to_snapshot(state)

state =
if manual_demands_first?,
do: state,
else: maybe_handle_delayed_demands(state)

state
end

@impl CallbackHandler
def handle_end_of_actions(state), do: state
defp maybe_handle_delayed_demands(state) do
with %{supplying_demand?: false} <- state do
DemandHandler.handle_delayed_demands(state)
end
end

defp maybe_handle_pads_to_snapshot(state) do
with %{handling_action?: false} <- state do
Enum.reduce(state.pads_to_snapshot, state, &DemandController.snapshot_atomic_demand/2)
|> Map.put(:pads_to_snapshot, MapSet.new())
end
end

@impl CallbackHandler
def handle_action({action, _}, :handle_init, _params, _state)
Expand Down Expand Up @@ -280,30 +305,6 @@ defmodule Membrane.Core.Element.ActionHandler do
)
end

defp ensure_nothing_after_redemand(actions, callback, state) do
{redemands, actions_after_redemands} =
actions
|> Enum.drop_while(fn
{:redemand, _args} -> false
_other_action -> true
end)
|> Enum.split_while(fn
{:redemand, _args} -> true
_other_action -> false
end)

case {redemands, actions_after_redemands} do
{_redemands, []} ->
:ok

{[redemand | _redemands], _actions_after_redemands} ->
raise ActionError,
reason: :actions_after_redemand,
action: redemand,
callback: {state.module, callback}
end
end

@spec send_buffer(Pad.ref(), [Buffer.t()] | Buffer.t(), State.t()) :: State.t()
defp send_buffer(_pad_ref, [], state) do
state
Expand Down Expand Up @@ -467,7 +468,9 @@ defmodule Membrane.Core.Element.ActionHandler do
defp handle_outgoing_event(pad_ref, %Events.EndOfStream{}, state) do
with %{direction: :output, end_of_stream?: false} <- PadModel.get_data!(state, pad_ref) do
state = PadController.remove_pad_associations(pad_ref, state)
PadModel.set_data!(state, pad_ref, :end_of_stream?, true)

DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
|> PadModel.set_data!(pad_ref, :end_of_stream?, true)
else
%{direction: :input} ->
raise PadDirectionError, action: "end of stream", direction: :input, pad: pad_ref
Expand Down
9 changes: 9 additions & 0 deletions lib/membrane/core/element/demand_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ defmodule Membrane.Core.Element.DemandHandler do
end
end

@spec remove_pad_from_delayed_demands(Pad.ref(), State.t()) :: State.t()
def remove_pad_from_delayed_demands(pad_ref, state) do
Map.update!(state, :delayed_demands, fn delayed_demands_set ->
delayed_demands_set
|> MapSet.delete({pad_ref, :supply})
|> MapSet.delete({pad_ref, :redemand})
end)
end

@spec handle_input_queue_output(
Pad.ref(),
[InputQueue.output_value()],
Expand Down
2 changes: 2 additions & 0 deletions lib/membrane/core/element/event_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Membrane.Core.Element.EventController do
alias Membrane.Core.Element.{
ActionHandler,
CallbackContext,
DemandHandler,
InputQueue,
PadController,
PlaybackQueue,
Expand Down Expand Up @@ -98,6 +99,7 @@ defmodule Membrane.Core.Element.EventController do
Membrane.Logger.debug("Received end of stream on pad #{inspect(pad_ref)}")

state = PadModel.set_data!(state, pad_ref, :end_of_stream?, true)
state = DemandHandler.remove_pad_from_delayed_demands(pad_ref, state)
state = PadController.remove_pad_associations(pad_ref, state)

%{
Expand Down
18 changes: 15 additions & 3 deletions lib/membrane/core/lifecycle_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ defmodule Membrane.Core.LifecycleController do
def handle_setup_operation(operation, state) do
:ok = assert_operation_allowed!(operation, state.setup_incomplete?)

case operation do
:incomplete ->
cond do
operation == :incomplete ->
Membrane.Logger.debug("Component deferred initialization")
%{state | setup_incomplete?: true}

:complete ->
Component.is_pipeline?(state) ->
# complete_setup/1 will be called in Membrane.Core.Pipeline.ActionHandler.handle_end_of_actions/1
%{state | awaiting_setup_completition?: true}

Component.is_child?(state) ->
complete_setup(state)
end
end
Expand Down Expand Up @@ -52,5 +56,13 @@ defmodule Membrane.Core.LifecycleController do
"""
end

defp assert_operation_allowed!(operation, _status)
when operation not in [:incomplete, :complete] do
raise SetupError, """
Action {:setup, #{inspect(operation)}} was returned, but second element in the tuple must
be :complete or :incomplete
"""
end

defp assert_operation_allowed!(_operation, _status), do: :ok
end
17 changes: 17 additions & 0 deletions lib/membrane/core/parent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,21 @@ defmodule Membrane.Core.Parent do
@moduledoc false

@type state :: Membrane.Core.Bin.State.t() | Membrane.Core.Pipeline.State.t()

@spec check_deprecated_callbacks(Macro.Env.t(), binary) :: :ok
def check_deprecated_callbacks(env, _bytecode) do
modules_whitelist = [Membrane.Testing.Pipeline]

if env.module not in modules_whitelist and
Module.defines?(env.module, {:handle_spec_started, 3}, :def) do
warn_message = """
Callback handle_spec_started/3 has been deprecated since \
:membrane_core v1.1.0-rc0, but it is implemented in #{inspect(env.module)}
"""

IO.warn(warn_message, [])
end

:ok
end
end
21 changes: 13 additions & 8 deletions lib/membrane/core/parent/child_life_controller/startup_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,20 @@ defmodule Membrane.Core.Parent.ChildLifeController.StartupUtils do

@spec exec_handle_spec_started([Membrane.Child.name()], Parent.state()) :: Parent.state()
def exec_handle_spec_started(children_names, state) do
action_handler = Component.action_handler(state)

CallbackHandler.exec_and_handle_callback(
:handle_spec_started,
action_handler,
%{context: &Component.context_from_state/1},
[children_names],
# handle_spec_started/3 callback is deprecated, so we don't require its implementation
if function_exported?(state.module, :handle_spec_started, 3) do
action_handler = Component.action_handler(state)

CallbackHandler.exec_and_handle_callback(
:handle_spec_started,
action_handler,
%{context: &Component.context_from_state/1},
[children_names],
state
)
else
state
)
end
end

@spec check_if_children_names_and_children_groups_ids_are_unique(
Expand Down
10 changes: 10 additions & 0 deletions lib/membrane/core/pipeline/action_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
alias Membrane.Core.Parent.LifecycleController
alias Membrane.Core.Pipeline.State

require Membrane.Logger

@impl CallbackHandler
def handle_action({:spec, args}, _cb, _params, %State{terminating?: true}) do
raise Membrane.ParentError,
Expand Down Expand Up @@ -103,4 +105,12 @@ defmodule Membrane.Core.Pipeline.ActionHandler do
def handle_action(action, _callback, _params, _state) do
raise ActionError, action: action, reason: {:unknown_action, Membrane.Pipeline.Action}
end

@impl CallbackHandler
def handle_end_of_actions(state) do
with %{awaiting_setup_completition?: true} <- state do
%{state | awaiting_setup_completition?: false}
|> Membrane.Core.LifecycleController.complete_setup()
end
end
end
6 changes: 4 additions & 2 deletions lib/membrane/core/pipeline/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ defmodule Membrane.Core.Pipeline.State do
setup_incomplete?: boolean(),
handling_action?: boolean(),
stalker: Membrane.Core.Stalker.t(),
subprocess_supervisor: pid()
subprocess_supervisor: pid(),
awaiting_setup_completition?: boolean()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand All @@ -58,5 +59,6 @@ defmodule Membrane.Core.Pipeline.State do
handling_action?: false,
stalker: nil,
resource_guard: nil,
subprocess_supervisor: nil
subprocess_supervisor: nil,
awaiting_setup_completition?: false
end
Loading