diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml deleted file mode 100644 index e12fce9f..00000000 --- a/.github/FUNDING.yml +++ /dev/null @@ -1,2 +0,0 @@ -github: slashdotdash -open_collective: commanded diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 468f9d79..d8a8b9c8 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -4,15 +4,15 @@ on: [push, pull_request] env: MIX_ENV: test - + jobs: build: name: Build and test runs-on: ubuntu-latest strategy: matrix: - otp: ['24.3', '25.0'] - elixir: ['1.14.0'] + otp: ['25.3'] + elixir: ['1.15.7'] steps: - uses: actions/checkout@v2 diff --git a/.tool-versions b/.tool-versions index 73654c14..c1cec712 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1,2 +1,2 @@ -elixir 1.14.0-otp-25 -erlang 25.0.4 +elixir 1.15.7-otp-25 +erlang 25.3.2.8 diff --git a/BACKERS.md b/BACKERS.md index 54b33bb9..5371a040 100644 --- a/BACKERS.md +++ b/BACKERS.md @@ -1,14 +1,5 @@ # Sponsors & Backers -Commanded and EventStore are MIT-licensed open source projects. Ongoing development is only made possible thanks to the following awesome sponsors and backers. - -You can join them and contribute to Commanded's future development. - -* [Become a sponsor on GitHub](https://github.com/sponsors/slashdotdash) -* [Become a backer or sponsor on OpenCollective](https://opencollective.com/commanded). - ---- - ## Sponsors via GitHub Thank you to all our sponsors! 🙏 diff --git a/CHANGELOG.md b/CHANGELOG.md index f6504560..4df18f56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,8 +1,23 @@ # Changelog -## Next release +## v1.4.3 -- Record aggregate state while processing `Commanded.Aggregate.Multi` ([#507](https://github.com/commanded/commanded/pull/507)). +### Enhancements + +- Use `Logger.warning` to fix deprecation warnings ([#542](https://github.com/commanded/commanded/pull/542)). +- Add typespec to `CompositeRouter.dispatch/2` function ([#536](https://github.com/commanded/commanded/pull/536)). +- Support `opts` in `Commanded.EventStore.append_to_stream` function ([#528](https://github.com/commanded/commanded/pull/528)). +- Process manager metadata access ([#514](https://github.com/commanded/commanded/pull/514)). + +### Bug fixes + +- Correct parameter type in `ProcessManager.after_command/1` callback function ([#533](https://github.com/commanded/commanded/pull/533)). + +## v1.4.2 + +- Record aggregate state while processing `Commanded.Aggregate.Multi` ([#507](https://github.com/commanded/commanded/pull/507)). +- Properly handle EXIT signal in event handler ([#512](https://github.com/commanded/commanded/pull/512)). +- Separate logging a process managers error ([#513](https://github.com/commanded/commanded/pull/513)). ## v1.4.1 @@ -12,7 +27,7 @@ ### Bug fixes -- Remove duplicate apply function call when receiving missed events published to an aggregate's event stream ([364c877](https://github.com/commanded/commanded/commit/364c877e8f30a18d90544676fb58b94132d50720)). +- Remove duplicate apply function call when receiving missed events published to an aggregate's event stream ([364c877](https://github.com/commanded/commanded/commit/364c877e8f30a18d90544676fb58b94132d50720)). - Fix typespec typo in Commanded.Application ([#503](https://github.com/commanded/commanded/pull/503)). ## v1.4.0 diff --git a/README.md b/README.md index 88658a49..e8debf06 100644 --- a/README.md +++ b/README.md @@ -19,19 +19,14 @@ You can use Commanded with one of the following event stores for persistence: Please refer to the [CHANGELOG](CHANGELOG.md) for features, bug fixes, and any upgrade advice included for each release. -Requires Erlang/OTP v21.0 and Elixir v1.9 or later. +Requires Erlang/OTP v21.0 and Elixir v1.11 or later. --- -### Supporting Commanded - -You can help support Commanded by helping to fund its ongoing development, new features, and releases. +#### Sponsors -- [Become a GitHub sponsor](https://github.com/sponsors/slashdotdash) - [View sponsors & backers](BACKERS.md) -#### Sponsors - [![Alembic](https://user-images.githubusercontent.com/3167/177830256-26a74e82-60ff-4c20-bd84-64ee7c12512c.svg "Alembic")](https://alembic.com.au/) --- @@ -149,6 +144,7 @@ Commanded exists thanks to the following people who have contributed. - [David Carlin](https://github.com/davich) - [Damir Vandic](https://github.com/dvic) - [Danni Friedland](https://github.com/BlueHotDog) +- [Dilaksun Bavarajan](https://github.com/DilaksunB) - [Ernesto](https://github.com/lex-mala) - [Fernando Mendes](https://github.com/justmendes) - [Florian Ebeling](https://github.com/febeling) @@ -176,6 +172,7 @@ Commanded exists thanks to the following people who have contributed. - [Raphaël Lustin](https://github.com/rlustin) - [Štefan Ľupták](https://github.com/EskiMag) - [Tobiasz Małecki](https://github.com/amatalai) +- [Vladimir Drobyshevskiy](https://github.com/vheathen) - [Willy Wombat](https://github.com/octowombat) - [Yordis Prieto](https://github.com/yordis) - [Yuri de Figueiredo](https://github.com/y86) @@ -183,6 +180,4 @@ Commanded exists thanks to the following people who have contributed. ## Need help? -Please [open an issue](https://github.com/commanded/commanded/issues) if you encounter a problem, or need assistance. You can also seek help in the [Gitter chat room](https://gitter.im/commanded/Lobby) for Commanded. - -For commercial support, and consultancy, please contact [Ben Smith](mailto:ben@10consulting.com). +Please [open an issue](https://github.com/commanded/commanded/issues) if you encounter a problem, or need assistance. You can also seek help in the #commanded channel in the [official Elixir Slack](https://elixir-slackin.herokuapp.com/). diff --git a/config/test.exs b/config/test.exs index 2173b432..b125360d 100644 --- a/config/test.exs +++ b/config/test.exs @@ -3,11 +3,12 @@ import Config alias Commanded.EventStore.Adapters.InMemory alias Commanded.Serialization.JsonSerializer +config :logger, level: :debug config :logger, :console, level: :debug, format: "[$level] $message\n" config :ex_unit, assert_receive_timeout: 1_000, - capture_log: true, + capture_log: [level: :debug], exclude: [:distributed] config :commanded, diff --git a/lib/application.ex b/lib/application.ex index e3788c23..b166e3b3 100644 --- a/lib/application.ex +++ b/lib/application.ex @@ -3,6 +3,7 @@ defmodule Commanded.Application do alias Commanded.Aggregates.Aggregate alias Commanded.Application.Config + alias Commanded.Commands.Router telemetry_event(%{ event: [:commanded, :application, :dispatch, :start], @@ -286,7 +287,7 @@ defmodule Commanded.Application do `Commanded.Commands.Router` and included in the application. """ - @callback dispatch(command :: struct()) :: Commanded.Commands.Router.dispatch_resp() + @callback dispatch(command :: struct()) :: Router.dispatch_resp() @doc """ Dispatch a registered command. @@ -366,7 +367,7 @@ defmodule Commanded.Application do @callback dispatch( command :: struct(), timeout_or_opts :: non_neg_integer() | :infinity | Keyword.t() - ) :: Commanded.Commands.Router.dispatch_resp() + ) :: Router.dispatch_resp() @doc false def dispatch(application, command, opts \\ []) diff --git a/lib/commanded/aggregates/aggregate.ex b/lib/commanded/aggregates/aggregate.ex index 1bedbea5..517137ca 100644 --- a/lib/commanded/aggregates/aggregate.ex +++ b/lib/commanded/aggregates/aggregate.ex @@ -444,7 +444,7 @@ defmodule Commanded.Aggregates.Aggregate do timeout invalid -> - Logger.warn( + Logger.warning( "Invalid timeout for aggregate lifespan " <> inspect(lifespan) <> ", expected a non-negative integer, `:infinity`, `:hibernate`, `:stop`, or `{:stop, reason}` but got: " <> @@ -594,7 +594,7 @@ defmodule Commanded.Aggregates.Aggregate do %Aggregate{state | snapshotting: snapshotting} {:error, error} -> - Logger.warn(describe(state) <> " snapshot failed due to: " <> inspect(error)) + Logger.warning(describe(state) <> " snapshot failed due to: " <> inspect(error)) state end diff --git a/lib/commanded/aggregates/default_lifespan.ex b/lib/commanded/aggregates/default_lifespan.ex index 8e6e0ee6..f50c0192 100644 --- a/lib/commanded/aggregates/default_lifespan.ex +++ b/lib/commanded/aggregates/default_lifespan.ex @@ -29,7 +29,7 @@ defmodule Commanded.Aggregates.DefaultLifespan do """ @impl AggregateLifespan def after_error(error) do - if Exception.exception?(error) do + if Kernel.is_exception(error) do {:stop, error} else :infinity diff --git a/lib/commanded/commands/composite_router.ex b/lib/commanded/commands/composite_router.ex index 99c996b9..34ba8854 100644 --- a/lib/commanded/commands/composite_router.ex +++ b/lib/commanded/commands/composite_router.ex @@ -42,6 +42,8 @@ defmodule Commanded.Commands.CompositeRouter do A composite router can include composite routers. """ + alias Commanded.Commands.Router + defmacro __using__(opts) do quote do require Logger @@ -91,18 +93,21 @@ defmodule Commanded.Commands.CompositeRouter do Enum.map(@registered_commands, fn {command_module, _router} -> command_module end) end - @doc false + @doc """ + Dispatch a registered command. + """ + @spec dispatch( + command :: struct(), + timeout_or_opts :: non_neg_integer() | :infinity | Keyword.t() + ) :: Router.dispatch_resp() def dispatch(command, opts \\ []) - @doc false def dispatch(command, :infinity), do: do_dispatch(command, timeout: :infinity) - @doc false def dispatch(command, timeout) when is_integer(timeout), do: do_dispatch(command, timeout: timeout) - @doc false def dispatch(command, opts), do: do_dispatch(command, opts) diff --git a/lib/commanded/commands/dispatcher.ex b/lib/commanded/commands/dispatcher.ex index 9a25509c..6266bb9a 100644 --- a/lib/commanded/commands/dispatcher.ex +++ b/lib/commanded/commands/dispatcher.ex @@ -1,16 +1,18 @@ defmodule Commanded.Commands.Dispatcher do @moduledoc false - - require Logger - alias Commanded.Aggregates.Aggregate alias Commanded.Aggregates.ExecutionContext + alias Commanded.Commands.Router alias Commanded.Middleware.Pipeline alias Commanded.Telemetry + require Logger + defmodule Payload do @moduledoc false + @type t :: %Payload{} + defstruct [ :application, :command, @@ -35,13 +37,8 @@ defmodule Commanded.Commands.Dispatcher do # Dispatch the given command to the handler module for the aggregate as # identified. - @spec dispatch(payload :: struct) :: - :ok - | {:ok, aggregate_state :: struct} - | {:ok, aggregate_version :: non_neg_integer()} - | {:ok, events :: list(struct)} - | {:ok, Commanded.Commands.ExecutionResult.t()} - | {:error, error :: term} + @spec dispatch(payload :: Payload.t()) :: + Router.dispatch_resp() | {:ok, events :: list(struct())} def dispatch(%Payload{} = payload) do pipeline = to_pipeline(payload) telemetry_metadata = telemetry_metadata(pipeline, payload) diff --git a/lib/commanded/commands/router.ex b/lib/commanded/commands/router.ex index 42c7e044..60599301 100644 --- a/lib/commanded/commands/router.ex +++ b/lib/commanded/commands/router.ex @@ -206,7 +206,8 @@ defmodule Commanded.Commands.Router do """ - alias Commanded.Commands.Router + alias Commanded.Aggregates.DefaultLifespan + alias Commanded.Commands.{ExecutionResult, Router} alias Commanded.UUID defmacro __using__(opts) do @@ -216,7 +217,7 @@ defmodule Commanded.Commands.Router do import unquote(__MODULE__) @before_compile unquote(__MODULE__) - @behaviour Commanded.Commands.Router + @behaviour Router Module.register_attribute(__MODULE__, :registered_commands, accumulate: true) Module.register_attribute(__MODULE__, :registered_middleware, accumulate: true) @@ -227,7 +228,7 @@ defmodule Commanded.Commands.Router do consistency: Router.get_opt(unquote(opts), :default_consistency, :eventual), returning: Router.get_default_dispatch_return(unquote(opts)), timeout: 5_000, - lifespan: Commanded.Aggregates.DefaultLifespan, + lifespan: DefaultLifespan, metadata: %{}, retry_attempts: 10 ] @@ -369,7 +370,7 @@ defmodule Commanded.Commands.Router do :ok | {:ok, aggregate_state :: struct()} | {:ok, aggregate_version :: non_neg_integer()} - | {:ok, execution_result :: Commanded.Commands.ExecutionResult.t()} + | {:ok, execution_result :: ExecutionResult.t()} | {:error, :unregistered_command} | {:error, :consistency_timeout} | {:error, reason :: term()} @@ -386,7 +387,7 @@ defmodule Commanded.Commands.Router do :ok = BankRouter.dispatch(command) """ - @callback dispatch(command :: struct()) :: dispatch_resp + @callback dispatch(command :: struct()) :: dispatch_resp() @doc """ Dispatch the given command to the registered handler providing a timeout. @@ -462,7 +463,7 @@ defmodule Commanded.Commands.Router do @callback dispatch( command :: struct(), timeout_or_opts :: non_neg_integer() | :infinity | Keyword.t() - ) :: dispatch_resp + ) :: dispatch_resp() defmacro __before_compile__(_env) do quote generated: true do diff --git a/lib/commanded/event/handler.ex b/lib/commanded/event/handler.ex index 6e6076c7..8956c627 100644 --- a/lib/commanded/event/handler.ex +++ b/lib/commanded/event/handler.ex @@ -484,7 +484,7 @@ defmodule Commanded.Event.Handler do case Map.get(context, :failures) do too_many when too_many >= 3 -> # skip bad event after third failure - Logger.warn(fn -> "Skipping bad event, too many failures: " <> inspect(event) end) + Logger.warning("Skipping bad event, too many failures: " <> inspect(event)) :skip @@ -730,7 +730,7 @@ defmodule Commanded.Event.Handler do @impl GenServer def terminate(reason, state) do - Logger.debug(fn -> describe(state) <> " is shutting down due to #{inspect(reason)}" end) + Logger.debug(describe(state) <> " is shutting down due to #{inspect(reason)}") end @doc false @@ -756,10 +756,10 @@ defmodule Commanded.Event.Handler do end {:stop, reason} -> - Logger.debug(fn -> + Logger.debug( describe(state) <> " `before_reset/0` callback has requested to stop. (reason: #{inspect(reason)})" - end) + ) {:stop, reason, state} end @@ -778,7 +778,7 @@ defmodule Commanded.Event.Handler do {:subscribed, subscription}, %Handler{subscription: %Subscription{subscription_pid: subscription}} = state ) do - Logger.debug(fn -> describe(state) <> " has successfully subscribed to event store" end) + Logger.debug(describe(state) <> " has successfully subscribed to event store") %Handler{handler_module: handler_module} = state @@ -787,7 +787,7 @@ defmodule Commanded.Event.Handler do {:noreply, state} {:stop, reason} -> - Logger.debug(fn -> describe(state) <> " `init/0` callback has requested to stop" end) + Logger.debug(describe(state) <> " `init/0` callback has requested to stop") {:stop, reason, state} end @@ -798,7 +798,7 @@ defmodule Commanded.Event.Handler do def handle_info({:events, events}, %Handler{} = state) do %Handler{application: application} = state - Logger.debug(fn -> describe(state) <> " received events: #{inspect(events)}" end) + Logger.debug(describe(state) <> " received events: #{inspect(events)}") try do state = @@ -820,18 +820,30 @@ defmodule Commanded.Event.Handler do {:DOWN, ref, :process, _pid, reason}, %Handler{subscription: %Subscription{subscription_ref: ref}} = state ) do - Logger.debug(fn -> describe(state) <> " subscription DOWN due to: #{inspect(reason)}" end) + Logger.debug(describe(state) <> " subscription DOWN due to: #{inspect(reason)}") # Stop event handler when event store subscription process terminates. {:stop, reason, state} end + @doc false + @impl GenServer + def handle_info({:EXIT, _pid, :normal}, state) do + # linked process exited normally, don't shutdown + {:noreply, state} + end + + @impl GenServer + def handle_info({:EXIT, _pid, reason}, state) do + {:stop, reason, state} + end + @doc false @impl GenServer def handle_info(message, state) do - Logger.error(fn -> + Logger.error( describe(state) <> " received unexpected message: " <> inspect(message, pretty: true) - end) + ) {:noreply, state} end @@ -854,11 +866,11 @@ defmodule Commanded.Event.Handler do {:error, error} -> {backoff, subscription} = Subscription.backoff(subscription) - Logger.info(fn -> + Logger.info( describe(state) <> " failed to subscribe to event store due to: " <> inspect(error) <> ", retrying in " <> inspect(backoff) <> "ms" - end) + ) subscribe_timer = Process.send_after(self(), :subscribe_to_events, backoff) @@ -875,7 +887,7 @@ defmodule Commanded.Event.Handler do %Handler{last_seen_event: last_seen_event} = state ) when not is_nil(last_seen_event) and event_number <= last_seen_event do - Logger.debug(fn -> describe(state) <> " has already seen event ##{inspect(event_number)}" end) + Logger.debug(describe(state) <> " has already seen event ##{inspect(event_number)}") confirm_receipt(event, state) end @@ -910,7 +922,7 @@ defmodule Commanded.Event.Handler do handle_event_error(error, event, failure_context, state) {:error, reason, stacktrace} -> - log_event_error({:error, reason}, event, state) + log_event_error({:error, reason, stacktrace}, event, state) telemetry_exception(start_time, :error, reason, stacktrace, telemetry_metadata) failure_context = build_failure_context(event, context, stacktrace, state) @@ -918,14 +930,14 @@ defmodule Commanded.Event.Handler do handle_event_error({:error, reason}, event, failure_context, state) invalid -> - Logger.error(fn -> + Logger.error( describe(state) <> " failed to handle event " <> inspect(event, pretty: true) <> ", `handle/2` function returned an invalid value: " <> inspect(invalid, pretty: true) <> ", expected `:ok` or `{:error, term}`" - end) + ) telemetry_stop(start_time, Map.put(telemetry_metadata, :error, :invalid_return_value)) @@ -947,8 +959,6 @@ defmodule Commanded.Event.Handler do rescue error -> stacktrace = __STACKTRACE__ - Logger.error(fn -> Exception.format(:error, error, stacktrace) end) - {:error, error, stacktrace} end end @@ -1001,22 +1011,20 @@ defmodule Commanded.Event.Handler do case handler_module.error(error, data, failure_context) do {:retry, %FailureContext{context: context}} when is_map(context) -> # Retry the failed event - Logger.info(fn -> describe(state) <> " is retrying failed event" end) + Logger.info(describe(state) <> " is retrying failed event") handle_event(failed_event, context, state) {:retry, context} when is_map(context) -> # Retry the failed event - Logger.info(fn -> describe(state) <> " is retrying failed event" end) + Logger.info(describe(state) <> " is retrying failed event") handle_event(failed_event, context, state) {:retry, delay, %FailureContext{context: context}} when is_map(context) and is_integer(delay) and delay >= 0 -> # Retry the failed event after waiting for the given delay, in milliseconds - Logger.info(fn -> - describe(state) <> " is retrying failed event after #{inspect(delay)}ms" - end) + Logger.info(describe(state) <> " is retrying failed event after #{inspect(delay)}ms") :timer.sleep(delay) @@ -1024,9 +1032,7 @@ defmodule Commanded.Event.Handler do {:retry, delay, context} when is_map(context) and is_integer(delay) and delay >= 0 -> # Retry the failed event after waiting for the given delay, in milliseconds - Logger.info(fn -> - describe(state) <> " is retrying failed event after #{inspect(delay)}ms" - end) + Logger.info(describe(state) <> " is retrying failed event after #{inspect(delay)}ms") :timer.sleep(delay) @@ -1034,34 +1040,38 @@ defmodule Commanded.Event.Handler do :skip -> # Skip the failed event by confirming receipt - Logger.info(fn -> describe(state) <> " is skipping event" end) + Logger.info(describe(state) <> " is skipping event") confirm_receipt(failed_event, state) {:stop, reason} -> - Logger.warn(fn -> describe(state) <> " has requested to stop: #{inspect(reason)}" end) + Logger.warning(describe(state) <> " has requested to stop: #{inspect(reason)}") # Stop event handler with given reason throw({:error, reason}) invalid -> - Logger.warn(fn -> + Logger.warning( describe(state) <> " returned an invalid error response: #{inspect(invalid)}" - end) + ) # Stop event handler with original error throw(error) end end - defp log_event_error({:error, reason}, %RecordedEvent{} = failed_event, %Handler{} = state) do - Logger.error(fn -> + defp log_event_error(error, %RecordedEvent{} = failed_event, %Handler{} = state) do + reason = + case error do + {:error, reason} -> inspect(reason, pretty: true) + {:error, reason, stacktrace} -> Exception.format(:error, reason, stacktrace) + end + + Logger.error( describe(state) <> - " failed to handle event " <> - inspect(failed_event, pretty: true) <> - " due to: " <> - inspect(reason, pretty: true) - end) + " failed to handle event:\n" <> + inspect(failed_event, pretty: true) <> ", due to:\n" <> reason + ) end # Confirm receipt of event @@ -1075,9 +1085,7 @@ defmodule Commanded.Event.Handler do %RecordedEvent{event_number: event_number} = event - Logger.debug(fn -> - describe(state) <> " confirming receipt of event ##{inspect(event_number)}" - end) + Logger.debug(describe(state) <> " confirming receipt of event ##{inspect(event_number)}") :ok = Subscription.ack_event(subscription, event) :ok = Subscriptions.ack_event(application, handler_name, consistency, event) @@ -1110,7 +1118,7 @@ defmodule Commanded.Event.Handler do rescue error -> stacktrace = __STACKTRACE__ - Logger.error(fn -> Exception.format(:error, error, stacktrace) end) + Logger.error(Exception.format(:error, error, stacktrace)) 1 end diff --git a/lib/commanded/event/upcaster.ex b/lib/commanded/event/upcaster.ex index a4a57d0e..74689940 100644 --- a/lib/commanded/event/upcaster.ex +++ b/lib/commanded/event/upcaster.ex @@ -6,6 +6,8 @@ defprotocol Commanded.Event.Upcaster do You can use an upcaster to change the shape of an event (e.g. add a new field with a default, rename a field) or rename an event. + Upcaster will run for new events and for historical events. + Because the upcaster changes any historical event to the latest version, consumers (aggregates, event handlers, and process managers) only need to support the latest version. diff --git a/lib/commanded/event_store.ex b/lib/commanded/event_store.ex index 3e3e22c5..ec356f48 100644 --- a/lib/commanded/event_store.ex +++ b/lib/commanded/event_store.ex @@ -2,38 +2,38 @@ defmodule Commanded.EventStore do @moduledoc """ Use the event store configured for a Commanded application. - ### Telemetry Events + Adds telemetry events for the following functions. Events are emitted in the form `[:commanded, :event_store, event]` with their spannable postfixes (`start`, `stop`, `exception`) - * ack_event/3 - * adapter/2 - * append_to_stream/4 - * delete_snapshot/2 - * delete_subscription/3 - * read_snapshot/2 - * record_snapshot/2 - * stream_forward/2 - * stream_forward/3 - * stream_forward/4 - * subscribe/2 - * subscribe_to/5 - * subscribe_to/6 - * unsubscribe/2 - """ + * ack_event/3 + * adapter/2 + * append_to_stream/4 + * delete_snapshot/2 + * delete_subscription/3 + * read_snapshot/2 + * record_snapshot/2 + * stream_forward/2 + * stream_forward/3 + * stream_forward/4 + * subscribe/2 + * subscribe_to/5 + * subscribe_to/6 + * unsubscribe/2 + """ alias Commanded.Application alias Commanded.Event.Upcast - @type application :: Commanded.Application.t() + @type application :: Application.t() @type config :: Keyword.t() @doc """ Append one or more events to a stream atomically. """ - def append_to_stream(application, stream_uuid, expected_version, events) do + def append_to_stream(application, stream_uuid, expected_version, events, opts \\ []) do meta = %{ application: application, stream_uuid: stream_uuid, @@ -43,18 +43,21 @@ defmodule Commanded.EventStore do span(:append_to_stream, meta, fn -> {adapter, adapter_meta} = Application.event_store_adapter(application) - adapter.append_to_stream( - adapter_meta, - stream_uuid, - expected_version, - events - ) + if function_exported?(adapter, :append_to_stream, 5) do + adapter.append_to_stream(adapter_meta, stream_uuid, expected_version, events, opts) + else + adapter.append_to_stream( + adapter_meta, + stream_uuid, + expected_version, + events + ) + end end) end @doc """ - Streams events from the given stream, in the order in which they were - originally written. + Streams events from the given stream, in the order in which they were originally written. """ def stream_forward(application, stream_uuid, start_version \\ 0, read_batch_size \\ 1_000) do meta = %{ @@ -138,7 +141,7 @@ defmodule Commanded.EventStore do subscription_name, subscriber, start_from, - options \\ [] + opts \\ [] ) do meta = %{ application: application, @@ -158,7 +161,7 @@ defmodule Commanded.EventStore do subscription_name, subscriber, start_from, - options + opts ) else adapter.subscribe_to( diff --git a/lib/commanded/event_store/adapter.ex b/lib/commanded/event_store/adapter.ex index 8e618e69..5a215b08 100644 --- a/lib/commanded/event_store/adapter.ex +++ b/lib/commanded/event_store/adapter.ex @@ -16,7 +16,6 @@ defmodule Commanded.EventStore.Adapter do @type subscriber :: pid @type source_uuid :: String.t() @type error :: term - @type options :: Keyword.t() @doc """ Return a child spec defining all processes required by the event store. @@ -31,7 +30,8 @@ defmodule Commanded.EventStore.Adapter do adapter_meta, stream_uuid, expected_version, - events :: list(EventData.t()) + events :: list(EventData.t()), + opts :: Keyword.t() ) :: :ok | {:error, :wrong_expected_version} @@ -71,7 +71,7 @@ defmodule Commanded.EventStore.Adapter do subscription_name, subscriber, start_from, - options + opts :: Keyword.t() ) :: {:ok, subscription} | {:error, :subscription_already_exists} diff --git a/lib/commanded/event_store/adapters/in_memory.ex b/lib/commanded/event_store/adapters/in_memory.ex index 97254cd8..28b25e40 100644 --- a/lib/commanded/event_store/adapters/in_memory.ex +++ b/lib/commanded/event_store/adapters/in_memory.ex @@ -64,7 +64,7 @@ defmodule Commanded.EventStore.Adapters.InMemory do end @impl Commanded.EventStore.Adapter - def append_to_stream(adapter_meta, stream_uuid, expected_version, events) do + def append_to_stream(adapter_meta, stream_uuid, expected_version, events, _opts \\ []) do event_store = event_store_name(adapter_meta) GenServer.call(event_store, {:append, stream_uuid, expected_version, events}) diff --git a/lib/commanded/event_store/event_data.ex b/lib/commanded/event_store/event_data.ex index 25457d3e..3191b04e 100644 --- a/lib/commanded/event_store/event_data.ex +++ b/lib/commanded/event_store/event_data.ex @@ -7,7 +7,7 @@ defmodule Commanded.EventStore.EventData do @type uuid :: String.t() @type t :: %Commanded.EventStore.EventData{ - causation_id: uuid(), + causation_id: uuid() | nil, correlation_id: uuid(), event_type: String.t(), data: struct(), diff --git a/lib/commanded/event_store/recorded_event.ex b/lib/commanded/event_store/recorded_event.ex index a1693529..2177c327 100644 --- a/lib/commanded/event_store/recorded_event.ex +++ b/lib/commanded/event_store/recorded_event.ex @@ -32,19 +32,42 @@ defmodule Commanded.EventStore.RecordedEvent do alias Commanded.EventStore.RecordedEvent + @type causation_id :: uuid() | nil + @type correlation_id :: uuid() | nil + @type created_at :: DateTime.t() + @type data :: domain_event() + @type domain_event :: struct() + @type event_id :: uuid() + @type event_number :: non_neg_integer() + @type event_type :: String.t() + @type metadata :: map() + @type stream_id :: String.t() + @type stream_version :: non_neg_integer() @type uuid :: String.t() @type t :: %RecordedEvent{ - event_id: uuid(), - event_number: non_neg_integer(), - stream_id: String.t(), - stream_version: non_neg_integer(), - causation_id: uuid() | nil, - correlation_id: uuid() | nil, - event_type: String.t(), - data: struct(), - metadata: map(), - created_at: DateTime.t() + event_id: event_id(), + event_number: event_number(), + stream_id: stream_id(), + stream_version: stream_version(), + causation_id: causation_id(), + correlation_id: correlation_id(), + event_type: event_type(), + data: data(), + metadata: metadata(), + created_at: created_at() + } + + @type enriched_metadata :: %{ + :event_id => event_id(), + :event_number => event_number(), + :stream_id => stream_id(), + :stream_version => stream_version(), + :correlation_id => correlation_id(), + :causation_id => causation_id(), + :created_at => created_at(), + optional(atom()) => term(), + optional(String.t()) => term() } defstruct [ @@ -56,15 +79,15 @@ defmodule Commanded.EventStore.RecordedEvent do :correlation_id, :event_type, :data, - :metadata, - :created_at + :created_at, + metadata: %{} ] @doc """ Enrich the event's metadata with fields from the `RecordedEvent` struct and any additional metadata passed as an option. """ - @spec enrich_metadata(t(), [{:additional_metadata, map()}]) :: map() + @spec enrich_metadata(t(), [{:additional_metadata, map()}]) :: enriched_metadata() def enrich_metadata(%RecordedEvent{} = event, opts) do %RecordedEvent{ event_id: event_id, @@ -77,6 +100,8 @@ defmodule Commanded.EventStore.RecordedEvent do metadata: metadata } = event + additional_metadata = Keyword.get(opts, :additional_metadata, %{}) + %{ event_id: event_id, event_number: event_number, @@ -86,7 +111,7 @@ defmodule Commanded.EventStore.RecordedEvent do causation_id: causation_id, created_at: created_at } - |> Map.merge(Keyword.get(opts, :additional_metadata, %{})) |> Map.merge(metadata || %{}) + |> Map.merge(additional_metadata) end end diff --git a/lib/commanded/middleware/consistency_guarantee.ex b/lib/commanded/middleware/consistency_guarantee.ex index ea7b842d..d340a27e 100644 --- a/lib/commanded/middleware/consistency_guarantee.ex +++ b/lib/commanded/middleware/consistency_guarantee.ex @@ -44,7 +44,7 @@ defmodule Commanded.Middleware.ConsistencyGuarantee do pipeline {:error, :timeout} -> - Logger.warn(fn -> + Logger.warning(fn -> "Consistency timeout waiting for aggregate #{inspect(aggregate_uuid)} at version #{inspect(aggregate_version)}" end) diff --git a/lib/commanded/process_managers/failure_context.ex b/lib/commanded/process_managers/failure_context.ex index ef93e31a..8343ab38 100644 --- a/lib/commanded/process_managers/failure_context.ex +++ b/lib/commanded/process_managers/failure_context.ex @@ -7,28 +7,33 @@ defmodule Commanded.ProcessManagers.FailureContext do - `context` - the context map passed between each failure and may be used to track state between retries, such as to count failures. + - `enriched_metadata` - the enriched metadata associated with the event. + - `last_event` - the last event the process manager received. - `pending_commands` - the pending commands that were not executed yet. - `process_manager_state` - the state the process manager would be in - if the command would not fail. + if the event handling or command dispatch had not failed. - `stacktrace` - the stacktrace if the error was an unhandled exception. """ + alias Commanded.EventStore.RecordedEvent @type t :: %__MODULE__{ context: map(), - last_event: struct(), + enriched_metadata: RecordedEvent.enriched_metadata(), + last_event: RecordedEvent.t(), pending_commands: [struct()], process_manager_state: struct(), stacktrace: Exception.stacktrace() | nil } defstruct [ - :process_manager_state, + :enriched_metadata, :last_event, + :process_manager_state, :stacktrace, context: %{}, pending_commands: [] diff --git a/lib/commanded/process_managers/process_manager.ex b/lib/commanded/process_managers/process_manager.ex index 4b04943f..f4ee27a6 100644 --- a/lib/commanded/process_managers/process_manager.ex +++ b/lib/commanded/process_managers/process_manager.ex @@ -60,9 +60,12 @@ defmodule Commanded.ProcessManagers.ProcessManager do manager module and implement the callback functions defined in the behaviour: - `c:interested?/1` + - `c:interested?/2` - `c:handle/2` + - `c:handle/3` - `c:apply/2` - `c:after_command/2` + - `c:after_command/3` - `c:error/3` Please read the [Process managers](process-managers.html) guide for more @@ -79,27 +82,41 @@ defmodule Commanded.ProcessManagers.ProcessManager do def interested?(%AnEvent{uuid: uuid}), do: {:start, uuid} + def interested?(%AnotherEvent{}, metadata), + do: {:continue, Map.fetch!(metadata, :correlation_id)} + def handle(%ExampleProcessManager{}, %ExampleEvent{}) do [ %ExampleCommand{} ] end + def handle(%ExampleProcessManager{}, %AnotherEvent{}, _metadata) do + [ + %AnotherCommand{} + ] + end + def after_command(%ExampleProcessManager{}, %ExampleCommand{}) do + :continue + end + + def after_command(%ExampleProcessManager{}, %AnotherCommand{}, _metadata) do :stop end def error({:error, failure}, %ExampleEvent{}, _failure_context) do # Retry, skip, ignore, or stop process manager on error handling event + :skip end def error({:error, failure}, %ExampleCommand{}, _failure_context) do # Retry, skip, ignore, or stop process manager on error dispatching command + :skip end end - Start the process manager (or configure as a worker inside a - [Supervisor](supervision.html)) + Start the process manager (or configure as a worker inside a [Supervisor](supervision.html)) {:ok, process_manager} = ExampleProcessManager.start_link() @@ -279,11 +296,19 @@ defmodule Commanded.ProcessManagers.ProcessManager do """ - alias Commanded.ProcessManagers.FailureContext + alias Commanded.EventStore.RecordedEvent - @type domain_event :: struct - @type command :: struct - @type process_manager :: struct + alias Commanded.ProcessManagers.{ + FailureContext, + ProcessManager, + ProcessManagerInstance, + ProcessRouter + } + + @type domain_event :: struct() + @type enriched_metadata :: RecordedEvent.enriched_metadata() + @type command :: struct() + @type process_manager :: struct() @type process_uuid :: String.t() | [String.t()] @type consistency :: :eventual | :strong @@ -299,8 +324,21 @@ defmodule Commanded.ProcessManagers.ProcessManager do @doc """ Is the process manager interested in the given command? - The `c:interested?/1` function is used to indicate which events the process - manager receives. The response is used to route the event to an existing + See `c:interested?/2` for details. + """ + @callback interested?(domain_event) :: + {:start, process_uuid} + | {:start!, process_uuid} + | {:continue, process_uuid} + | {:continue!, process_uuid} + | {:stop, process_uuid} + | false + + @doc """ + Is the process manager interested in the given command? + + The `c:interested?/2` function is used to indicate which events + the process manager receives. The response is used to route the event to an existing instance or start a new process instance: - `{:start, process_uuid}` - create a new instance of the process manager. @@ -331,7 +369,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do problematic event. """ - @callback interested?(domain_event) :: + @callback interested?(domain_event, enriched_metadata) :: {:start, process_uuid} | {:start!, process_uuid} | {:continue, process_uuid} @@ -343,37 +381,62 @@ defmodule Commanded.ProcessManagers.ProcessManager do Stop the process manager instance after a command is successfully dispatched. - The `c:after_command/2` function can be omitted if you do not need to stop - after a specific command or if you would instead use the `c:interested?/1` + See `c:after_command/3` for details. + """ + @callback after_command(process_manager, command) :: :continue | :stop + + @doc """ + Stop the process manager instance after a command is successfully + dispatched. + + The `c:after_command/3` function can be omitted if you do not need to stop + after a specific command or if you would instead use the `c:interested?/2` stop mechanism. """ - @callback after_command(process_manager, domain_event) :: :continue | :stop + @callback after_command(process_manager, command, enriched_metadata) :: :continue | :stop @doc """ Process manager instance handles a domain event, returning any commands to dispatch. - A `c:handle/2` function can be defined for each `:start` and `:continue` - tagged event previously specified. It receives the process manager's state and - the event to be handled. It must return the commands to be dispatched. This - may be none, a single command, or many commands. + See `c:handle/3` function for details. + """ + @callback handle(process_manager, domain_event) :: command | list(command) | {:error, term} + + @doc """ + Process manager instance handles a domain event, returning any commands to + dispatch. - The `c:handle/2` function can be omitted if you do not need to dispatch a + A `c:handle/3` function can be defined for each `:start` and `:continue` + tagged event previously specified. It receives the process manager's state, + event to be handled, and the event's enriched metadata. It must return the + commands to be dispatched. This may be none, a single command, or many + commands. + + The `c:handle/3` function can be omitted if you do not need to dispatch a command and are only mutating the process manager's state. """ - @callback handle(process_manager, domain_event) :: command | list(command) | {:error, term} + @callback handle(process_manager, domain_event, enriched_metadata) :: + command | list(command) | {:error, term} @doc """ Mutate the process manager's state by applying the domain event. - The `c:apply/2` function is used to mutate the process manager's state. It - receives the current state and the domain event, and must return the modified - state. + See `c:apply/3` function for details. + """ + @callback apply(process_manager, domain_event) :: process_manager + + @doc """ + Mutate the process manager's state by applying the domain event. + + The `c:apply/3` function is used to mutate the process manager's state. It + receives the current state, the domain event and the event metadata, and must + return the modified state. This callback function is optional, the default behaviour is to retain the process manager's current state. """ - @callback apply(process_manager, domain_event) :: process_manager + @callback apply(process_manager, domain_event, enriched_metadata) :: process_manager @doc """ Called when a command dispatch or event handling returns an error. @@ -438,10 +501,16 @@ defmodule Commanded.ProcessManagers.ProcessManager do | {:skip, :continue_pending} | {:stop, reason :: term()} - @optional_callbacks init: 1, handle: 2, apply: 2, error: 3, interested?: 1, after_command: 2 - - alias Commanded.ProcessManagers.ProcessManager - alias Commanded.ProcessManagers.ProcessRouter + @optional_callbacks init: 1, + handle: 2, + handle: 3, + apply: 2, + apply: 3, + error: 3, + interested?: 1, + interested?: 2, + after_command: 2, + after_command: 3 @doc false defmacro __using__(using_opts) do @@ -491,14 +560,30 @@ defmodule Commanded.ProcessManagers.ProcessManager do # Include default fallback functions at end, with lowest precedence quote generated: true do @doc false - def after_command(_process_manager, _event), do: :continue + def after_command(process_manager, command, _metadata), + do: after_command(process_manager, command) + + @doc false + def after_command(_process_manager, _command), do: :continue + + # @doc false + def interested?(event, _metadata), + do: interested?(event) @doc false def interested?(_event), do: false + @doc false + def handle(process_manager, event, _metadata), + do: handle(process_manager, event) + @doc false def handle(_process_manager, _event), do: [] + @doc false + def apply(process_manager, event, _metadata), + do: __MODULE__.apply(process_manager, event) + @doc false def apply(process_manager, _event), do: process_manager @@ -510,8 +595,8 @@ defmodule Commanded.ProcessManagers.ProcessManager do @doc """ Get the identity of the current process instance. - This must only be called within a process manager's `handle/2` or `apply/2` - callback function. + This must only be called within a process manager's `handle/2`, `handle/3`, `apply/2`, or + `apply/3` callback functions. ## Example @@ -532,7 +617,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do end """ - defdelegate identity(), to: Commanded.ProcessManagers.ProcessManagerInstance + defdelegate identity, to: ProcessManagerInstance # GenServer start options @start_opts [:debug, :name, :timeout, :spawn_opt, :hibernate_after] @@ -566,6 +651,7 @@ defmodule Commanded.ProcessManagers.ProcessManager do end {name, config} = Keyword.pop(config, :name) + name = parse_name(name) unless name do diff --git a/lib/commanded/process_managers/process_manager_instance.ex b/lib/commanded/process_managers/process_manager_instance.ex index 56606f7b..37728830 100644 --- a/lib/commanded/process_managers/process_manager_instance.ex +++ b/lib/commanded/process_managers/process_manager_instance.ex @@ -91,10 +91,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do state = case EventStore.read_snapshot(application, snapshot_uuid(state)) do {:ok, snapshot} -> + %SnapshotData{data: data, source_version: source_version} = snapshot + %State{ state - | process_state: snapshot.data, - last_seen_event: snapshot.source_version + | process_state: data, + last_seen_event: source_version } {:error, :snapshot_not_found} -> @@ -179,8 +181,14 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do end defp process_unseen_event(%RecordedEvent{} = event, %State{} = state, context \\ %{}) do - %RecordedEvent{correlation_id: correlation_id, event_id: event_id, event_number: event_number} = - event + %RecordedEvent{ + correlation_id: correlation_id, + event_id: event_id, + event_number: event_number, + metadata: metadata + } = event + + %State{process_state: process_state} = state telemetry_metadata = telemetry_metadata(event, state) start_time = telemetry_start(telemetry_metadata) @@ -189,8 +197,9 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:error, error} -> failure_context = %FailureContext{ context: context, + enriched_metadata: enrich_metadata(event, state), last_event: event, - process_manager_state: state + process_manager_state: process_state } telemetry_stop(start_time, telemetry_metadata, {:error, error}) @@ -200,8 +209,9 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:error, error, stacktrace} -> failure_context = %FailureContext{ context: context, + enriched_metadata: enrich_metadata(event, state), last_event: event, - process_manager_state: state, + process_manager_state: process_state, stacktrace: stacktrace } @@ -213,7 +223,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do commands = List.wrap(commands) # Copy event id, as causation id, and correlation id from handled event. - opts = [causation_id: event_id, correlation_id: correlation_id, returning: false] + opts = [ + causation_id: event_id, + correlation_id: correlation_id, + metadata: metadata || %{}, + returning: false + ] with :ok <- dispatch_commands(commands, opts, state, event) do telemetry_stop(start_time, telemetry_metadata, {:ok, commands}) @@ -222,24 +237,27 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:error, error, stacktrace} -> failure_context = %FailureContext{ context: context, + enriched_metadata: enrich_metadata(event, state), last_event: event, - process_manager_state: state, + process_manager_state: process_state, stacktrace: stacktrace } handle_event_error({:error, error}, event, failure_context, state) - process_state -> + updated_process_state -> state = %State{ state - | process_state: process_state, + | process_state: updated_process_state, last_seen_event: event_number } :ok = persist_state(event_number, state) :ok = ack_event(event, state) - handle_after_command(commands, state) + enriched_metadata = enrich_metadata(event, state) + + handle_after_command(commands, enriched_metadata, state) end else {:stop, reason} -> @@ -260,8 +278,10 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do process_state: process_state } = state + enriched_metadata = enrich_metadata(event, state) + try do - process_manager_module.handle(process_state, data) + process_manager_module.handle(process_state, data, enriched_metadata) rescue error -> stacktrace = __STACKTRACE__ @@ -272,7 +292,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do end defp handle_event_error( - {:error, reason} = error, + {:error, _error} = error, %RecordedEvent{} = failed_event, %FailureContext{} = failure_context, %State{} = state @@ -280,13 +300,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do %RecordedEvent{data: data} = failed_event %State{idle_timeout: idle_timeout, process_manager_module: process_manager_module} = state - Logger.error(fn -> - describe(state) <> - " failed to handle event " <> - inspect(failed_event, pretty: true) <> - " due to: " <> - inspect(reason, pretty: true) - end) + log_event_error(error, failed_event, state) case process_manager_module.error(error, data, failure_context) do {:retry, %FailureContext{context: context}} when is_map(context) -> @@ -332,12 +346,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:stop, error} -> # Stop the process manager instance - Logger.warn(fn -> describe(state) <> " has requested to stop: #{inspect(error)}" end) + Logger.warning(fn -> describe(state) <> " has requested to stop: #{inspect(error)}" end) {:stop, error, state} invalid -> - Logger.warn(fn -> + Logger.warning(fn -> describe(state) <> " returned an invalid error response: #{inspect(invalid)}" end) @@ -346,19 +360,29 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do end end - defp handle_after_command([], %State{} = state) do + defp log_event_error({:error, reason}, %RecordedEvent{} = failed_event, %State{} = state) do + Logger.error(fn -> + describe(state) <> + " failed to handle event " <> + inspect(failed_event, pretty: true) <> + " due to: " <> + inspect(reason, pretty: true) + end) + end + + defp handle_after_command([], _metadata, %State{} = state) do %State{idle_timeout: idle_timeout} = state {:noreply, state, idle_timeout} end - defp handle_after_command([command | commands], %State{} = state) do + defp handle_after_command([command | commands], metadata, %State{} = state) do %State{ process_manager_module: process_manager_module, process_state: process_state } = state - case process_manager_module.after_command(process_state, command) do + case process_manager_module.after_command(process_state, command, metadata) do :stop -> Logger.debug(fn -> describe(state) <> " has been stopped by command " <> inspect(command) @@ -369,7 +393,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:stop, :normal, state} _ -> - handle_after_command(commands, state) + handle_after_command(commands, metadata, state) end end @@ -382,8 +406,10 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do process_state: process_state } = state + enriched_metadata = enrich_metadata(event, state) + try do - process_manager_module.apply(process_state, data) + process_manager_module.apply(process_state, data, enriched_metadata) rescue error -> stacktrace = __STACKTRACE__ @@ -397,33 +423,34 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do defp dispatch_commands([], _opts, _state, _last_event, _context), do: :ok defp dispatch_commands([command | pending_commands], opts, state, last_event, context) do - %State{application: application} = state + %State{application: application, process_state: initial_process_state} = state - Logger.debug(fn -> - describe(state) <> " attempting to dispatch command: " <> inspect(command) - end) + Logger.debug(describe(state) <> " attempting to dispatch command: " <> inspect(command)) case Application.dispatch(application, command, opts) do :ok -> dispatch_commands(pending_commands, opts, state, last_event) {:error, _error} = error -> - Logger.warn(fn -> + Logger.warning( describe(state) <> " failed to dispatch command " <> inspect(command) <> " due to: " <> inspect(error) - end) + ) process_manager_state = case mutate_state(last_event, state) do - {:error, _, _} -> state - process_manager_state -> process_manager_state + {:error, _, _} -> initial_process_state + updated_manager_state -> updated_manager_state end + enriched_metadata = enrich_metadata(last_event, state) + failure_context = %FailureContext{ - pending_commands: pending_commands, - process_manager_state: process_manager_state, + context: context, + enriched_metadata: enriched_metadata, last_event: last_event, - context: context + pending_commands: pending_commands, + process_manager_state: process_manager_state } dispatch_failure(error, command, opts, failure_context, state) @@ -504,12 +531,12 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do {:stop, reason} = reply -> # Stop process manager - Logger.warn(fn -> describe(state) <> " has requested to stop: #{inspect(reason)}" end) + Logger.warning(fn -> describe(state) <> " has requested to stop: #{inspect(reason)}" end) reply invalid -> - Logger.warn(fn -> + Logger.warning(fn -> describe(state) <> " returned an invalid error response: #{inspect(invalid)}" end) @@ -556,6 +583,16 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstance do inspect(process_manager_name) <> "-" <> inspect(process_uuid) end + defp enrich_metadata(%RecordedEvent{} = event, %State{} = state) do + %State{application: application} = state + + RecordedEvent.enrich_metadata(event, + additional_metadata: %{ + application: application + } + ) + end + defp telemetry_start(telemetry_metadata) do Telemetry.start([:commanded, :process_manager, :handle], telemetry_metadata) end diff --git a/lib/commanded/process_managers/process_router.ex b/lib/commanded/process_managers/process_router.ex index b7cd5549..cde836ed 100644 --- a/lib/commanded/process_managers/process_router.ex +++ b/lib/commanded/process_managers/process_router.ex @@ -257,7 +257,7 @@ defmodule Commanded.ProcessManagers.ProcessRouter do # Stop process router when a process manager instance terminates abnormally. @impl GenServer def handle_info({:DOWN, _ref, :process, _pid, reason}, %State{} = state) do - Logger.warn(fn -> describe(state) <> " is stopping due to: #{inspect(reason)}" end) + Logger.warning(fn -> describe(state) <> " is stopping due to: #{inspect(reason)}" end) {:stop, reason, state} end @@ -295,8 +295,13 @@ defmodule Commanded.ProcessManagers.ProcessRouter do %RecordedEvent{data: data} = event %State{process_manager_module: process_manager_module} = state + additional_metadata = Map.take(state, [:application]) + + enriched_metadata = + RecordedEvent.enrich_metadata(event, additional_metadata: additional_metadata) + try do - case process_manager_module.interested?(data) do + case process_manager_module.interested?(data, enriched_metadata) do {:start, []} -> ack_and_continue(event, state) @@ -417,12 +422,12 @@ defmodule Commanded.ProcessManagers.ProcessRouter do ack_and_continue(failed_event, state) {:stop, reason} -> - Logger.warn(fn -> describe(state) <> " has requested to stop: #{inspect(error)}" end) + Logger.warning(fn -> describe(state) <> " has requested to stop: #{inspect(error)}" end) {:stop, reason, state} invalid -> - Logger.warn(fn -> + Logger.warning(fn -> describe(state) <> " returned an invalid error response: #{inspect(invalid)}" end) diff --git a/lib/commanded/pubsub.ex b/lib/commanded/pubsub.ex index 0acaa586..6b58be9f 100644 --- a/lib/commanded/pubsub.ex +++ b/lib/commanded/pubsub.ex @@ -46,15 +46,7 @@ defmodule Commanded.PubSub do config -> if Keyword.keyword?(config) do - case Keyword.get(config, :phoenix_pubsub) do - nil -> - raise ArgumentError, - "invalid Phoenix pubsub configuration #{inspect(config)} for application " <> - inspect(application) - - phoenix_pubsub_config -> - {Commanded.PubSub.PhoenixPubSub, phoenix_pubsub_config} - end + phoenix_pubsub_config(application, config) else raise ArgumentError, "invalid pubsub configured for application " <> @@ -62,4 +54,16 @@ defmodule Commanded.PubSub do end end end + + defp phoenix_pubsub_config(application, config) do + case Keyword.get(config, :phoenix_pubsub) do + nil -> + raise ArgumentError, + "invalid Phoenix pubsub configuration #{inspect(config)} for application " <> + inspect(application) + + phoenix_pubsub_config -> + {Commanded.PubSub.PhoenixPubSub, phoenix_pubsub_config} + end + end end diff --git a/lib/commanded/registration/local_registry.ex b/lib/commanded/registration/local_registry.ex index 3016216f..d49ba16c 100644 --- a/lib/commanded/registration/local_registry.ex +++ b/lib/commanded/registration/local_registry.ex @@ -112,7 +112,7 @@ defmodule Commanded.Registration.LocalRegistry do @doc false def handle_info(message, state) do - Logger.debug(fn -> "received unexpected message in handle_info/2: " <> inspect(message) end) + Logger.debug("received unexpected message in handle_info/2: " <> inspect(message)) {:noreply, state} end diff --git a/mix.exs b/mix.exs index b5ed37a9..16146d70 100644 --- a/mix.exs +++ b/mix.exs @@ -1,13 +1,13 @@ defmodule Commanded.Mixfile do use Mix.Project - @version "1.4.1" + @version "1.4.3" def project do [ app: :commanded, version: @version, - elixir: "~> 1.10", + elixir: "~> 1.12", elixirc_paths: elixirc_paths(Mix.env()), deps: deps(), description: description(), @@ -62,13 +62,13 @@ defmodule Commanded.Mixfile do {:telemetry_registry, "~> 0.2 or ~> 0.3"}, # Optional dependencies - {:jason, "~> 1.3", optional: true}, + {:jason, "~> 1.4", optional: true}, {:phoenix_pubsub, "~> 2.1", optional: true}, # Build and test tools {:benchfella, "~> 0.3", only: :bench}, - {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, - {:dialyxir, "~> 1.2", only: [:dev, :test], runtime: false}, + {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.3", only: [:dev, :test], runtime: false}, {:ex_doc, ">= 0.0.0", only: :dev}, {:local_cluster, "~> 1.2", only: :test, runtime: false}, {:mix_test_watch, "~> 1.1", only: :dev}, @@ -233,9 +233,7 @@ defmodule Commanded.Mixfile do maintainers: ["Ben Smith"], licenses: ["MIT"], links: %{ - "GitHub" => "https://github.com/commanded/commanded", - "Docs" => "https://hexdocs.pm/commanded/", - "Sponsor" => "https://opencollective.com/commanded" + "GitHub" => "https://github.com/commanded/commanded" } ] end diff --git a/mix.lock b/mix.lock index 6212c326..cec1b7b1 100644 --- a/mix.lock +++ b/mix.lock @@ -1,23 +1,23 @@ %{ "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, "benchfella": {:hex, :benchfella, "0.3.5", "b2122c234117b3f91ed7b43b6e915e19e1ab216971154acd0a80ce0e9b8c05f5", [:mix], [], "hexpm", "23f27cbc482cbac03fc8926441eb60a5e111759c17642bac005c3225f5eb809d"}, - "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, - "credo": {:hex, :credo, "1.6.7", "323f5734350fd23a456f2688b9430e7d517afb313fbd38671b8a4449798a7854", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "41e110bfb007f7eda7f897c10bf019ceab9a0b269ce79f015d54b0dcf4fc7dd3"}, - "dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"}, - "earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"}, + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"}, + "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, - "ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"}, + "ex_doc": {:hex, :ex_doc, "0.31.1", "8a2355ac42b1cc7b2379da9e40243f2670143721dd50748bf6c3b1184dae2089", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3178c3a407c557d8343479e1ff117a96fd31bafe52a039079593fb0524ef61b0"}, "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, "global_flags": {:hex, :global_flags, "1.0.0", "ee6b864979a1fb38d1fbc67838565644baf632212bce864adca21042df036433", [:rebar3], [], "hexpm", "85d944cecd0f8f96b20ce70b5b16ebccedfcd25e744376b131e89ce61ba93176"}, - "jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "local_cluster": {:hex, :local_cluster, "1.2.1", "8eab3b8a387680f0872eacfb1a8bd5a91cb1d4d61256eec6a655b07ac7030c73", [:mix], [{:global_flags, "~> 1.0", [hex: :global_flags, repo: "hexpm", optional: false]}], "hexpm", "aae80c9bc92c911cb0be085fdeea2a9f5b88f81b6bec2ff1fec244bb0acc232c"}, - "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, - "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, - "mix_test_watch": {:hex, :mix_test_watch, "1.1.0", "330bb91c8ed271fe408c42d07e0773340a7938d8a0d281d57a14243eae9dc8c3", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "52b6b1c476cbb70fd899ca5394506482f12e5f6b0d6acff9df95c7f1e0812ec3"}, - "mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"}, - "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"}, - "telemetry": {:hex, :telemetry, "1.1.0", "a589817034a27eab11144ad24d5c0f9fab1f58173274b1e9bae7074af9cbee51", [:rebar3], [], "hexpm", "b727b2a1f75614774cff2d7565b64d0dfa5bd52ba517f16543e6fc7efcc0df48"}, - "telemetry_registry": {:hex, :telemetry_registry, "0.3.0", "6768f151ea53fc0fbca70dbff5b20a8d663ee4e0c0b2ae589590e08658e76f1e", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "492e2adbc609f3e79ece7f29fec363a97a2c484ac78a83098535d6564781e917"}, + "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.3", "d684f4bac8690e70b06eb52dad65d26de2eefa44cd19d64a8095e1417df7c8fd", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "b78dc853d2e670ff6390b605d807263bf606da3c82be37f9d7f68635bd886fc9"}, + "mix_test_watch": {:hex, :mix_test_watch, "1.1.1", "eee6fc570d77ad6851c7bc08de420a47fd1e449ef5ccfa6a77ef68b72e7e51ad", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f82262b54dee533467021723892e15c3267349849f1f737526523ecba4e6baae"}, + "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, + "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, + "telemetry_registry": {:hex, :telemetry_registry, "0.3.1", "14a3319a7d9027bdbff7ebcacf1a438f5f5c903057b93aee484cca26f05bdcba", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6d0ca77b691cf854ed074b459a93b87f4c7f5512f8f7743c635ca83da81f939e"}, } diff --git a/test/aggregates/aggregate_concurrency_test.exs b/test/aggregates/aggregate_concurrency_test.exs index 446524a5..bf302907 100644 --- a/test/aggregates/aggregate_concurrency_test.exs +++ b/test/aggregates/aggregate_concurrency_test.exs @@ -47,42 +47,36 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do } # Fail to append once - expect(MockEventStore, :append_to_stream, fn _event_store_meta, - ^account_number, - 1, - _event_data -> - {:error, :wrong_expected_version} + expect(MockEventStore, :append_to_stream, fn + _event_store_meta, ^account_number, 1, _event_data, _opts -> + {:error, :wrong_expected_version} end) # Return "missing" event - expect(MockEventStore, :stream_forward, fn _event_store_meta, - ^account_number, - 2, - _batch_size -> - [ - %RecordedEvent{ - event_id: UUID.uuid4(), - event_number: 2, - stream_id: account_number, - stream_version: 2, - event_type: "Elixir.Commanded.ExampleDomain.BankAccount.Events.MoneyDeposited", - data: %MoneyDeposited{ - account_number: account_number, - transfer_uuid: UUID.uuid4(), - amount: 500, - balance: 1_500 - }, - metadata: %{} - } - ] + expect(MockEventStore, :stream_forward, fn + _event_store_meta, ^account_number, 2, _batch_size -> + [ + %RecordedEvent{ + event_id: UUID.uuid4(), + event_number: 2, + stream_id: account_number, + stream_version: 2, + event_type: "Elixir.Commanded.ExampleDomain.BankAccount.Events.MoneyDeposited", + data: %MoneyDeposited{ + account_number: account_number, + transfer_uuid: UUID.uuid4(), + amount: 500, + balance: 1_500 + }, + metadata: %{} + } + ] end) # Succeed on second attempt - expect(MockEventStore, :append_to_stream, fn _event_store_meta, - ^account_number, - 2, - _event_data -> - :ok + expect(MockEventStore, :append_to_stream, fn + _event_store_meta, ^account_number, 2, _event_data, _opts -> + :ok end) assert {:ok, 3, _events} = @@ -99,18 +93,14 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do test "should error after too many attempts", %{account_number: account_number} do # Fail to append to stream - expect(MockEventStore, :append_to_stream, 6, fn _event_store_meta, - ^account_number, - 1, - _event_data -> - {:error, :wrong_expected_version} + expect(MockEventStore, :append_to_stream, 6, fn + _event_store_meta, ^account_number, 1, _event_data, _opts -> + {:error, :wrong_expected_version} end) - expect(MockEventStore, :stream_forward, 6, fn _event_store_meta, - ^account_number, - 2, - _batch_size -> - [] + expect(MockEventStore, :stream_forward, 6, fn + _event_store_meta, ^account_number, 2, _batch_size -> + [] end) command = %DepositMoney{ @@ -133,18 +123,14 @@ defmodule Commanded.Aggregates.AggregateConcurrencyTest do defp open_account(_context) do account_number = UUID.uuid4() - expect(MockEventStore, :stream_forward, fn _event_store_meta, - ^account_number, - 1, - _batch_size -> - [] + expect(MockEventStore, :stream_forward, fn + _event_store_meta, ^account_number, 1, _batch_size -> + [] end) - expect(MockEventStore, :append_to_stream, fn _event_store_meta, - ^account_number, - 0, - _event_data -> - :ok + expect(MockEventStore, :append_to_stream, fn + _event_store_meta, ^account_number, 0, _event_data, _opts -> + :ok end) {:ok, ^account_number} = diff --git a/test/event/handle_event_test.exs b/test/event/handle_event_test.exs index 0da0eb52..4b1828b8 100644 --- a/test/event/handle_event_test.exs +++ b/test/event/handle_event_test.exs @@ -85,6 +85,25 @@ defmodule Commanded.Event.HandleEventTest do refute_receive {:event, _handler, _event, _metadata} end + test "should handle exit signals", %{handler: handler} do + import ExUnit.CaptureLog + + ref = Process.monitor(handler) + + spawn(fn -> + Process.exit(handler, :shutdown) + end) + + send_unexpected_mesage = fn -> + send(handler, :unexpected_message) + assert_receive {:DOWN, ^ref, :process, ^handler, :shutdown} + end + + log = capture_log(send_unexpected_mesage) + refute log =~ "received unexpected message" + refute log =~ ":EXIT" + end + test "should ignore unexpected messages", %{handler: handler} do import ExUnit.CaptureLog diff --git a/test/event_store/recorded_event_test.exs b/test/event_store/recorded_event_test.exs new file mode 100644 index 00000000..65d3439e --- /dev/null +++ b/test/event_store/recorded_event_test.exs @@ -0,0 +1,61 @@ +defmodule Commanded.EventStore.RecordedEventTest do + use ExUnit.Case + + alias Commanded.EventStore.RecordedEvent + alias Commanded.Helpers.EventFactory + + defmodule BankAccountOpened do + @derive Jason.Encoder + defstruct [:account_number, :initial_balance] + end + + setup do + [event] = + EventFactory.map_to_recorded_events( + [ + %BankAccountOpened{account_number: "123", initial_balance: 1_000} + ], + 1, + metadata: %{"key1" => "value1", "key2" => "value2"} + ) + + [event: event] + end + + describe "RecordedEvent struct" do + test "enrich_metadata/2 should add a number of fields to the metadata", %{event: event} do + %RecordedEvent{ + event_id: event_id, + event_number: event_number, + stream_id: stream_id, + stream_version: stream_version, + correlation_id: correlation_id, + causation_id: causation_id, + created_at: created_at + } = event + + enriched_metadata = + RecordedEvent.enrich_metadata(event, + additional_metadata: %{ + application: ExampleApplication + } + ) + + assert enriched_metadata == %{ + # Event string-keyed metadata + "key1" => "value1", + "key2" => "value2", + # Standard event fields + event_id: event_id, + event_number: event_number, + stream_id: stream_id, + stream_version: stream_version, + correlation_id: correlation_id, + causation_id: causation_id, + created_at: created_at, + # Additional field + application: ExampleApplication + } + end + end +end diff --git a/test/event_store/support/subscriber.ex b/test/event_store/support/subscriber.ex index 9432147d..51d0a6d4 100644 --- a/test/event_store/support/subscriber.ex +++ b/test/event_store/support/subscriber.ex @@ -29,8 +29,12 @@ defmodule Commanded.EventStore.Subscriber do end def init(%State{} = state) do - %State{event_store: event_store, event_store_meta: event_store_meta, subscription_opts: opts} = - state + %State{ + event_store: event_store, + event_store_meta: event_store_meta, + owner: owner, + subscription_opts: opts + } = state case event_store.subscribe_to(event_store_meta, :all, "subscriber", self(), :origin, opts) do {:ok, subscription} -> @@ -39,7 +43,9 @@ defmodule Commanded.EventStore.Subscriber do {:ok, state} {:error, error} -> - {:stop, error} + send(owner, {:subscribe_error, error, self()}) + + {:ok, state} end end @@ -79,7 +85,7 @@ defmodule Commanded.EventStore.Subscriber do def handle_info({:subscribed, subscription}, %State{subscription: subscription} = state) do %State{owner: owner} = state - send(owner, {:subscribed, subscription}) + send(owner, {:subscribed, self(), subscription}) {:noreply, %State{state | subscribed?: true}} end diff --git a/test/event_store/support/subscription_test_case.ex b/test/event_store/support/subscription_test_case.ex index 917669e6..d7a2d8db 100644 --- a/test/event_store/support/subscription_test_case.ex +++ b/test/event_store/support/subscription_test_case.ex @@ -327,60 +327,69 @@ defmodule Commanded.EventStore.SubscriptionTestCase do event_store: event_store, event_store_meta: event_store_meta } do - {:ok, _subscriber1} = + {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 2) - {:ok, _subscriber2} = + {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 2) - assert_receive {:subscribed, _subscription1} - assert_receive {:subscribed, _subscription2} - refute_receive {:subscribed, _subscription} + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribed, ^subscriber2, _subscription2} + refute_receive {:subscribed, _subscriber, _subscription} end test "should prevent too many subscribers to single subscription", %{ event_store: event_store, event_store_meta: event_store_meta } do - {:ok, _subscriber} = Subscriber.start_link(event_store, event_store_meta, self()) + {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self()) - assert {:error, :subscription_already_exists} = - Subscriber.start_link(event_store, event_store_meta, self()) + {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self()) + + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribe_error, :subscription_already_exists, ^subscriber2} + refute_receive {:subscribed, _subscriber, _subscription} end test "should prevent too many subscribers to subscription with concurrency limit", %{ event_store: event_store, event_store_meta: event_store_meta } do - {:ok, _subscriber1} = + {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - {:ok, _subscriber2} = + {:ok, subscriber2} = + Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) + + {:ok, subscriber3} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - {:ok, _subscriber3} = + {:ok, subscriber4} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - assert {:error, :too_many_subscribers} = - Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribed, ^subscriber2, _subscription1} + assert_receive {:subscribed, ^subscriber3, _subscription1} + assert_receive {:subscribe_error, :too_many_subscribers, ^subscriber4} + refute_receive {:subscribed, _subscriber, _subscription} end test "should distribute events amongst subscribers", %{ event_store: event_store, event_store_meta: event_store_meta } do - {:ok, _subscriber1} = + {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - {:ok, _subscriber2} = + {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - {:ok, _subscriber2} = + {:ok, subscriber3} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 3) - assert_receive {:subscribed, _subscription1} - assert_receive {:subscribed, _subscription2} - assert_receive {:subscribed, _subscription3} + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribed, ^subscriber2, _subscription2} + assert_receive {:subscribed, ^subscriber3, _subscription3} :ok = event_store.append_to_stream(event_store_meta, "stream1", 0, build_events(6)) @@ -408,13 +417,13 @@ defmodule Commanded.EventStore.SubscriptionTestCase do partition_by: fn %RecordedEvent{stream_id: stream_id} -> stream_id end ] - {:ok, _subscriber1} = Subscriber.start_link(event_store, event_store_meta, self(), opts) - {:ok, _subscriber2} = Subscriber.start_link(event_store, event_store_meta, self(), opts) - {:ok, _subscriber3} = Subscriber.start_link(event_store, event_store_meta, self(), opts) + {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self(), opts) + {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self(), opts) + {:ok, subscriber3} = Subscriber.start_link(event_store, event_store_meta, self(), opts) - assert_receive {:subscribed, _subscription1} - assert_receive {:subscribed, _subscription2} - assert_receive {:subscribed, _subscription3} + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribed, ^subscriber2, _subscription2} + assert_receive {:subscribed, ^subscriber3, _subscription3} :ok = event_store.append_to_stream(event_store_meta, "stream0", 0, build_events(2)) :ok = event_store.append_to_stream(event_store_meta, "stream1", 0, build_events(2)) @@ -516,8 +525,8 @@ defmodule Commanded.EventStore.SubscriptionTestCase do {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self(), concurrency_limit: 2) - assert_receive {:subscribed, _subscription1} - assert_receive {:subscribed, _subscription2} + assert_receive {:subscribed, ^subscriber1, _subscription1} + assert_receive {:subscribed, ^subscriber2, _subscription2} :ok = event_store.append_to_stream(event_store_meta, "stream1", 0, build_events(2)) @@ -649,7 +658,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self()) - assert_receive {:subscribed, _subscription} + assert_receive {:subscribed, ^subscriber1, _subscription} assert_receive {:events, ^subscriber1, received_events} assert length(received_events) == 1 assert Enum.map(received_events, & &1.stream_id) == ["stream1"] @@ -666,7 +675,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self()) - assert_receive {:subscribed, _subscription} + assert_receive {:subscribed, ^subscriber2, _subscription} refute_receive {:events, _subscriber, _received_events} :ok = event_store.append_to_stream(event_store_meta, "stream3", 0, build_events(1)) @@ -689,7 +698,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do {:ok, subscriber1} = Subscriber.start_link(event_store, event_store_meta, self()) - assert_receive {:subscribed, _subscription} + assert_receive {:subscribed, ^subscriber1, _subscription} assert_receive {:events, ^subscriber1, [%RecordedEvent{event_number: 1, stream_id: "stream1"}] = received_events} @@ -703,7 +712,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do {:ok, subscriber2} = Subscriber.start_link(event_store, event_store_meta, self()) - assert_receive {:subscribed, _subscription} + assert_receive {:subscribed, ^subscriber2, _subscription} # Receive event #2 again because it wasn't ack'd assert_receive {:events, ^subscriber2, @@ -731,7 +740,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do ref = Process.monitor(subscriber) - assert_receive {:subscribed, subscription} + assert_receive {:subscribed, ^subscriber, subscription} ProcessHelper.shutdown(subscription) @@ -746,7 +755,7 @@ defmodule Commanded.EventStore.SubscriptionTestCase do } do {:ok, subscriber} = Subscriber.start_link(event_store, event_store_meta, self()) - assert_receive {:subscribed, subscription} + assert_receive {:subscribed, ^subscriber, subscription} ref = Process.monitor(subscription) diff --git a/test/example_domain/bank_account/bank_account.ex b/test/example_domain/bank_account/bank_account.ex index 62cf9e56..338e3b6b 100644 --- a/test/example_domain/bank_account/bank_account.ex +++ b/test/example_domain/bank_account/bank_account.ex @@ -16,7 +16,7 @@ defmodule Commanded.ExampleDomain.BankAccount do end defmodule WithdrawMoney do - defstruct [:account_number, :transfer_uuid, :amount] + defstruct [:account_number, :transfer_uuid, :amount, :by_user] end defmodule CloseAccount do diff --git a/test/example_domain/money_transfer/transfer_money_process_manager.ex b/test/example_domain/money_transfer/transfer_money_process_manager.ex index 236f2f9e..95f951a8 100644 --- a/test/example_domain/money_transfer/transfer_money_process_manager.ex +++ b/test/example_domain/money_transfer/transfer_money_process_manager.ex @@ -11,7 +11,7 @@ defmodule Commanded.ExampleDomain.TransferMoneyProcessManager do alias Commanded.ExampleDomain.TransferMoneyProcessManager @derive Jason.Encoder - defstruct [:transfer_uuid, :debit_account, :credit_account, :amount, :status] + defstruct [:transfer_uuid, :debit_account, :credit_account, :amount, :status, :user_uuid] def interested?(%MoneyTransferRequested{transfer_uuid: transfer_uuid}), do: {:start, transfer_uuid} @@ -22,6 +22,25 @@ defmodule Commanded.ExampleDomain.TransferMoneyProcessManager do def interested?(%MoneyDeposited{transfer_uuid: transfer_uuid}), do: {:continue, transfer_uuid} + def handle( + %TransferMoneyProcessManager{}, + %MoneyTransferRequested{} = event, + %{"user_uuid" => by_user} = _metadata + ) do + %MoneyTransferRequested{ + transfer_uuid: transfer_uuid, + debit_account: debit_account, + amount: amount + } = event + + %WithdrawMoney{ + account_number: debit_account, + transfer_uuid: transfer_uuid, + amount: amount, + by_user: by_user + } + end + def handle(%TransferMoneyProcessManager{}, %MoneyTransferRequested{} = event) do %MoneyTransferRequested{ transfer_uuid: transfer_uuid, @@ -29,7 +48,11 @@ defmodule Commanded.ExampleDomain.TransferMoneyProcessManager do amount: amount } = event - %WithdrawMoney{account_number: debit_account, transfer_uuid: transfer_uuid, amount: amount} + %WithdrawMoney{ + account_number: debit_account, + transfer_uuid: transfer_uuid, + amount: amount + } end def handle(%TransferMoneyProcessManager{} = pm, %MoneyWithdrawn{}) do @@ -44,6 +67,29 @@ defmodule Commanded.ExampleDomain.TransferMoneyProcessManager do ## State mutators + def apply( + %TransferMoneyProcessManager{} = transfer, + %MoneyTransferRequested{} = event, + %{"user_uuid" => user_uuid} = _metadata + ) do + %MoneyTransferRequested{ + transfer_uuid: transfer_uuid, + debit_account: debit_account, + credit_account: credit_account, + amount: amount + } = event + + %TransferMoneyProcessManager{ + transfer + | transfer_uuid: transfer_uuid, + debit_account: debit_account, + credit_account: credit_account, + amount: amount, + status: :withdraw_money_from_debit_account, + user_uuid: user_uuid + } + end + def apply(%TransferMoneyProcessManager{} = transfer, %MoneyTransferRequested{} = event) do %MoneyTransferRequested{ transfer_uuid: transfer_uuid, diff --git a/test/process_managers/process_manager_after_command_text.exs b/test/process_managers/process_manager_after_command_test.exs similarity index 92% rename from test/process_managers/process_manager_after_command_text.exs rename to test/process_managers/process_manager_after_command_test.exs index db150f48..f4ef7f69 100644 --- a/test/process_managers/process_manager_after_command_text.exs +++ b/test/process_managers/process_manager_after_command_test.exs @@ -24,10 +24,14 @@ defmodule Commanded.ProcessManagers.ProcessManagerAfterCommandTest do :ok end - test "should stop process manager instance after specified command" do + test "should stop process manager instance after specified command and work with both after_command/2 and after_command/3 callbacks" do aggregate_uuid = UUID.uuid4() source_uuid = "\"AfterCommandProcessManager\"-\"#{aggregate_uuid}\"" + notify_to = self() |> :erlang.pid_to_list() + + metadata = %{"notify_to" => notify_to} + {:ok, process_router} = AfterCommandProcessManager.start_link() :ok = ExampleRouter.dispatch(%Start{aggregate_uuid: aggregate_uuid}, application: ExampleApp) @@ -40,7 +44,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerAfterCommandTest do interesting: 10, uninteresting: 1 }, - application: ExampleApp + application: ExampleApp, + metadata: metadata ) # Process state snapshot should be created @@ -68,6 +73,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerAfterCommandTest do %Stopped{aggregate_uuid: aggregate_uuid} ] + assert_receive :metadata_available + Wait.until(fn -> # Process instance should be stopped assert ProcessRouter.process_instance(process_router, aggregate_uuid) == diff --git a/test/process_managers/process_manager_error_handling_state_test.exs b/test/process_managers/process_manager_error_handling_state_test.exs index d4ad3847..844286c2 100644 --- a/test/process_managers/process_manager_error_handling_state_test.exs +++ b/test/process_managers/process_manager_error_handling_state_test.exs @@ -1,28 +1,71 @@ defmodule Commanded.ProcessManager.ProcessManagerErrorHandlingStateTest do use ExUnit.Case - alias Commanded.ProcessManagers.ErrorAggregate.Commands.StartProcess - alias Commanded.ProcessManagers.ErrorRouter - alias Commanded.ProcessManagers.ExampleApp - alias Commanded.ProcessManagers.StateErrorHandlingProcessManager + alias Commanded.ProcessManagers.ErrorAggregate.Commands.{AttemptProcess, StartProcess} + alias Commanded.ProcessManagers.ErrorAggregate.Events.ProcessStarted + + alias Commanded.ProcessManagers.{ + ErrorApp, + ErrorRouter, + FailureContext, + StateErrorHandlingProcessManager + } + + alias Commanded.EventStore.RecordedEvent alias Commanded.UUID setup do - start_supervised!(ExampleApp) + start_supervised!(ErrorApp) start_supervised!(StateErrorHandlingProcessManager) :ok end test "should receive the process instance state in the context" do - command = %StartProcess{ - process_uuid: UUID.uuid4(), - reply_to: reply_to() - } + process_uuid = UUID.uuid4() + reply_to = reply_to() + + command = %StartProcess{process_uuid: process_uuid, reply_to: reply_to} + + assert :ok = + ErrorRouter.dispatch(command, + application: ErrorApp, + metadata: %{"user_id" => 1234} + ) + + assert_receive {:error, :failed, failed_command, failure_context} - assert :ok = ErrorRouter.dispatch(command, application: ExampleApp) + assert match?(%AttemptProcess{process_uuid: ^process_uuid}, failed_command) - assert_receive :got_from_context + assert match?( + %FailureContext{ + enriched_metadata: %{ + "user_id" => 1234, + application: ErrorApp, + causation_id: _causation_id, + correlation_id: _correlation_id, + created_at: %DateTime{}, + event_id: _event_id, + event_number: _event_number, + stream_id: _stream_id, + stream_version: _stream_version + }, + last_event: %RecordedEvent{ + data: %ProcessStarted{ + process_uuid: ^process_uuid, + reply_to: ^reply_to + } + }, + process_manager_state: %StateErrorHandlingProcessManager{ + process_uuid: ^process_uuid, + reply_to: ^reply_to + }, + stacktrace: nil, + context: %{}, + pending_commands: [] + }, + failure_context + ) end defp reply_to, do: :erlang.pid_to_list(self()) diff --git a/test/process_managers/process_manager_instance_test.exs b/test/process_managers/process_manager_instance_test.exs index 14367e11..7389efbf 100644 --- a/test/process_managers/process_manager_instance_test.exs +++ b/test/process_managers/process_manager_instance_test.exs @@ -33,11 +33,14 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do end describe "process manager instance" do - test "handles an event and dispatches a command" do + test "handles an event and dispatches a command copying existing metadata" do transfer_uuid = UUID.uuid4() debit_account = UUID.uuid4() credit_account = UUID.uuid4() expected_source_uuid = "\"TransferMoneyProcessManager\"-\"#{transfer_uuid}\"" + user_uuid = UUID.uuid4() + + metadata = %{"user_uuid" => user_uuid} expect(MockEventStore, :read_snapshot, fn _adapter_meta, ^expected_source_uuid -> {:error, :snapshot_not_found} @@ -50,7 +53,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do credit_account: ^credit_account, debit_account: ^debit_account, status: :withdraw_money_from_debit_account, - transfer_uuid: ^transfer_uuid + transfer_uuid: ^transfer_uuid, + user_uuid: ^user_uuid }, source_type: "Elixir.Commanded.ExampleDomain.TransferMoneyProcessManager", source_uuid: ^expected_source_uuid, @@ -60,25 +64,31 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do :ok end) - expect(MockApplication, :dispatch, fn command, _opts -> + expect(MockApplication, :dispatch, fn command, opts -> assert %WithdrawMoney{ account_number: ^debit_account, transfer_uuid: ^transfer_uuid, - amount: 100 + amount: 100, + by_user: ^user_uuid } = command + assert metadata == Keyword.get(opts, :metadata) + :ok end) {:ok, instance} = start_process_manager_instance(transfer_uuid) event = - to_recorded_event(%MoneyTransferRequested{ - transfer_uuid: transfer_uuid, - debit_account: debit_account, - credit_account: credit_account, - amount: 100 - }) + to_recorded_event( + %MoneyTransferRequested{ + transfer_uuid: transfer_uuid, + debit_account: debit_account, + credit_account: credit_account, + amount: 100 + }, + metadata + ) :ok = ProcessManagerInstance.process_event(instance, event) @@ -190,6 +200,10 @@ defmodule Commanded.ProcessManagers.ProcessManagerInstanceTest do %RecordedEvent{event_number: 1, stream_id: "stream-id", stream_version: 1, data: event} end + defp to_recorded_event(event, metadata) do + %{to_recorded_event(event) | metadata: metadata} + end + defp mock_event_store do stub(MockEventStore, :subscribe_to, fn _event_store, :all, name, pid, :origin, _opts -> diff --git a/test/process_managers/process_manager_routing_test.exs b/test/process_managers/process_manager_routing_test.exs index 80834f45..8b58c327 100644 --- a/test/process_managers/process_manager_routing_test.exs +++ b/test/process_managers/process_manager_routing_test.exs @@ -8,6 +8,7 @@ defmodule Commanded.ProcessManagers.ProcessManagerRoutingTest do alias Commanded.ProcessManagers.RoutingProcessManager.Continued alias Commanded.ProcessManagers.RoutingProcessManager.Errored alias Commanded.ProcessManagers.RoutingProcessManager.Started + alias Commanded.ProcessManagers.RoutingProcessManager.StartedFromMetadata alias Commanded.ProcessManagers.RoutingProcessManager.Stopped alias Commanded.UUID @@ -28,6 +29,19 @@ defmodule Commanded.ProcessManagers.ProcessManagerRoutingTest do assert_receive {:started, ^instance} end + test "should start instance on `:start` and be able to access metadata", + %{ + pid: pid, + process_uuid: process_uuid + } do + metadata = %{"process_uuid" => process_uuid} + send_events(pid, [%StartedFromMetadata{reply_to: self()}], 1, metadata: metadata) + + instance = wait_for_instance(pid, process_uuid) + + assert_receive {:started, ^instance} + end + test "should continue existing instance on `:start`", %{pid: pid, process_uuid: process_uuid} do send_events(pid, [ %Started{process_uuid: process_uuid, reply_to: self()}, @@ -199,8 +213,8 @@ defmodule Commanded.ProcessManagers.ProcessManagerRoutingTest do stub(MockEventStore, :ack_event, fn _event_store, _pid, _event -> :ok end) end - defp send_events(pid, events, initial_event_number \\ 1) do - recorded_events = EventFactory.map_to_recorded_events(events, initial_event_number) + defp send_events(pid, events, initial_event_number \\ 1, opts \\ []) do + recorded_events = EventFactory.map_to_recorded_events(events, initial_event_number, opts) send(pid, {:events, recorded_events}) end diff --git a/test/process_managers/support/after_command_process_manager.ex b/test/process_managers/support/after_command_process_manager.ex index eab88a52..a51026f8 100644 --- a/test/process_managers/support/after_command_process_manager.ex +++ b/test/process_managers/support/after_command_process_manager.ex @@ -2,7 +2,7 @@ defmodule Commanded.ProcessManagers.AfterCommandProcessManager do @moduledoc false alias Commanded.ProcessManagers.AfterCommandProcessManager - alias Commanded.ProcessManagers.ExampleAggregate.Commands.Stop + alias Commanded.ProcessManagers.ExampleAggregate.Commands.{Continue, Stop} alias Commanded.ProcessManagers.ExampleAggregate.Events.{Interested, Started} alias Commanded.ProcessManagers.ExampleApp @@ -13,8 +13,15 @@ defmodule Commanded.ProcessManagers.AfterCommandProcessManager do @derive Jason.Encoder defstruct [:status, items: []] - def interested?(%Started{aggregate_uuid: aggregate_uuid}), do: {:start, aggregate_uuid} - def interested?(%Interested{aggregate_uuid: aggregate_uuid}), do: {:continue, aggregate_uuid} + def interested?(%Started{aggregate_uuid: aggregate_uuid}, _metadata), + do: {:start, aggregate_uuid} + + def interested?(%Interested{aggregate_uuid: aggregate_uuid}, _metadata), + do: {:continue, aggregate_uuid} + + def handle(%AfterCommandProcessManager{}, %Interested{index: 1, aggregate_uuid: aggregate_uuid}) do + %Continue{aggregate_uuid: aggregate_uuid} + end def handle(%AfterCommandProcessManager{}, %Interested{index: 10, aggregate_uuid: aggregate_uuid}) do %Stop{aggregate_uuid: aggregate_uuid} @@ -30,7 +37,19 @@ defmodule Commanded.ProcessManagers.AfterCommandProcessManager do %AfterCommandProcessManager{process_manager | items: items ++ [index]} end + # after_command/2 callback def after_command(%AfterCommandProcessManager{}, %Stop{}) do :stop end + + # after_command/3 callback + def after_command(%AfterCommandProcessManager{}, %Continue{}, metadata) do + %{"notify_to" => notify_to} = metadata + + notify_to + |> :erlang.list_to_pid() + |> Process.send(:metadata_available, []) + + :continue + end end diff --git a/test/process_managers/support/error/state_error_handling_process_manager.ex b/test/process_managers/support/error/state_error_handling_process_manager.ex index 527d7e54..130d697f 100644 --- a/test/process_managers/support/error/state_error_handling_process_manager.ex +++ b/test/process_managers/support/error/state_error_handling_process_manager.ex @@ -3,11 +3,11 @@ defmodule Commanded.ProcessManagers.StateErrorHandlingProcessManager do alias Commanded.ProcessManagers.ErrorAggregate.Commands.AttemptProcess alias Commanded.ProcessManagers.ErrorAggregate.Events.ProcessStarted - alias Commanded.ProcessManagers.{ExampleApp, FailureContext} + alias Commanded.ProcessManagers.{ErrorApp, FailureContext} alias Commanded.ProcessManagers.StateErrorHandlingProcessManager use Commanded.ProcessManagers.ProcessManager, - application: ExampleApp, + application: ErrorApp, name: "StateErrorHandlingProcessManager" defstruct [:process_uuid, :reply_to] @@ -24,12 +24,12 @@ defmodule Commanded.ProcessManagers.StateErrorHandlingProcessManager do %StateErrorHandlingProcessManager{reply_to: reply_to, process_uuid: process_uuid} end - def error(_, _, %FailureContext{} = failure_context) do + def error({:error, error}, event, %FailureContext{} = failure_context) do %FailureContext{process_manager_state: %{reply_to: reply_to}} = failure_context pid = :erlang.list_to_pid(reply_to) - send(pid, :got_from_context) + send(pid, {:error, error, event, failure_context}) {:stop, :stopping} end diff --git a/test/process_managers/support/example_aggregate.ex b/test/process_managers/support/example_aggregate.ex index d0b748dc..efcfe465 100644 --- a/test/process_managers/support/example_aggregate.ex +++ b/test/process_managers/support/example_aggregate.ex @@ -1,15 +1,15 @@ defmodule Commanded.ProcessManagers.ExampleAggregate do @moduledoc false alias Commanded.ProcessManagers.ExampleAggregate + @derive Jason.Encoder - defstruct uuid: nil, - state: nil, - items: [] + defstruct [:uuid, state: nil, items: []] defmodule Commands do defmodule(Start, do: defstruct([:aggregate_uuid])) defmodule(Publish, do: defstruct([:aggregate_uuid, :interesting, :uninteresting])) defmodule(Pause, do: defstruct([:aggregate_uuid])) + defmodule(Continue, do: defstruct([:aggregate_uuid])) defmodule(Stop, do: defstruct([:aggregate_uuid])) defmodule(Error, do: defstruct([:aggregate_uuid])) defmodule(Raise, do: defstruct([:aggregate_uuid])) @@ -71,6 +71,10 @@ defmodule Commanded.ProcessManagers.ExampleAggregate do %Events.Stopped{aggregate_uuid: aggregate_uuid} end + def continue(%ExampleAggregate{}) do + [] + end + def error(%ExampleAggregate{uuid: aggregate_uuid}) do %Events.Errored{aggregate_uuid: aggregate_uuid} end diff --git a/test/process_managers/support/example_command_handler.ex b/test/process_managers/support/example_command_handler.ex index 1c347962..05cb0c73 100644 --- a/test/process_managers/support/example_command_handler.ex +++ b/test/process_managers/support/example_command_handler.ex @@ -5,6 +5,7 @@ defmodule Commanded.ProcessManagers.ExampleCommandHandler do alias Commanded.ProcessManagers.ExampleAggregate alias Commanded.ProcessManagers.ExampleAggregate.Commands.{ + Continue, Error, Pause, Publish, @@ -28,6 +29,9 @@ defmodule Commanded.ProcessManagers.ExampleCommandHandler do def handle(%ExampleAggregate{} = aggregate, %Stop{}), do: ExampleAggregate.stop(aggregate) + def handle(%ExampleAggregate{} = aggregate, %Continue{}), + do: ExampleAggregate.continue(aggregate) + def handle(%ExampleAggregate{} = aggregate, %Error{}), do: ExampleAggregate.error(aggregate) diff --git a/test/process_managers/support/example_router.ex b/test/process_managers/support/example_router.ex index 2fb3a489..286a7c9f 100644 --- a/test/process_managers/support/example_router.ex +++ b/test/process_managers/support/example_router.ex @@ -6,6 +6,7 @@ defmodule Commanded.ProcessManagers.ExampleRouter do alias Commanded.ProcessManagers.{ExampleAggregate, ExampleCommandHandler} alias Commanded.ProcessManagers.ExampleAggregate.Commands.{ + Continue, Error, Pause, Publish, @@ -14,7 +15,7 @@ defmodule Commanded.ProcessManagers.ExampleRouter do Stop } - dispatch [Error, Pause, Publish, Raise, Start, Stop], + dispatch [Error, Pause, Publish, Raise, Start, Stop, Continue], to: ExampleCommandHandler, aggregate: ExampleAggregate, identity: :aggregate_uuid diff --git a/test/process_managers/support/routing/routing_process_manager.ex b/test/process_managers/support/routing/routing_process_manager.ex index b6724c27..6648ab6f 100644 --- a/test/process_managers/support/routing/routing_process_manager.ex +++ b/test/process_managers/support/routing/routing_process_manager.ex @@ -8,6 +8,10 @@ defmodule Commanded.ProcessManagers.RoutingProcessManager do defstruct [:process_uuid, :reply_to, strict?: false] end + defmodule StartedFromMetadata do + defstruct [:process_uuid, :reply_to, strict?: false] + end + defmodule Continued do @enforce_keys [:process_uuid] defstruct [:process_uuid, :reply_to, strict?: false] @@ -28,6 +32,9 @@ defmodule Commanded.ProcessManagers.RoutingProcessManager do defstruct [:processes] + def interested?(%StartedFromMetadata{}, %{"process_uuid" => process_uuid} = _metadata), + do: {:start, process_uuid} + def interested?(%Started{process_uuid: process_uuid, strict?: true}), do: {:start!, process_uuid} @@ -43,6 +50,14 @@ defmodule Commanded.ProcessManagers.RoutingProcessManager do def interested?(%Errored{}), do: raise("error") + def handle(%RoutingProcessManager{}, %StartedFromMetadata{} = event) do + %StartedFromMetadata{reply_to: reply_to} = event + + send(reply_to, {:started, self()}) + + [] + end + def handle(%RoutingProcessManager{}, %Started{} = event) do %Started{reply_to: reply_to} = event diff --git a/test/subscriptions/distributed_subscriptions_test.exs b/test/subscriptions/distributed_subscriptions_test.exs index 1764ae6b..0ea8f6b1 100644 --- a/test/subscriptions/distributed_subscriptions_test.exs +++ b/test/subscriptions/distributed_subscriptions_test.exs @@ -8,7 +8,7 @@ defmodule Commanded.DistributedSubscriptionsTest do setup do :ok = LocalCluster.start() - nodes = LocalCluster.start_nodes("commanded", 3) + nodes = LocalCluster.start_nodes("commanded", 3, applications: [:commanded]) [nodes: nodes] end