From c48f7a88c5c49f8b6fce3a79e871e77d199672db Mon Sep 17 00:00:00 2001 From: Howard Beard-Marlowe Date: Thu, 8 Oct 2020 20:49:22 +0300 Subject: [PATCH 1/6] Adds full support for handle_continue/2 to gen_stage * {:continue, _term} instructions can now be returned as one would expect from gen_server. * :hibernate is now supported on init similar to gen_server. --- lib/gen_stage.ex | 191 ++++++++++++++++++++++++++++++++----- test/gen_stage_test.exs | 203 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 372 insertions(+), 22 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index eccd0b1..043e9ae 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -909,11 +909,18 @@ defmodule GenStage do @callback init(args :: term) :: {:producer, state} + | {:producer, state, {:continue, term} | :hibernate} | {:producer, state, [producer_option]} + | {:producer, state, {:continue, term} | :hibernate, [producer_option]} | {:producer_consumer, state} + | {:producer_consumer, state, {:continue, term} | :hibernate} | {:producer_consumer, state, [producer_consumer_option]} + | {:producer_consumer, state, {:continue, term} | :hibernate, + [producer_consumer_option]} | {:consumer, state} + | {:consumer, state, {:continue, term} | :hibernate} | {:consumer, state, [consumer_option]} + | {:consumer, state, {:continue, term} | :hibernate, [consumer_option]} | :ignore | {:stop, reason :: any} when state: any @@ -996,6 +1003,7 @@ defmodule GenStage do @callback handle_demand(demand :: pos_integer, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} when new_state: term, reason: term, event: term @@ -1075,6 +1083,7 @@ defmodule GenStage do ) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} when event: term, new_state: term, reason: term @@ -1088,6 +1097,7 @@ defmodule GenStage do @callback handle_events(events :: [event], from, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} when new_state: term, reason: term, event: term @@ -1129,8 +1139,10 @@ defmodule GenStage do @callback handle_call(request :: term, from :: GenServer.from(), state :: term) :: {:reply, reply, [event], new_state} | {:reply, reply, [event], new_state, :hibernate} + | {:reply, reply, [event], new_state, {:continue, term}} | {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term, new_state: term, reason: term, event: term @@ -1161,6 +1173,7 @@ defmodule GenStage do @callback handle_cast(request :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason :: term, new_state} when new_state: term, event: term @@ -1181,6 +1194,27 @@ defmodule GenStage do @callback handle_info(message :: term, state :: term) :: {:noreply, [event], new_state} | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} + | {:stop, reason :: term, new_state} + when new_state: term, event: term + + @doc """ + Invoked to handle `continue` instructions. + + It is useful for performing work after initialization or for splitting the work + in a callback in multiple steps, updating the process state along the way. + + Return values are the same as `c:handle_cast/2`. + + This callback is optional. If one is not implemented, the server will fail + if a continue instruction is used. + + This callback is only supported on Erlang/OTP 21+. + """ + @callback handle_continue(continue :: term, state :: term) :: + {:noreply, [event], new_state} + | {:noreply, [event], new_state, :hibernate} + | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason :: term, new_state} when new_state: term, event: term @@ -1217,6 +1251,7 @@ defmodule GenStage do format_status: 2, handle_call: 3, handle_cast: 2, + handle_continue: 2, handle_info: 2, terminate: 2 ] @@ -1815,22 +1850,58 @@ defmodule GenStage do def init({mod, args}) do case mod.init(args) do {:producer, state} -> - init_producer(mod, [], state) + init_producer(mod, [], state, nil) + + {:producer, state, {:continue, _term} = continue} -> + init_producer(mod, [], state, continue) + + {:producer, state, :hibernate} -> + init_producer(mod, [], state, :hibernate) {:producer, state, opts} when is_list(opts) -> - init_producer(mod, opts, state) + init_producer(mod, opts, state, nil) + + {:producer, state, {:continue, _term} = continue, opts} when is_list(opts) -> + init_producer(mod, opts, state, continue) + + {:producer, state, :hibernate, opts} when is_list(opts) -> + init_producer(mod, opts, state, :hibernate) {:producer_consumer, state} -> - init_producer_consumer(mod, [], state) + init_producer_consumer(mod, [], state, nil) + + {:producer_consumer, state, {:continue, _term} = continue} -> + init_producer_consumer(mod, [], state, continue) + + {:producer_consumer, state, :hibernate} -> + init_producer_consumer(mod, [], state, :hibernate) {:producer_consumer, state, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state) + init_producer_consumer(mod, opts, state, nil) + + {:producer_consumer, state, {:continue, _term} = continue, opts} when is_list(opts) -> + init_producer_consumer(mod, opts, state, continue) + + {:producer_consumer, state, :hibernate, opts} when is_list(opts) -> + init_producer_consumer(mod, opts, state, :hibernate) {:consumer, state} -> - init_consumer(mod, [], state) + init_consumer(mod, [], state, nil) + + {:consumer, state, {:continue, _term} = continue} -> + init_consumer(mod, [], state, continue) + + {:consumer, state, :hibernate} -> + init_consumer(mod, [], state, :hibernate) {:consumer, state, opts} when is_list(opts) -> - init_consumer(mod, opts, state) + init_consumer(mod, opts, state, nil) + + {:consumer, state, {:continue, _term} = continue, opts} when is_list(opts) -> + init_consumer(mod, opts, state, continue) + + {:consumer, state, :hibernate, opts} when is_list(opts) -> + init_consumer(mod, opts, state, :hibernate) {:stop, _} = stop -> stop @@ -1843,7 +1914,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state) do + defp init_producer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1863,7 +1934,7 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - {:ok, stage} + if continue_or_hibernate, do: {:ok, stage, continue_or_hibernate}, else: {:ok, stage} else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -1885,7 +1956,7 @@ defmodule GenStage do end end - defp init_producer_consumer(mod, opts, state) do + defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), {:ok, buffer_size, opts} <- @@ -1904,22 +1975,68 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - consumer_init_subscribe(subscribe_to, stage) + case handle_gen_server_init_args(continue_or_hibernate, stage) do + {:ok, stage} -> + consumer_init_subscribe(subscribe_to, stage) + + {:ok, stage, args} -> + {:ok, stage} = consumer_init_subscribe(subscribe_to, stage) + {:ok, stage, args} + + {:stop, _, _} = error -> + error + end else {:error, message} -> {:stop, {:bad_opts, message}} end end - defp init_consumer(mod, opts, state) do + defp init_consumer(mod, opts, state, continue_or_hibernate) do with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), :ok <- Utils.validate_no_opts(opts) do stage = %GenStage{mod: mod, state: state, type: :consumer} - consumer_init_subscribe(subscribe_to, stage) + + case handle_gen_server_init_args(continue_or_hibernate, stage) do + {:ok, stage} -> + consumer_init_subscribe(subscribe_to, stage) + + {:ok, stage, args} -> + {:ok, stage} = consumer_init_subscribe(subscribe_to, stage) + {:ok, stage, args} + + {:stop, _, _} = error -> + error + end else {:error, message} -> {:stop, {:bad_opts, message}} end end + defp handle_gen_server_init_args({:continue, _term} = continue, stage) do + case handle_continue(continue, stage) do + {:noreply, stage} -> + {:ok, stage} + + {:noreply, stage, :hibernate} -> + {:ok, stage, :hibernate} + + {:noreply, stage, {:continue, _term} = continue} -> + {:ok, stage, continue} + + {:stop, reason, stage} -> + {:stop, reason, stage} + end + end + + defp handle_gen_server_init_args(:hibernate, stage), do: {:ok, stage, :hibernate} + defp handle_gen_server_init_args(nil, stage), do: {:ok, stage} + + @doc false + + def handle_continue(continue, %{state: state} = stage) do + noreply_callback(:handle_continue, [continue, state], stage) + end + @doc false def handle_call({:"$info", msg}, _from, stage) do @@ -1948,6 +2065,10 @@ defmodule GenStage do stage = dispatch_events(events, length(events), %{stage | state: state}) {:reply, reply, stage, :hibernate} + {:reply, reply, events, state, {:continue, _term} = continue} -> + stage = dispatch_events(events, length(events), %{stage | state: state}) + {:reply, reply, stage, continue} + {:stop, reason, reply, state} -> {:stop, reason, reply, %{stage | state: state}} @@ -2092,7 +2213,7 @@ defmodule GenStage do case producers do %{^ref => entry} -> {batches, stage} = consumer_receive(from, entry, events, stage) - consumer_dispatch(batches, from, mod, state, stage, false) + consumer_dispatch(batches, from, mod, state, stage, nil) _ -> msg = {:"$gen_producer", {self(), ref}, {:cancel, :unknown_subscription}} @@ -2219,6 +2340,14 @@ defmodule GenStage do end end + defp noreply_callback(:handle_continue, [continue, state], %{mod: mod} = stage) do + if function_exported?(mod, :handle_continue, 2) do + handle_noreply_callback(mod.handle_continue(continue, state), stage) + else + :error_handler.raise_undef_exception(mod, :handle_continue, [continue, state]) + end + end + defp noreply_callback(callback, args, %{mod: mod} = stage) do handle_noreply_callback(apply(mod, callback, args), stage) end @@ -2233,6 +2362,10 @@ defmodule GenStage do stage = dispatch_events(events, length(events), %{stage | state: state}) {:noreply, stage, :hibernate} + {:noreply, events, state, {:continue, _term} = continue} when is_list(events) -> + stage = dispatch_events(events, length(events), %{stage | state: state}) + {:noreply, stage, continue} + {:stop, reason, state} -> {:stop, reason, %{stage | state: state}} @@ -2364,6 +2497,9 @@ defmodule GenStage do # main module must know the consumer is no longer subscribed. dispatcher_callback(:cancel, [{pid, ref}, dispatcher_state], stage) + {:noreply, %{dispatcher_state: dispatcher_state} = stage, _hibernate_or_continue} -> + dispatcher_callback(:cancel, [{pid, ref}, dispatcher_state], stage) + {:stop, _, _} = stop -> stop end @@ -2574,17 +2710,22 @@ defmodule GenStage do {[{events, 0}], stage} end - defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, _hibernate?) do + defp consumer_dispatch([{batch, ask} | batches], from, mod, state, stage, _gen_opts) do case mod.handle_events(batch, from, state) do {:noreply, events, state} when is_list(events) -> stage = dispatch_events(events, length(events), stage) ask(from, ask, [:noconnect]) - consumer_dispatch(batches, from, mod, state, stage, false) + consumer_dispatch(batches, from, mod, state, stage, nil) - {:noreply, events, state, :hibernate} when is_list(events) -> + {:noreply, events, state, :hibernate} -> stage = dispatch_events(events, length(events), stage) ask(from, ask, [:noconnect]) - consumer_dispatch(batches, from, mod, state, stage, true) + consumer_dispatch(batches, from, mod, state, stage, :hibernate) + + {:noreply, events, state, {:continue, _} = continue} -> + stage = dispatch_events(events, length(events), stage) + ask(from, ask, [:noconnect]) + consumer_dispatch(batches, from, mod, state, stage, continue) {:stop, reason, state} -> {:stop, reason, %{stage | state: state}} @@ -2594,12 +2735,12 @@ defmodule GenStage do end end - defp consumer_dispatch([], _from, _mod, state, stage, false) do + defp consumer_dispatch([], _from, _mod, state, stage, nil) do {:noreply, %{stage | state: state}} end - defp consumer_dispatch([], _from, _mod, state, stage, true) do - {:noreply, %{stage | state: state}, :hibernate} + defp consumer_dispatch([], _from, _mod, state, stage, gen_opts) do + {:noreply, %{stage | state: state}, gen_opts} end defp consumer_subscribe({to, opts}, stage) when is_list(opts), @@ -2738,11 +2879,11 @@ defmodule GenStage do {producer_id, _, _} = entry from = {producer_id, ref} {batches, stage} = consumer_receive(from, entry, events, stage) - consumer_dispatch(batches, from, mod, state, stage, false) + consumer_dispatch(batches, from, mod, state, stage, nil) %{} -> # We queued but producer was removed - consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, false) + consumer_dispatch([{events, 0}], {:pid, ref}, mod, state, stage, nil) end end @@ -2759,6 +2900,9 @@ defmodule GenStage do {:noreply, stage, :hibernate} -> take_pc_events(queue, counter, stage) + {:noreply, stage, {:continue, _term}} -> + take_pc_events(queue, counter, stage) + {:stop, _, _} = stop -> stop end @@ -2771,6 +2915,9 @@ defmodule GenStage do {:noreply, %{events: {queue, counter}} = stage, :hibernate} -> take_pc_events(queue, counter, stage) + {:noreply, %{events: {queue, counter}} = stage, {:continue, _term}} -> + take_pc_events(queue, counter, stage) + {:stop, _, _} = stop -> stop end diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index 0a59b74..8cd511a 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -81,6 +81,109 @@ defmodule GenStageTest do events = Enum.to_list(counter..(counter + demand - 1)) {:noreply, events, counter + demand} end + + # Use continue instructions to modify the counter, this + # can be reached from any gen_server callback by supplying + # a continue instruction with an integer term. + def handle_continue(new_counter, _counter) when is_integer(new_counter) do + {:noreply, [], new_counter} + end + end + + defmodule CounterNestedContinue do + @moduledoc """ + A producer that works as a counter in batches. + It also supports events to be queued via sync + and async calls. A negative counter disables + the counting behaviour. + + This counter uses a nested handle_continue on init. + """ + + use GenStage + + def start_link(init, opts \\ []) do + GenStage.start_link(__MODULE__, init, opts) + end + + def sync_queue(stage, events) do + GenStage.call(stage, {:queue, events}) + end + + def async_queue(stage, events) do + GenStage.cast(stage, {:queue, events}) + end + + def stop(stage) do + GenStage.call(stage, :stop) + end + + ## Callbacks + + def init(init) do + init + end + + def handle_call(:stop, _from, state) do + {:stop, :shutdown, :ok, state} + end + + def handle_call({:early_reply_queue, events}, from, state) do + GenStage.reply(from, state) + {:noreply, events, state} + end + + def handle_call({:queue, events}, _from, state) do + {:reply, state, events, state} + end + + def handle_cast({:queue, events}, state) do + {:noreply, events, state} + end + + def handle_info({:queue, events}, state) do + {:noreply, events, state} + end + + def handle_info(other, state) do + is_pid(state) && send(state, other) + {:noreply, [], state} + end + + def handle_subscribe(:consumer, opts, from, state) do + is_pid(state) && send(state, {:producer_subscribed, from}) + {Keyword.get(opts, :producer_demand, :automatic), state} + end + + def handle_cancel(reason, from, state) do + is_pid(state) && send(state, {:producer_cancelled, from, reason}) + {:noreply, [], state} + end + + def handle_demand(demand, pid) when is_pid(pid) and demand > 0 do + {:noreply, [], pid} + end + + def handle_demand(demand, counter) when demand > 0 do + # If the counter is 3 and we ask for 2 items, we will + # emit the items 3 and 4, and set the state to 5. + events = Enum.to_list(counter..(counter + demand - 1)) + {:noreply, events, counter + demand} + end + + # Use continue instructions to modify the counter, this + # can be reached from any gen_server callback by supplying + # a continue instruction with an integer term. + # + # This particular handle_continue returns another continue instruction + # testing that we handle nested continues properly. + def handle_continue(500, _counter) do + {:noreply, [], 500, {:continue, 2000}} + end + + def handle_continue(2000, _counter) do + {:noreply, [], 2000} + end end defmodule DemandProducer do @@ -139,6 +242,11 @@ defmodule GenStageTest do is_pid(state) && send(state, other) {:noreply, [], state} end + + def handle_continue({:continue, term}, recipient) do + send(recipient, term) + {:noreply, [], recipient} + end end defmodule Postponer do @@ -258,6 +366,11 @@ defmodule GenStageTest do {:noreply, [], recipient} end + def handle_continue({:continue, term}, recipient) do + send(recipient, term) + {:noreply, [], recipient} + end + def terminate(reason, state) do send(state, {:terminated, reason}) end @@ -324,6 +437,96 @@ defmodule GenStageTest do } end + {otp_version, ""} = :otp_release |> :erlang.system_info() |> to_string() |> Integer.parse() + + if otp_version >= 21 do + describe "handle_continue tests" do + test "producing_init with continue instruction setting counter start position" do + {:ok, producer} = Counter.start_link({:producer, 0, {:continue, 500}}) + {:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer]}) + + batch = Enum.to_list(0..499) + refute_receive {:consumed, ^batch} + batch = Enum.to_list(500..999) + assert_receive {:consumed, ^batch} + batch = Enum.to_list(1000..1499) + assert_receive {:consumed, ^batch} + end + + test "producer_init with nested continue instruction setting counter start position" do + {:ok, producer} = CounterNestedContinue.start_link({:producer, 0, {:continue, 500}}) + {:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer]}) + + # The nested continue sets the counter to 2000 + batch = Enum.to_list(0..499) + refute_receive {:consumed, ^batch} + batch = Enum.to_list(500..999) + refute_receive {:consumed, ^batch} + batch = Enum.to_list(1000..1499) + refute_receive {:consumed, ^batch} + batch = Enum.to_list(1500..1999) + refute_receive {:consumed, ^batch} + batch = Enum.to_list(2000..2499) + assert_receive {:consumed, ^batch} + end + + test "consumer_init with continue instruction" do + {:ok, producer} = Counter.start_link({:producer, 0, {:continue, 500}}) + + {:ok, _} = + Forwarder.start_link( + {:consumer, self(), {:continue, :continue_reached}, subscribe_to: [producer]} + ) + + assert_receive :continue_reached + end + + test "producer_consumer with continue instruction" do + {:ok, producer} = Counter.start_link({:producer, 0}) + + {:ok, _doubler} = + Doubler.start_link( + {:producer_consumer, self(), {:continue, :continue_reached}, + subscribe_to: [{producer, max_demand: 100, min_demand: 80}]} + ) + + assert_receive :continue_reached + end + end + end + + describe "hibernate tests" do + test "producer_init with hibernate instruction" do + {:ok, producer} = Counter.start_link({:producer, 0, :hibernate}) + + assert :erlang.process_info(producer, :current_function) == + {:current_function, {:erlang, :hibernate, 3}} + end + + test "consumer_init with hibernate instruction" do + {:ok, producer} = Counter.start_link({:producer, 0}) + + {:ok, consumer} = + Forwarder.start_link({:consumer, self(), :hibernate, subscribe_to: [producer]}) + + assert :erlang.process_info(consumer, :current_function) == + {:current_function, {:erlang, :hibernate, 3}} + end + + test "producer_consumer with hibernate instruction" do + {:ok, producer} = Counter.start_link({:producer, 0}) + + {:ok, doubler} = + Doubler.start_link( + {:producer_consumer, self(), :hibernate, + subscribe_to: [{producer, max_demand: 100, min_demand: 80}]} + ) + + assert :erlang.process_info(doubler, :current_function) == + {:current_function, {:erlang, :hibernate, 3}} + end + end + describe "producer-to-consumer demand" do test "with default max and min demand" do {:ok, producer} = Counter.start_link({:producer, 0}) From 674c1dc6e954752a27c17bad00d489b985b6418e Mon Sep 17 00:00:00 2001 From: Maarten van Vliet Date: Thu, 11 May 2023 20:54:14 +0200 Subject: [PATCH 2/6] Address PR feedback --- lib/gen_stage.ex | 47 ++++++++++------------------------------- test/gen_stage_test.exs | 21 +++++++++--------- 2 files changed, 22 insertions(+), 46 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index 043e9ae..11e73a9 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -1199,10 +1199,12 @@ defmodule GenStage do when new_state: term, event: term @doc """ - Invoked to handle `continue` instructions. + Invoked to handle `:continue` instructions. - It is useful for performing work after initialization or for splitting the work - in a callback in multiple steps, updating the process state along the way. + This callback can be used to perform work right after emitting events from + other callbacks. The "continue mechanism" makes sure that no messages, + calls, casts, or anything else will be handled between a callback emitting + a `:continue` tuple and the `c:handle_continue/2` callback being invoked. Return values are the same as `c:handle_cast/2`. @@ -1852,56 +1854,29 @@ defmodule GenStage do {:producer, state} -> init_producer(mod, [], state, nil) - {:producer, state, {:continue, _term} = continue} -> - init_producer(mod, [], state, continue) - - {:producer, state, :hibernate} -> - init_producer(mod, [], state, :hibernate) - {:producer, state, opts} when is_list(opts) -> init_producer(mod, opts, state, nil) - {:producer, state, {:continue, _term} = continue, opts} when is_list(opts) -> - init_producer(mod, opts, state, continue) - - {:producer, state, :hibernate, opts} when is_list(opts) -> - init_producer(mod, opts, state, :hibernate) + {:producer, state, opts, continue_or_hibernate} when is_list(opts) -> + init_producer(mod, opts, state, continue_or_hibernate) {:producer_consumer, state} -> init_producer_consumer(mod, [], state, nil) - {:producer_consumer, state, {:continue, _term} = continue} -> - init_producer_consumer(mod, [], state, continue) - - {:producer_consumer, state, :hibernate} -> - init_producer_consumer(mod, [], state, :hibernate) - {:producer_consumer, state, opts} when is_list(opts) -> init_producer_consumer(mod, opts, state, nil) - {:producer_consumer, state, {:continue, _term} = continue, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state, continue) - - {:producer_consumer, state, :hibernate, opts} when is_list(opts) -> - init_producer_consumer(mod, opts, state, :hibernate) + {:producer_consumer, state, opts, continue_or_hibernate} when is_list(opts) -> + init_producer_consumer(mod, opts, state, continue_or_hibernate) {:consumer, state} -> init_consumer(mod, [], state, nil) - {:consumer, state, {:continue, _term} = continue} -> - init_consumer(mod, [], state, continue) - - {:consumer, state, :hibernate} -> - init_consumer(mod, [], state, :hibernate) - {:consumer, state, opts} when is_list(opts) -> init_consumer(mod, opts, state, nil) - {:consumer, state, {:continue, _term} = continue, opts} when is_list(opts) -> - init_consumer(mod, opts, state, continue) - - {:consumer, state, :hibernate, opts} when is_list(opts) -> - init_consumer(mod, opts, state, :hibernate) + {:consumer, state, opts, continue_or_hibernate} when is_list(opts) -> + init_consumer(mod, opts, state, continue_or_hibernate) {:stop, _} = stop -> stop diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index 8cd511a..17bf015 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -442,7 +442,7 @@ defmodule GenStageTest do if otp_version >= 21 do describe "handle_continue tests" do test "producing_init with continue instruction setting counter start position" do - {:ok, producer} = Counter.start_link({:producer, 0, {:continue, 500}}) + {:ok, producer} = Counter.start_link({:producer, 0, [], {:continue, 500}}) {:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer]}) batch = Enum.to_list(0..499) @@ -454,7 +454,7 @@ defmodule GenStageTest do end test "producer_init with nested continue instruction setting counter start position" do - {:ok, producer} = CounterNestedContinue.start_link({:producer, 0, {:continue, 500}}) + {:ok, producer} = CounterNestedContinue.start_link({:producer, 0, [], {:continue, 500}}) {:ok, _} = Forwarder.start_link({:consumer, self(), subscribe_to: [producer]}) # The nested continue sets the counter to 2000 @@ -471,11 +471,11 @@ defmodule GenStageTest do end test "consumer_init with continue instruction" do - {:ok, producer} = Counter.start_link({:producer, 0, {:continue, 500}}) + {:ok, producer} = Counter.start_link({:producer, 0, [], {:continue, 500}}) {:ok, _} = Forwarder.start_link( - {:consumer, self(), {:continue, :continue_reached}, subscribe_to: [producer]} + {:consumer, self(), [subscribe_to: [producer]], {:continue, :continue_reached}} ) assert_receive :continue_reached @@ -486,8 +486,9 @@ defmodule GenStageTest do {:ok, _doubler} = Doubler.start_link( - {:producer_consumer, self(), {:continue, :continue_reached}, - subscribe_to: [{producer, max_demand: 100, min_demand: 80}]} + {:producer_consumer, self(), + [subscribe_to: [{producer, max_demand: 100, min_demand: 80}]], + {:continue, :continue_reached}} ) assert_receive :continue_reached @@ -497,7 +498,7 @@ defmodule GenStageTest do describe "hibernate tests" do test "producer_init with hibernate instruction" do - {:ok, producer} = Counter.start_link({:producer, 0, :hibernate}) + {:ok, producer} = Counter.start_link({:producer, 0, [], :hibernate}) assert :erlang.process_info(producer, :current_function) == {:current_function, {:erlang, :hibernate, 3}} @@ -507,7 +508,7 @@ defmodule GenStageTest do {:ok, producer} = Counter.start_link({:producer, 0}) {:ok, consumer} = - Forwarder.start_link({:consumer, self(), :hibernate, subscribe_to: [producer]}) + Forwarder.start_link({:consumer, self(), [subscribe_to: [producer]], :hibernate}) assert :erlang.process_info(consumer, :current_function) == {:current_function, {:erlang, :hibernate, 3}} @@ -518,8 +519,8 @@ defmodule GenStageTest do {:ok, doubler} = Doubler.start_link( - {:producer_consumer, self(), :hibernate, - subscribe_to: [{producer, max_demand: 100, min_demand: 80}]} + {:producer_consumer, self(), + [subscribe_to: [{producer, max_demand: 100, min_demand: 80}]], :hibernate} ) assert :erlang.process_info(doubler, :current_function) == From 180a441dbcd2af4fbdc17c4347bb9741e00c9a7a Mon Sep 17 00:00:00 2001 From: Maarten van Vliet Date: Thu, 11 May 2023 22:04:34 +0200 Subject: [PATCH 3/6] Fix callback spec --- lib/gen_stage.ex | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index 11e73a9..4ce3d26 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -909,18 +909,15 @@ defmodule GenStage do @callback init(args :: term) :: {:producer, state} - | {:producer, state, {:continue, term} | :hibernate} | {:producer, state, [producer_option]} - | {:producer, state, {:continue, term} | :hibernate, [producer_option]} + | {:producer, state, [producer_option], {:continue, term} | :hibernate} | {:producer_consumer, state} - | {:producer_consumer, state, {:continue, term} | :hibernate} | {:producer_consumer, state, [producer_consumer_option]} - | {:producer_consumer, state, {:continue, term} | :hibernate, - [producer_consumer_option]} + | {:producer_consumer, state, [producer_consumer_option], + {:continue, term} | :hibernate} | {:consumer, state} - | {:consumer, state, {:continue, term} | :hibernate} | {:consumer, state, [consumer_option]} - | {:consumer, state, {:continue, term} | :hibernate, [consumer_option]} + | {:consumer, state, [consumer_option], {:continue, term} | :hibernate} | :ignore | {:stop, reason :: any} when state: any From 1a0f4a63e684d89817692a68b62305ad6918f0f2 Mon Sep 17 00:00:00 2001 From: Maarten van Vliet Date: Fri, 12 May 2023 11:44:18 +0200 Subject: [PATCH 4/6] Support timeout in return value --- lib/gen_stage.ex | 49 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index 4ce3d26..d7f9a44 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -858,8 +858,9 @@ defmodule GenStage do end The returned tuple may also contain 3 or 4 elements. The third - element may be the `:hibernate` atom or a set of options defined - below. + element may be a set of options defined below. The fourth element + is a timeout, the `:hibernate` atom or a `:continue` tuple. See + the return values for `c:GenServer.init/1` for more information. Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process will exit normally without entering the loop or @@ -910,14 +911,14 @@ defmodule GenStage do @callback init(args :: term) :: {:producer, state} | {:producer, state, [producer_option]} - | {:producer, state, [producer_option], {:continue, term} | :hibernate} + | {:producer, state, [producer_option], timeout() | {:continue, term} | :hibernate} | {:producer_consumer, state} | {:producer_consumer, state, [producer_consumer_option]} | {:producer_consumer, state, [producer_consumer_option], - {:continue, term} | :hibernate} + timeout() | {:continue, term} | :hibernate} | {:consumer, state} | {:consumer, state, [consumer_option]} - | {:consumer, state, [consumer_option], {:continue, term} | :hibernate} + | {:consumer, state, [consumer_option], timeout() | {:continue, term} | :hibernate} | :ignore | {:stop, reason :: any} when state: any @@ -999,6 +1000,7 @@ defmodule GenStage do """ @callback handle_demand(demand :: pos_integer, state :: term) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} @@ -1079,6 +1081,7 @@ defmodule GenStage do state :: term ) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} @@ -1093,6 +1096,7 @@ defmodule GenStage do """ @callback handle_events(events :: [event], from, state :: term) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, new_state} @@ -1135,9 +1139,11 @@ defmodule GenStage do """ @callback handle_call(request :: term, from :: GenServer.from(), state :: term) :: {:reply, reply, [event], new_state} + | {:reply, reply, [event], new_state, timeout()} | {:reply, reply, [event], new_state, :hibernate} | {:reply, reply, [event], new_state, {:continue, term}} | {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason, reply, new_state} @@ -1154,12 +1160,21 @@ defmodule GenStage do the loop with new state `new_state`. Only `:producer` and `:producer_consumer` stages can return a non-empty list of events. + Returning `{:noreply, [event], state, timeout}` is similar to `{:noreply, state}` + , except that it also sets a timeout. See the "Timeouts" section in the + `GenServer` documentation for more information. + Returning `{:noreply, [event], new_state, :hibernate}` is similar to `{:noreply, new_state}` except the process is hibernated before continuing the loop. See the return values for `c:GenServer.handle_call/3` for more information on hibernation. Only `:producer` and `:producer_consumer` stages can return a non-empty list of events. + Returning `{:noreply, [event], new_state, {:continue, continue_arg}}` is similar + to `{:noreply, new_state}` except that immediately after entering the loop, the + `c:handle_continue/2` callback will be invoked with `continue_arg` as the first + argument and `state` as the second one. + Returning `{:stop, reason, new_state}` stops the loop and `terminate/2` is called with the reason `reason` and state `new_state`. The process exits with reason `reason`. @@ -1169,6 +1184,7 @@ defmodule GenStage do """ @callback handle_cast(request :: term, state :: term) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason :: term, new_state} @@ -1190,6 +1206,7 @@ defmodule GenStage do """ @callback handle_info(message :: term, state :: term) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason :: term, new_state} @@ -1212,6 +1229,7 @@ defmodule GenStage do """ @callback handle_continue(continue :: term, state :: term) :: {:noreply, [event], new_state} + | {:noreply, [event], new_state, timeout()} | {:noreply, [event], new_state, :hibernate} | {:noreply, [event], new_state, {:continue, term}} | {:stop, reason :: term, new_state} @@ -1995,12 +2013,20 @@ defmodule GenStage do {:noreply, stage, {:continue, _term} = continue} -> {:ok, stage, continue} + {:noreply, stage, timeout} -> + {:ok, stage, timeout} + {:stop, reason, stage} -> {:stop, reason, stage} end end defp handle_gen_server_init_args(:hibernate, stage), do: {:ok, stage, :hibernate} + + defp handle_gen_server_init_args(timeout, stage) + when (is_integer(timeout) and timeout >= 0) or timeout == :infinity, + do: {:ok, stage, timeout} + defp handle_gen_server_init_args(nil, stage), do: {:ok, stage} @doc false @@ -2041,6 +2067,10 @@ defmodule GenStage do stage = dispatch_events(events, length(events), %{stage | state: state}) {:reply, reply, stage, continue} + {:reply, reply, events, state, timeout} -> + stage = dispatch_events(events, length(events), %{stage | state: state}) + {:reply, reply, stage, timeout} + {:stop, reason, reply, state} -> {:stop, reason, reply, %{stage | state: state}} @@ -2338,6 +2368,10 @@ defmodule GenStage do stage = dispatch_events(events, length(events), %{stage | state: state}) {:noreply, stage, continue} + {:noreply, events, state, timeout} when is_list(events) -> + stage = dispatch_events(events, length(events), %{stage | state: state}) + {:noreply, stage, timeout} + {:stop, reason, state} -> {:stop, reason, %{stage | state: state}} @@ -2699,6 +2733,11 @@ defmodule GenStage do ask(from, ask, [:noconnect]) consumer_dispatch(batches, from, mod, state, stage, continue) + {:noreply, events, state, timeout} -> + stage = dispatch_events(events, length(events), stage) + ask(from, ask, [:noconnect]) + consumer_dispatch(batches, from, mod, state, stage, timeout) + {:stop, reason, state} -> {:stop, reason, %{stage | state: state}} From 142da70521b4ee583d81aa9690068a24b98fe0f9 Mon Sep 17 00:00:00 2001 From: Maarten van Vliet Date: Sun, 17 Mar 2024 21:47:19 +0100 Subject: [PATCH 5/6] Rename var to `additional_info` --- lib/gen_stage.ex | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index d7f9a44..9a031c6 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -1872,8 +1872,8 @@ defmodule GenStage do {:producer, state, opts} when is_list(opts) -> init_producer(mod, opts, state, nil) - {:producer, state, opts, continue_or_hibernate} when is_list(opts) -> - init_producer(mod, opts, state, continue_or_hibernate) + {:producer, state, opts, additional_info} when is_list(opts) -> + init_producer(mod, opts, state, additional_info) {:producer_consumer, state} -> init_producer_consumer(mod, [], state, nil) @@ -1881,8 +1881,8 @@ defmodule GenStage do {:producer_consumer, state, opts} when is_list(opts) -> init_producer_consumer(mod, opts, state, nil) - {:producer_consumer, state, opts, continue_or_hibernate} when is_list(opts) -> - init_producer_consumer(mod, opts, state, continue_or_hibernate) + {:producer_consumer, state, opts, additional_info} when is_list(opts) -> + init_producer_consumer(mod, opts, state, additional_info) {:consumer, state} -> init_consumer(mod, [], state, nil) @@ -1890,8 +1890,8 @@ defmodule GenStage do {:consumer, state, opts} when is_list(opts) -> init_consumer(mod, opts, state, nil) - {:consumer, state, opts, continue_or_hibernate} when is_list(opts) -> - init_consumer(mod, opts, state, continue_or_hibernate) + {:consumer, state, opts, additional_info} when is_list(opts) -> + init_consumer(mod, opts, state, additional_info) {:stop, _} = stop -> stop @@ -1904,7 +1904,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state, continue_or_hibernate) do + defp init_producer(mod, opts, state, additional_info) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1924,7 +1924,7 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - if continue_or_hibernate, do: {:ok, stage, continue_or_hibernate}, else: {:ok, stage} + if additional_info, do: {:ok, stage, additional_info}, else: {:ok, stage} else {:error, message} -> {:stop, {:bad_opts, message}} end @@ -1946,7 +1946,7 @@ defmodule GenStage do end end - defp init_producer_consumer(mod, opts, state, continue_or_hibernate) do + defp init_producer_consumer(mod, opts, state, additional_info) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), {:ok, buffer_size, opts} <- @@ -1965,7 +1965,7 @@ defmodule GenStage do dispatcher_state: dispatcher_state } - case handle_gen_server_init_args(continue_or_hibernate, stage) do + case handle_gen_server_init_args(additional_info, stage) do {:ok, stage} -> consumer_init_subscribe(subscribe_to, stage) @@ -1981,12 +1981,12 @@ defmodule GenStage do end end - defp init_consumer(mod, opts, state, continue_or_hibernate) do + defp init_consumer(mod, opts, state, additional_info) do with {:ok, subscribe_to, opts} <- Utils.validate_list(opts, :subscribe_to, []), :ok <- Utils.validate_no_opts(opts) do stage = %GenStage{mod: mod, state: state, type: :consumer} - case handle_gen_server_init_args(continue_or_hibernate, stage) do + case handle_gen_server_init_args(additional_info, stage) do {:ok, stage} -> consumer_init_subscribe(subscribe_to, stage) From 117b8df893880705e8d76f3d2bc74c4641f4dd59 Mon Sep 17 00:00:00 2001 From: Maarten van Vliet Date: Mon, 18 Mar 2024 08:53:58 +0100 Subject: [PATCH 6/6] Add init_producer/4 --- lib/gen_stage.ex | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex index 9a031c6..f629375 100644 --- a/lib/gen_stage.ex +++ b/lib/gen_stage.ex @@ -1867,10 +1867,10 @@ defmodule GenStage do def init({mod, args}) do case mod.init(args) do {:producer, state} -> - init_producer(mod, [], state, nil) + init_producer(mod, [], state) {:producer, state, opts} when is_list(opts) -> - init_producer(mod, opts, state, nil) + init_producer(mod, opts, state) {:producer, state, opts, additional_info} when is_list(opts) -> init_producer(mod, opts, state, additional_info) @@ -1904,7 +1904,7 @@ defmodule GenStage do end end - defp init_producer(mod, opts, state, additional_info) do + defp init_producer(mod, opts, state) do with {:ok, dispatcher_mod, dispatcher_state, opts} <- init_dispatcher(opts), {:ok, buffer_size, opts} <- Utils.validate_integer(opts, :buffer_size, 10000, 0, :infinity, true), @@ -1923,13 +1923,18 @@ defmodule GenStage do dispatcher_mod: dispatcher_mod, dispatcher_state: dispatcher_state } - - if additional_info, do: {:ok, stage, additional_info}, else: {:ok, stage} + {:ok, stage} else {:error, message} -> {:stop, {:bad_opts, message}} end end + defp init_producer(mod, opts, state, additional_info) do + with {:ok, stage} <- init_producer(mod, opts, state) do + {:ok, stage, additional_info} + end + end + defp init_dispatcher(opts) do case Keyword.pop(opts, :dispatcher, GenStage.DemandDispatcher) do {dispatcher, opts} when is_atom(dispatcher) ->