diff --git a/lib/event_bus.ex b/lib/event_bus.ex index d7c4dbf..6e78b9e 100644 --- a/lib/event_bus.ex +++ b/lib/event_bus.ex @@ -80,6 +80,23 @@ defmodule EventBus do to: Notification, as: :notify + @doc """ + Send an event to all subscribers, returning the results of all the computation + for each subscriber. I called this `declare` as a stronger "notify" - stronger + because we get the results returned from each Subscriber. + + ## Examples + + event = %Event{id: 1, topic: :webhook_received, + data: %{"message" => "Hi all!"}} + EventBus.declare(event) + {:ok, [{FirstSubscriber, results}, {SecondSubscriber, results}]} + + """ + defdelegate declare(event), + to: Notification, + as: :declare + @doc """ Check if a topic registered. diff --git a/lib/event_bus/managers/notification.ex b/lib/event_bus/managers/notification.ex index c078793..c0e655f 100644 --- a/lib/event_bus/managers/notification.ex +++ b/lib/event_bus/managers/notification.ex @@ -33,6 +33,15 @@ defmodule EventBus.Manager.Notification do GenServer.cast(__MODULE__, {:notify, event}) end + @doc """ + Notify event to event.topic subscribers in the current node, while + also returning the results of the `process/2` function of each + subscriber. + """ + def declare(%Event{} = event) do + GenServer.call(__MODULE__, {:declare, event}) + end + ########################################################################### # PRIVATE API ########################################################################### @@ -40,7 +49,11 @@ defmodule EventBus.Manager.Notification do @doc false @spec handle_cast({:notify, event()}, term()) :: no_return() def handle_cast({:notify, event}, state) do - @backend.notify(event) + @backend.notify(event) # results discarded... {:noreply, state} end + + def handle_call({:declare, event}, _from, state) do + {:reply, @backend.notify(event), state} + end end diff --git a/lib/event_bus/services/notification.ex b/lib/event_bus/services/notification.ex index 2911fc6..f5c5547 100644 --- a/lib/event_bus/services/notification.ex +++ b/lib/event_bus/services/notification.ex @@ -20,23 +20,21 @@ defmodule EventBus.Service.Notification do subscribers = SubscriptionManager.subscribers(topic) if subscribers == [] do - warn_missing_topic_subscription(topic) + {:error, warn_missing_topic_subscription(topic)} else :ok = StoreManager.create(event) :ok = ObservationManager.create({subscribers, {topic, id}}) notify_subscribers(subscribers, {topic, id}) end - - :ok end @spec notify_subscribers(subscribers(), event_shadow()) :: :ok defp notify_subscribers(subscribers, event_shadow) do - Enum.each(subscribers, fn subscriber -> - notify_subscriber(subscriber, event_shadow) + Enum.map(subscribers, fn subscriber -> + res = notify_subscriber(subscriber, event_shadow) + {subscriber, res} end) - :ok end @spec notify_subscriber(subscriber(), event_shadow()) :: no_return() @@ -44,16 +42,18 @@ defmodule EventBus.Service.Notification do subscriber.process({config, topic, id}) rescue error -> - log_error(subscriber, error) + log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({{subscriber, config}, {topic, id}}) + {subscriber, {:error, error}} end defp notify_subscriber(subscriber, {topic, id}) do subscriber.process({topic, id}) rescue error -> - log_error(subscriber, error) + log_error(subscriber, error, __STACKTRACE__) ObservationManager.mark_as_skipped({subscriber, {topic, id}}) + {subscriber, {:error, error}} end @spec registration_status(topic()) :: String.t() @@ -66,12 +66,13 @@ defmodule EventBus.Service.Notification do msg = "Topic(:#{topic}#{registration_status(topic)}) doesn't have subscribers" - Logger.warn(msg) + :ok = Logger.warn(msg) + msg end - @spec log_error(module(), any()) :: no_return() - defp log_error(subscriber, error) do - msg = "#{subscriber}.process/1 raised an error!\n#{inspect(error)}" - Logger.info(msg) + @spec log_error(module(), any(), any()) :: no_return() + defp log_error(subscriber, error, stacktrace) do + msg = "#{subscriber}.process/1 raised an error!\n" <> Exception.format(:error, error, stacktrace) + Logger.error(msg) end end