Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
FelonEkonom committed Nov 3, 2023
1 parent b66662d commit 93e78c5
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 140 deletions.
2 changes: 1 addition & 1 deletion .github/actions/add_pr_to_smackore_board/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ runs:
export STATUS_FIELD_ID=PVTSSF_lADOAYE_z84AWEIBzgOGd1k
export TARGET_COLUMN_ID=e6b1ee10
export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py $AUTHOR_LOGIN)
export AUTHOR_ORIGIN=$(curl --request GET --url "https://api.github.com/orgs/membraneframework/members" --header "Authorization: Bearer $GH_TOKEN" -s | python scripts/python/get_author_origin.py "$AUTHOR_LOGIN")
if [ "$AUTHOR_ORIGIN" == "COMMUNITY" ]
then
Expand Down
3 changes: 2 additions & 1 deletion lib/membrane/core/element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ defmodule Membrane.Core.Element do
effective_flow_control: :push,
handling_action?: false,
pads_to_snapshot: MapSet.new(),
stalker: options.stalker
stalker: options.stalker,
satisfied_auto_output_pads: MapSet.new()
}
|> PadSpecHandler.init_pads()

Expand Down
23 changes: 13 additions & 10 deletions lib/membrane/core/element/atomic_demand.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ defmodule Membrane.Core.Element.AtomicDemand do
:ok
end

@spec decrease(t, non_neg_integer()) :: t
@spec decrease(t, non_neg_integer()) :: {{:decreased, integer()}, t} | {:unchanged, t}
def decrease(%__MODULE__{} = atomic_demand, value) do
atomic_demand = Map.update!(atomic_demand, :buffered_decrementation, &(&1 + value))

if atomic_demand.buffered_decrementation >= atomic_demand.throttling_factor do
flush_buffered_decrementation(atomic_demand)
else
atomic_demand
{:unchanged, atomic_demand}
end
end

Expand All @@ -164,14 +164,17 @@ defmodule Membrane.Core.Element.AtomicDemand do

atomic_demand = %{atomic_demand | buffered_decrementation: 0}

if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end
atomic_demand =
if not atomic_demand.toilet_overflowed? and
get_receiver_status(atomic_demand) == {:resolved, :pull} and
get_sender_status(atomic_demand) == {:resolved, :push} and
-1 * atomic_demand_value > atomic_demand.toilet_capacity do
overflow(atomic_demand, atomic_demand_value)
else
atomic_demand
end

{{:decreased, atomic_demand_value}, atomic_demand}
end

defp overflow(atomic_demand, atomic_demand_value) do
Expand Down
23 changes: 21 additions & 2 deletions lib/membrane/core/element/demand_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,18 @@ defmodule Membrane.Core.Element.DemandController do
} = pad_data

if AtomicDemand.get(atomic_demand) > 0 do
# tutaj powinno mieć miejsce
# - usuniecie pada z mapsetu
# - sflushowanie kolejek, jesli mapset jest pusty
# zwroc uwage, czy gdzies w czyms w stylu handle_outgoing_buffers nie wjedzie ci tutaj jakas nieprzyjemna rekurencja
# kolejna rzecz: przerwanie rekurencji moze nastąpić, nawet wtedy, gdy kolejki będą miały w sobie bufory
state = Map.update!(state, :satisfied_auto_output_pads, &MapSet.delete(&1, pad_data.ref))
if MapSet.size(state.satisfied_auto_output_pads) == 0 do
AutoFlowUtils.flush_auto_flow_queues1(state)
else
state
end

AutoFlowUtils.auto_adjust_atomic_demand(associated_pads, state)
else
state
Expand Down Expand Up @@ -91,9 +103,16 @@ defmodule Membrane.Core.Element.DemandController do
buffers_size = Buffer.Metric.from_unit(pad_data.demand_unit).buffers_size(buffers)

demand = pad_data.demand - buffers_size
atomic_demand = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)
{decrease_result, atomic_demand} = AtomicDemand.decrease(pad_data.atomic_demand, buffers_size)

PadModel.set_data!(state, pad_ref, %{
with {:decreased, new_value} when new_value + buffers_size > 0 and new_value <= 0 <-
decrease_result,
%{flow_control: :auto} <- pad_data do
Map.update!(state, :satisfied_auto_output_pads, &MapSet.put(&1, pad_ref))
else
_other -> state
end
|> PadModel.set_data!(pad_ref, %{
pad_data
| demand: demand,
atomic_demand: atomic_demand
Expand Down
56 changes: 43 additions & 13 deletions lib/membrane/core/element/demand_controller/auto_flow_utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
pad_data = PadModel.get_data!(state, pad_ref)

state.effective_flow_control == :pull and pad_data.direction == :input and
pad_data.flow_control == :auto and pad_data.demand < 0
pad_data.flow_control == :auto and
# pad_data.demand < 0
not output_auto_demand_positive?(state)
end

@spec store_buffers_in_queue(Pad.ref(), [Buffer.t()], State.t()) :: State.t()
Expand Down Expand Up @@ -108,6 +110,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
end
end)

# aktualnie flushujemy bumpniętę pady, a powinniśmy flushować wszystkie auto input pady po tym jak sprawiamy ze mapset jest pusty
flush_auto_flow_queues(bumped_pads, state)
end

Expand Down Expand Up @@ -141,15 +144,7 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do
state.effective_flow_control == :pull and
not pad_data.auto_demand_paused? and
pad_data.demand < pad_data.auto_demand_size / 2 and
Enum.all?(pad_data.associated_pads, &atomic_demand_positive?(&1, state))
end

defp atomic_demand_positive?(pad_ref, state) do
atomic_demand_value =
PadModel.get_data!(state, pad_ref, :atomic_demand)
|> AtomicDemand.get()

atomic_demand_value > 0
output_auto_demand_positive?(state)
end

defp flush_auto_flow_queues([], state), do: state
Expand All @@ -167,15 +162,50 @@ defmodule Membrane.Core.Element.DemandController.AutoFlowUtils do

flush_auto_flow_queues(pads_to_flush, state)

{:empty, empty_queue} ->
state = PadModel.set_data!(state, selected_pad, :auto_flow_queue, empty_queue)

{:empty, _empty_queue} ->
pads_to_flush
|> List.delete(selected_pad)
|> flush_auto_flow_queues(state)
end
end

def flush_auto_flow_queues1(state) do
Enum.flat_map(state.pads_data, fn {pad_ref, pad_data} ->
with %{direction: :input, flow_control: :auto, auto_flow_queue: queue} <- pad_data,
{:value, _value} <- Qex.first(queue) do
[pad_ref]
else
_other -> []
end
end)
|> do_flush_auto_flow_queues1(state)
end

def do_flush_auto_flow_queues1(pads_to_flush, state) do
if pads_to_flush != [] and output_auto_demand_positive?(state) do
selected_pad = Enum.random(pads_to_flush)

PadModel.get_data!(state, selected_pad, :auto_flow_queue)
|> Qex.pop()
|> case do
{{:value, queue_item}, popped_queue} ->
state = PadModel.set_data!(state, selected_pad, popped_queue)
state = exec_queue_item_callback(selected_pad, queue_item, state)
do_flush_auto_flow_queues1(pads_to_flush, state)

{:empty, _empty_queue} ->
pads_to_flush
|> List.delete(selected_pad)
|> do_flush_auto_flow_queues1(state)
end
else
state
end
end

defp output_auto_demand_positive?(%State{satisfied_auto_output_pads: pads}),
do: MapSet.size(pads) == 0

defp exec_queue_item_callback(pad_ref, {:buffers, buffers}, state) do
BufferController.exec_buffer_callback(pad_ref, buffers, state)
end
Expand Down
1 change: 1 addition & 0 deletions lib/membrane/core/element/pad_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ defmodule Membrane.Core.Element.PadController do
PadModel.update_data!(state, pad, :associated_pads, &List.delete(&1, pad_data.ref))
end)
|> PadModel.set_data!(pad_ref, :associated_pads, [])
|> Map.update!(:satisfied_auto_output_pads, &MapSet.delete(&1, pad_ref))

if pad_data.direction == :output,
do: AutoFlowUtils.auto_adjust_atomic_demand(pad_data.associated_pads, state),
Expand Down
6 changes: 4 additions & 2 deletions lib/membrane/core/element/state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ defmodule Membrane.Core.Element.State do
effective_flow_control: EffectiveFlowController.effective_flow_control(),
handling_action?: boolean(),
pads_to_snapshot: MapSet.t(),
stalker: Membrane.Core.Stalker.t()
stalker: Membrane.Core.Stalker.t(),
satisfied_auto_output_pads: MapSet.t()
}

# READ THIS BEFORE ADDING NEW FIELD!!!
Expand Down Expand Up @@ -79,6 +80,7 @@ defmodule Membrane.Core.Element.State do
:demand_size,
:pads_to_snapshot,
:playback_queue,
:pads_data
:pads_data,
:satisfied_auto_output_pads
]
end
Loading

0 comments on commit 93e78c5

Please sign in to comment.