Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add fair dispatcher #111

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions lib/gen_stage/fair_dispatcher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
alias Experimental.GenStage

defmodule GenStage.FairDispatcher do
@moduledoc """
A dispatcher that sends batches to consumers using round robin.

## Options

The partition dispatcher accepts the following options
on initialization:

* `:batch` - the maximum batch size before moving on to the next consumer
when dispatching. For example `batch: 5` will send at most 5 events to a
consumer at a time when there are other consumers.
"""

@behaviour GenStage.Dispatcher

@doc false
def init(opts) do
{:ok, {:queue.new(), %{}, 0, 0, batch_size(opts)}}
end

@doc false
def notify(msg, {_, demands, _, _, _} = state) do
Enum.each(demands, fn {ref, {pid, _}} ->
Process.send(pid, {:"$gen_consumer", {self(), ref}, {:notification, msg}}, [:noconnect])
end)
{:ok, state}
end

@doc false
def subscribe(_, {pid, ref}, {robin, demands, held, pending, batch}) do
robin = :queue.in(ref, robin)
demands = Map.put(demands, ref, {pid, 0})
{:ok, 0, {robin, demands, held, pending, batch}}
end

@doc false
def cancel({_, ref}, {robin, demands, held, pending, batch}) do
robin = :queue.filter(&(&1 != ref), robin)
{{_, current}, demands} = Map.pop(demands, ref)
{:ok, 0, {robin, demands, held-current, current+pending, batch}}
end

@doc false
def ask(counter, {_, ref}, {robin, demands, held, pending, batch}) do
update = fn({pid, current}) -> {pid, current+counter} end
demands = Map.update!(demands, ref, update)

already_sent = min(pending, counter)

new = counter-already_sent
held = held+counter
pending = pending-already_sent

{:ok, new, {robin, demands, held, pending, batch}}
end

@doc false
def dispatch(events, {robin, demands, held, pending, batch}) do
{events, robin, demands, held} =
dispatch(events, robin, demands, held, batch)
{:ok, events, {robin, demands, held, pending, batch}}
end

defp batch_size(opts) do
case Keyword.get(opts, :batch, 100) do
batch when is_integer(batch) and batch > 0 ->
batch
batch ->
msg = "expected :batch to be a positive integer, got #{inspect batch}"
raise ArgumentError, msg
end
end

defp dispatch(events, robin, demands, 0, _batch) do
{events, robin, demands, 0}
end

defp dispatch([], robin, demands, held, _batch) do
{[], robin, demands, held}
end

defp dispatch(events, robin, demands, held, batch) do
{{:value, ref}, robin} = :queue.out(robin)
dispatch_to(ref, events, :queue.in(ref, robin), demands, held, batch)
end

defp dispatch_to(ref, events, robin, demands, held, batch) do
case demands do
%{^ref => {_pid, 0}} ->
dispatch(events, robin, demands, held, batch)
%{^ref => {pid, counter}} ->
{deliver_now, deliver_later} = split_events(events, counter, held, batch)
Process.send(pid, {:"$gen_consumer", {self(), ref}, deliver_now}, [:noconnect])
len = length(deliver_now)
counter = counter - len
held = held - len
demands = %{demands | ref => {pid, counter}}
dispatch(deliver_later, robin, demands, held, batch)
end
end

defp split_events(events, held, held, _batch) do
Enum.split(events, held)
end
defp split_events(events, counter, _held, batch) do
Enum.split(events, min(counter, batch))
end
end
231 changes: 231 additions & 0 deletions test/gen_stage/fair_dispatcher_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
alias Experimental.GenStage

defmodule GenStage.FairDispatcherTest do
use ExUnit.Case, async: true

alias GenStage.FairDispatcher, as: D

defp dispatcher(opts) do
{:ok, {_, %{}, 0, 0, _} = state} = D.init(opts)
state
end

test "subscribes and cancels" do
pid = self()
ref = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 0}}, 0, 0, 100}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == []
assert rest == {%{}, 0, 0, 100}
end

test "subscribes, asks and cancels" do
pid = self()
ref = make_ref()
disp = dispatcher([])

# Subscribe, ask and cancel and leave some demand
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 0}}, 0, 0, 100}

{:ok, 10, disp} = D.ask(10, {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 10}}, 10, 0, 100}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == []
assert rest == {%{}, 0, 10, 100}

# Subscribe, ask and cancel and leave the same demand
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 0}}, 0, 10, 100}

{:ok, 0, disp} = D.ask(5, {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 5}}, 5, 5, 100}

{:ok, 0, disp} = D.cancel({pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == []
assert rest == {%{}, 0, 10, 100}
end

test "subscribes, asks and dispatches" do
pid = self()
ref = make_ref()
disp = dispatcher([])
{:ok, 0, disp} = D.subscribe([], {pid, ref}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 3}}, 3, 0, 100}

{:ok, [], disp} = D.dispatch([:a], disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 2}}, 2, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref}, [:a]}

{:ok, 3, disp} = D.ask(3, {pid, ref}, disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 5}}, 5, 0, 100}

{:ok, [:g, :h], disp} = D.dispatch([:b, :c, :d, :e, :f, :g, :h], disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 0}}, 0, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref}, [:b, :c, :d, :e, :f]}

{:ok, [:i, :j], disp} = D.dispatch([:i, :j], disp)
{queue, rest} = split(disp)
assert queue == [ref]
assert rest == {%{ref => {pid, 0}}, 0, 0, 100}
refute_received {:"$gen_consumer", {_, ^ref}, _}
end

test "subscribes, asks multiple consumers" do
pid = self()
ref1 = make_ref()
ref2 = make_ref()
ref3 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref3}, disp)

{:ok, 4, disp} = D.ask(4, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref3}, disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2, ref3]
assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 2}, ref3 => {pid, 3}},
9, 0, 100}

{:ok, 2, disp} = D.ask(2, {pid, ref3}, disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2, ref3]
assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 2}, ref3 => {pid, 5}},
11, 0, 100}

{:ok, 4, disp} = D.ask(4, {pid, ref2}, disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2, ref3]
assert rest == {%{ref1 => {pid, 4}, ref2 => {pid, 6}, ref3 => {pid, 5}},
15, 0, 100}
end

test "subscribes, asks and dispatches to multiple consumers" do
pid = self()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 2, disp} = D.ask(2, {pid, ref2}, disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 3}, ref2 => {pid, 2}}, 5, 0, 100}

# One batch fits all
{:ok, [], disp} = D.dispatch([:a, :b, :c, :d, :e], disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 0}, ref2 => {pid, 0}}, 0, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b, :c]}
assert_received {:"$gen_consumer", {_, ^ref2}, [:d, :e]}

{:ok, [:a, :b, :c], disp} = D.dispatch([:a, :b, :c], disp)
assert {^queue, ^rest} = split(disp)
refute_received {:"$gen_consumer", {_, _}, _}

# Two batches with left over
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref2}, disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 3}, ref2 => {pid, 3}}, 6, 0, 100}

{:ok, [], disp} = D.dispatch([:a, :b], disp)
{queue, rest} = split(disp)
assert queue == [ref2, ref1]
assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 3}}, 4, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b]}

{:ok, [], disp} = D.dispatch([:c, :d], disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 1}}, 2, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref2}, [:c, :d]}

# Eliminate the left-over
{:ok, [:g], disp} = D.dispatch([:e, :f, :g], disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 0}, ref2 => {pid, 0}}, 0, 0, 100}
assert_received {:"$gen_consumer", {_, ^ref1}, [:e]}
assert_received {:"$gen_consumer", {_, ^ref2}, [:f]}
end

test "delivers notifications to all consumers" do
pid = self()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([])

{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)
{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)

{:ok, notify_disp} = D.notify(:hello, disp)
assert disp == notify_disp

assert_received {:"$gen_consumer", {_, ^ref1}, {:notification, :hello}}
assert_received {:"$gen_consumer", {_, ^ref2}, {:notification, :hello}}
end

test "batch limits max events per consumer" do
pid = self()
ref1 = make_ref()
ref2 = make_ref()
disp = dispatcher([batch: 2])
{_, rest} = split(disp)
assert rest == {%{}, 0, 0, 2}

{:ok, 0, disp} = D.subscribe([], {pid, ref1}, disp)
{:ok, 0, disp} = D.subscribe([], {pid, ref2}, disp)

{:ok, 3, disp} = D.ask(3, {pid, ref1}, disp)
{:ok, 4, disp} = D.ask(4, {pid, ref2}, disp)

{:ok, [], disp} = D.dispatch([:a, :b, :c], disp)
{queue, rest} = split(disp)
assert queue == [ref1, ref2]
assert rest == {%{ref1 => {pid, 1}, ref2 => {pid, 3}}, 4, 0, 2}
assert_received {:"$gen_consumer", {_, ^ref1}, [:a, :b]}
assert_received {:"$gen_consumer", {_, ^ref2}, [:c]}
end

defp split(disp) do
{:queue.to_list(elem(disp, 0)), Tuple.delete_at(disp, 0)}
end
end