diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..405433c --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["*.{ex,exs}", "{lib,test}/**/*.{ex,exs}"] +] diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..141a07b --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +phoenix_pubsub_nats-*.tar diff --git a/README.md b/README.md index 9fbfc99..d585d64 100644 --- a/README.md +++ b/README.md @@ -9,19 +9,10 @@ Add `phoenix_pubsub_nats` as a dependency in your `mix.exs` file. ```elixir def deps do - [{:phoenix_pubsub_nats, git: "https://github.com/mtokioka/phoenix_pubsub_nats.git"}] + [{:phoenix_pubsub_nats, git: "https://github.com/xflagstudio/phoenix_pubsub_nats.git"}] end ``` -You should also update your application list to include `:phoenix_pubsub_nats`: - -```elixir -def application do - [applications: [:phoenix_pubsub_nats]] -end - -``` - Edit your Phoenix application Endpoint configuration: config :my_app, MyApp.Endpoint, @@ -50,4 +41,3 @@ The following options are supported: {:noreply, socket} end ``` - diff --git a/lib/phoenix/pubsub/nats.ex b/lib/phoenix/pubsub/nats.ex index c654382..886ca6d 100644 --- a/lib/phoenix/pubsub/nats.ex +++ b/lib/phoenix/pubsub/nats.ex @@ -21,7 +21,7 @@ defmodule Phoenix.PubSub.Nats do options = opts[:options] || [] hosts = options[:hosts] || ["localhost"] shard_num = length(hosts) - host_ring = HashRing.new + host_ring = HashRing.new() host_ring = HashRing.add_nodes(host_ring, hosts) # to make state smaller @@ -31,38 +31,63 @@ defmodule Phoenix.PubSub.Nats do ## TODO: set various options from config nats_opt = %{ - tcp_opts: [:binary, nodelay: true], + tcp_opts: [:binary, nodelay: true] } - pub_conn_pools = hosts |> Enum.map(fn(host) -> - conn_pool_name = create_pool_name(pub_conn_pool_base, host) - supervisor(Phoenix.PubSub.NatsPubConnSupervisor, [conn_pool_name, pub_pool_size, [Map.merge(nats_opt, extract_host(host))]], id: conn_pool_name) - end) - conn_pools = hosts |> Enum.map(fn(host) -> - conn_pool_name = create_pool_name(conn_pool_base, host) - conn_pool_opts = [ - name: {:local, conn_pool_name}, - worker_module: Phoenix.PubSub.NatsConn, - size: sub_pool_size, - strategy: :fifo, - max_overflow: 0 - ] - :poolboy.child_spec(conn_pool_name, conn_pool_opts, [Map.merge(nats_opt, extract_host(host))]) - end) + pub_conn_pools = + hosts + |> Enum.map(fn host -> + conn_pool_name = create_pool_name(pub_conn_pool_base, host) + + supervisor( + Phoenix.PubSub.NatsPubConnSupervisor, + [conn_pool_name, pub_pool_size, [Map.merge(nats_opt, extract_host(host))]], + id: conn_pool_name + ) + end) + + conn_pools = + hosts + |> Enum.map(fn host -> + conn_pool_name = create_pool_name(conn_pool_base, host) + + conn_pool_opts = [ + name: {:local, conn_pool_name}, + worker_module: Phoenix.PubSub.NatsConn, + size: sub_pool_size, + strategy: :fifo, + max_overflow: 0 + ] + + :poolboy.child_spec(conn_pool_name, conn_pool_opts, [ + Map.merge(nats_opt, extract_host(host)) + ]) + end) dispatch_rules = [ - {:broadcast, Phoenix.PubSub.NatsServer, [name]}, - {:subscribe, Phoenix.PubSub.NatsServer, [name]}, - {:unsubscribe, Phoenix.PubSub.NatsServer, [name]}, - ] - - children = pub_conn_pools ++ conn_pools ++ [ - supervisor(Phoenix.PubSub.LocalSupervisor, [name, 1, dispatch_rules]), - worker(Phoenix.PubSub.NatsServer, - [name, pub_conn_pool_base, pub_pool_size, conn_pool_base, - options ++ [shard_num: shard_num, host_ring: host_ring]]) + {:broadcast, Phoenix.PubSub.NatsServer, [name]}, + {:subscribe, Phoenix.PubSub.NatsServer, [name]}, + {:unsubscribe, Phoenix.PubSub.NatsServer, [name]} ] - supervise children, strategy: :one_for_one + + children = + pub_conn_pools ++ + conn_pools ++ + [ + supervisor(Phoenix.PubSub.LocalSupervisor, [name, 1, dispatch_rules]), + worker( + Phoenix.PubSub.NatsServer, + [ + name, + pub_conn_pool_base, + pub_pool_size, + conn_pool_base, + options ++ [shard_num: shard_num, host_ring: host_ring] + ] + ) + ] + + supervise(children, strategy: :one_for_one) end def target_shard_host(host_ring, topic) do @@ -85,27 +110,31 @@ defmodule Phoenix.PubSub.Nats do defp extract_host(host_config) do split = String.split(host_config, ":") + if Enum.count(split) == 1 do - %{host: to_char_list(List.first(split))} + %{host: to_charlist(List.first(split))} else - %{host: to_char_list(List.first(split)), port: String.to_integer(List.last(split))} + %{host: to_charlist(List.first(split)), port: String.to_integer(List.last(split))} end end def with_conn(pool_name, fun) when is_function(fun, 1) do case get_conn(pool_name, 0, @pool_size) do - {:ok, conn} -> fun.(conn) + {:ok, conn} -> fun.(conn) {:error, reason} -> {:error, reason} end end defp get_conn(pool_name, retry_count, max_retry_count) do case :poolboy.transaction(pool_name, &GenServer.call(&1, :conn)) do - {:ok, conn} -> {:ok, conn} + {:ok, conn} -> + {:ok, conn} + {:error, _reason} when retry_count < max_retry_count -> get_conn(pool_name, retry_count + 1, max_retry_count) - {:error, reason} -> {:error, reason} + + {:error, reason} -> + {:error, reason} end end - end diff --git a/lib/phoenix/pubsub/nats_conn.ex b/lib/phoenix/pubsub/nats_conn.ex index 60c8c97..6986fb1 100644 --- a/lib/phoenix/pubsub/nats_conn.ex +++ b/lib/phoenix/pubsub/nats_conn.ex @@ -14,6 +14,7 @@ defmodule Phoenix.PubSub.NatsConn do def start_link(opts) do GenServer.start_link(__MODULE__, opts) end + def start_link(opts, name) do GenServer.start_link(__MODULE__, opts, name: name) end @@ -37,6 +38,7 @@ defmodule Phoenix.PubSub.NatsConn do case Gnat.start_link(state.opts) do {:ok, pid} -> {:noreply, %{state | conn: pid, status: :connected}} + {:error, _reason} -> :timer.send_after(@reconnect_after_ms, :connect) {:noreply, state} @@ -44,7 +46,7 @@ defmodule Phoenix.PubSub.NatsConn do end def handle_info({:EXIT, _ref, _reason}, %{conn: _pid, status: :connected} = state) do - Logger.error "lost Nats connection. Attempting to reconnect..." + Logger.error("lost Nats connection. Attempting to reconnect...") :timer.send_after(@reconnect_after_ms, :connect) {:noreply, %{state | conn: nil, status: :disconnected}} end @@ -56,6 +58,7 @@ defmodule Phoenix.PubSub.NatsConn do _, _ -> :ok end end + def terminate(_reason, _state) do :ok end diff --git a/lib/phoenix/pubsub/nats_consumer.ex b/lib/phoenix/pubsub/nats_consumer.ex index a2a9003..daf6a5e 100644 --- a/lib/phoenix/pubsub/nats_consumer.ex +++ b/lib/phoenix/pubsub/nats_consumer.ex @@ -17,13 +17,14 @@ defmodule Phoenix.PubSub.NatsConsumer do if link, do: Process.link(pid) case Nats.with_conn(conn_pool, fn conn -> - {:ok, ref} = Gnat.sub(conn, self(), topic) - Process.monitor(conn) - Process.monitor(pid) - {:ok, conn, ref} - end) do + {:ok, ref} = Gnat.sub(conn, self(), topic) + Process.monitor(conn) + Process.monitor(pid) + {:ok, conn, ref} + end) do {:ok, conn, ref} -> {:ok, %{conn: conn, pid: pid, sub_ref: ref, node_ref: node_ref}} + {:error, :disconnected} -> {:stop, :disconnected} end @@ -40,9 +41,11 @@ defmodule Phoenix.PubSub.NatsConsumer do def handle_info({:msg, %{body: payload, topic: _, reply_to: _}}, state) do {remote_node_ref, from_pid, msg} = :erlang.binary_to_term(payload) + if from_pid == :none or remote_node_ref != state.node_ref or from_pid != state.pid do send(state.pid, msg) end + {:noreply, state} end @@ -62,5 +65,4 @@ defmodule Phoenix.PubSub.NatsConsumer do _, _ -> :ok end end - end diff --git a/lib/phoenix/pubsub/nats_pub_conn_supervisor.ex b/lib/phoenix/pubsub/nats_pub_conn_supervisor.ex index 2174515..e6fb7e9 100644 --- a/lib/phoenix/pubsub/nats_pub_conn_supervisor.ex +++ b/lib/phoenix/pubsub/nats_pub_conn_supervisor.ex @@ -7,11 +7,12 @@ defmodule Phoenix.PubSub.NatsPubConnSupervisor do end def init([server, pool_size, opts]) do - children = for shard <- 0..(pool_size - 1) do - name = Nats.create_pub_conn_name(server, shard) - worker(Phoenix.PubSub.NatsConn, [opts, name], id: name) - end + children = + for shard <- 0..(pool_size - 1) do + name = Nats.create_pub_conn_name(server, shard) + worker(Phoenix.PubSub.NatsConn, [opts, name], id: name) + end + supervise(children, strategy: :one_for_one) end - end diff --git a/lib/phoenix/pubsub/nats_server.ex b/lib/phoenix/pubsub/nats_server.ex index 2a8be5a..62292a0 100644 --- a/lib/phoenix/pubsub/nats_server.ex +++ b/lib/phoenix/pubsub/nats_server.ex @@ -9,7 +9,11 @@ defmodule Phoenix.PubSub.NatsServer do """ def start_link(server_name, pub_conn_pool_base, pub_conn_pool_size, conn_pool_base, opts) do - GenServer.start_link(__MODULE__, [server_name, pub_conn_pool_base, pub_conn_pool_size, conn_pool_base, opts], name: server_name) + GenServer.start_link( + __MODULE__, + [server_name, pub_conn_pool_base, pub_conn_pool_size, conn_pool_base, opts], + name: server_name + ) end @doc """ @@ -19,22 +23,27 @@ defmodule Phoenix.PubSub.NatsServer do def init([_server_name, pub_conn_pool_base, pub_conn_pool_size, conn_pool_base, opts]) do Process.flag(:trap_exit, true) ## TODO: make state compact - {:ok, %{cons: :ets.new(:rmq_cons, [:set, :private]), - subs: :ets.new(:rmq_subs, [:set, :private]), - pub_conn_pool_base: pub_conn_pool_base, - pub_conn_pool_size: pub_conn_pool_size, - conn_pool_base: conn_pool_base, - node_ref: :crypto.strong_rand_bytes(16), - opts: opts}} + {:ok, + %{ + cons: :ets.new(:rmq_cons, [:set, :private]), + subs: :ets.new(:rmq_subs, [:set, :private]), + pub_conn_pool_base: pub_conn_pool_base, + pub_conn_pool_size: pub_conn_pool_size, + conn_pool_base: conn_pool_base, + node_ref: :crypto.strong_rand_bytes(16), + opts: opts + }} end def subscribe(server_name, pid, topic, opts) do GenServer.call(server_name, {:subscribe, pid, topic, opts}) end + def unsubscribe(server_name, pid, topic) do GenServer.call(server_name, {:unsubscribe, pid, topic}) end - def broadcast(server_name,from_pid, topic, msg) do + + def broadcast(server_name, from_pid, topic, msg) do GenServer.call(server_name, {:broadcast, from_pid, topic, msg}) end @@ -42,28 +51,38 @@ defmodule Phoenix.PubSub.NatsServer do link = Keyword.get(opts, :link, false) subs_list = :ets.lookup(state.subs, topic) - has_key = case subs_list do - [] -> false - [{^topic, pids}] -> Enum.find_value(pids, false, fn(x) -> elem(x, 0) == pid end) - end + + has_key = + case subs_list do + [] -> false + [{^topic, pids}] -> Enum.find_value(pids, false, fn x -> elem(x, 0) == pid end) + end unless has_key do - pool_host = Nats.target_shard_host(state.opts[:host_ring], topic) + pool_host = Nats.target_shard_host(state.opts[:host_ring], topic) conn_pool_name = Nats.create_pool_name(state.conn_pool_base, pool_host) - {:ok, consumer_pid} = Consumer.start(conn_pool_name, - topic, - pid, - state.node_ref, - link) + + {:ok, consumer_pid} = + Consumer.start( + conn_pool_name, + topic, + pid, + state.node_ref, + link + ) + Process.monitor(consumer_pid) if link, do: Process.link(pid) :ets.insert(state.cons, {consumer_pid, {topic, pid}}) - pids = case subs_list do - [] -> [] - [{^topic, pids}] -> pids - end + + pids = + case subs_list do + [] -> [] + [{^topic, pids}] -> pids + end + :ets.insert(state.subs, {topic, pids ++ [{pid, consumer_pid}]}) {:reply, :ok, state} @@ -74,10 +93,12 @@ defmodule Phoenix.PubSub.NatsServer do case :ets.lookup(state.subs, topic) do [] -> {:reply, :ok, state} + [{^topic, pids}] -> - case Enum.find(pids, false, fn(x) -> elem(x, 0) == pid end) do + case Enum.find(pids, false, fn x -> elem(x, 0) == pid end) do nil -> {:reply, :ok, state} + {^pid, consumer_pid} -> :ok = Consumer.stop(consumer_pid) delete_subscriber(state.subs, pid, topic) @@ -93,8 +114,8 @@ defmodule Phoenix.PubSub.NatsServer do def handle_call({:subscribers, topic}, _from, state) do case :ets.lookup(state.subs, topic) do - [] -> {:reply, [], state} - [{^topic, pids}] -> {:reply, Enum.map(pids, fn(x) -> elem(x, 0) end), state} + [] -> {:reply, [], state} + [{^topic, pids}] -> {:reply, Enum.map(pids, fn x -> elem(x, 0) end), state} end end @@ -102,26 +123,32 @@ defmodule Phoenix.PubSub.NatsServer do pool_host = Nats.target_shard_host(state.opts[:host_ring], topic) pool_name = Nats.create_pool_name(state.pub_conn_pool_base, pool_host) conn_name = Nats.get_pub_conn_name(pool_name, topic, state.pub_conn_pool_size) + case GenServer.call(conn_name, :conn) do - {:ok, conn} -> + {:ok, conn} -> case Gnat.pub(conn, topic, :erlang.term_to_binary({state.node_ref, from_pid, msg})) do - :ok -> {:reply, :ok, state} - {:error, reason} -> {:reply, {:error, reason}, state} + :ok -> {:reply, :ok, state} + {:error, reason} -> {:reply, {:error, reason}, state} end - {:error, reason} -> {:reply, {:error, reason}, state} + + {:error, reason} -> + {:reply, {:error, reason}, state} end end - def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do state = case :ets.lookup(state.cons, pid) do - [] -> state + [] -> + state + [{^pid, {topic, sub_pid}}] -> :ets.delete(state.cons, pid) delete_subscriber(state.subs, sub_pid, topic) state end + {:noreply, state} end @@ -132,17 +159,19 @@ defmodule Phoenix.PubSub.NatsServer do defp delete_subscriber(subs, pid, topic) do case :ets.lookup(subs, topic) do - [] -> + [] -> subs - [{^topic, pids}] -> + + [{^topic, pids}] -> remain_pids = List.keydelete(pids, pid, 0) + if length(remain_pids) > 0 do :ets.insert(subs, {topic, remain_pids}) else :ets.delete(subs, topic) end + subs end end - end diff --git a/mix.exs b/mix.exs index 2a2f35d..30fdc26 100644 --- a/mix.exs +++ b/mix.exs @@ -2,25 +2,28 @@ defmodule Phoenix.PubSub.Nats.Mixfile do use Mix.Project def project do - [app: :phoenix_pubsub_nats, - version: "0.0.1", - elixir: "~> 1.2", - description: description(), - package: package(), - source_url: "https://github.com/mtokioka/phoenix_pubsub_nats", - deps: deps(), - docs: [readme: "README.md", main: "README"]] + [ + app: :phoenix_pubsub_nats, + version: "0.2.0", + elixir: "~> 1.6", + description: description(), + package: package(), + source_url: "https://github.com/xflagstudio/phoenix_pubsub_nats", + deps: deps(), + docs: [readme: "README.md", main: "README"] + ] end def application do - [applications: [:logger, :poolboy, :phoenix_pubsub, :libring, :gnat]] + [extra_applications: [:logger]] end defp deps do - [{:poolboy, ">= 1.4.2"}, - {:phoenix_pubsub, ">= 1.0.0"}, - {:gnat, ">= 0.3.0"}, - {:libring, "~> 1.0"}, + [ + {:poolboy, ">= 1.4.2"}, + {:phoenix_pubsub, ">= 1.0.0"}, + {:gnat, ">= 0.7.0"}, + {:libring, "~> 1.0"} ] end @@ -31,9 +34,11 @@ defmodule Phoenix.PubSub.Nats.Mixfile do end defp package do - [files: ["lib", "mix.exs", "README.md", "LICENSE"], - contributors: ["Masahiro Tokioka"], - licenses: ["MIT"], - links: %{"GitHub" => "https://github.com/mtokioka/phoenix_pubsub_nats"}] + [ + files: ["lib", "mix.exs", "README.md", "LICENSE"], + contributors: ["Masahiro Tokioka"], + licenses: ["MIT"], + links: %{"GitHub" => "https://github.com/xflagstudio/phoenix_pubsub_nats"} + ] end end diff --git a/mix.lock b/mix.lock index 71161bd..4d87c6e 100644 --- a/mix.lock +++ b/mix.lock @@ -1,5 +1,9 @@ -%{"gnat": {:hex, :gnat, "0.3.3", "7b33c5564001a0c02bc8908f8507f3deb96a3187c6bad7f5ed84283d3584ce70", [:mix], [{:poison, "~> 3.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, - "libring": {:hex, :libring, "1.0.2", "95986f0aad553a340f34b898d146474c7a76fdf6e608ad9e0626e07514d8cdf4", [:mix], [], "hexpm"}, - "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.0.1", "c10ddf6237007c804bf2b8f3c4d5b99009b42eca3a0dfac04ea2d8001186056a", [:mix], [], "hexpm"}, - "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, - "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], [], "hexpm"}} +%{ + "gnat": {:hex, :gnat, "0.7.0", "a6e17a6f5a2b66afc6e677fbdbd4452ab425400e756badb746c01594d3780126", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm"}, + "jason": {:hex, :jason, "1.1.2", "b03dedea67a99223a2eaf9f1264ce37154564de899fd3d8b9a21b1a6fd64afe7", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm"}, + "libring": {:hex, :libring, "1.4.0", "41246ba2f3fbc76b3971f6bce83119dfec1eee17e977a48d8a9cfaaf58c2a8d6", [:mix], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.2", "496c303bdf1b2e98a9d26e89af5bba3ab487ba3a3735f74bf1f4064d2a845a3e", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, + "telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"}, +} diff --git a/test/phoenix_pubsub_nats_test.exs b/test/phoenix_pubsub_nats_test.exs index 04dbf28..4f6a7be 100644 --- a/test/phoenix_pubsub_nats_test.exs +++ b/test/phoenix_pubsub_nats_test.exs @@ -43,7 +43,6 @@ defmodule PhoenixPubSubNatsTest do {:error, reason} -> raise BroadcastError, message: reason end end - end @adapters [ @@ -51,11 +50,11 @@ defmodule PhoenixPubSubNatsTest do ] def spawn_pid do - spawn fn -> :timer.sleep(:infinity) end + spawn(fn -> :timer.sleep(:infinity) end) end def rand_server_name do - :crypto.strong_rand_bytes(16) |> Base.encode16 |> String.to_atom + :crypto.strong_rand_bytes(16) |> Base.encode16() |> String.to_atom() end defmodule FailedBroadcaster do @@ -72,16 +71,16 @@ defmodule PhoenixPubSubNatsTest do {:ok, server: server_name} end - test "#{inspect @adapter} #subscribers, #subscribe, #unsubscribe", context do + test "#{inspect(@adapter)} #subscribers, #subscribe, #unsubscribe", context do pid = spawn_pid() assert Enum.empty?(PubSub.subscribers(context[:server], "topic4")) assert PubSub.subscribe(context[:server], pid, "topic4") - assert PubSub.subscribers(context[:server], "topic4") |> Enum.to_list == [pid] + assert PubSub.subscribers(context[:server], "topic4") |> Enum.to_list() == [pid] assert PubSub.unsubscribe(context[:server], pid, "topic4") assert Enum.empty?(PubSub.subscribers(context[:server], "topic4")) end - test "#{inspect @adapter} subscribe/3 with link does not down adapter", context do + test "#{inspect(@adapter)} subscribe/3 with link does not down adapter", context do server_pid = Process.whereis(context[:server]) assert Process.alive?(server_pid) pid = spawn_pid() @@ -93,7 +92,7 @@ defmodule PhoenixPubSubNatsTest do assert Process.alive?(server_pid) end - test "#{inspect @adapter} subscribe/3 with link downs subscriber", context do + test "#{inspect(@adapter)} subscribe/3 with link downs subscriber", context do server_pid = Process.whereis(context[:server]) assert Process.alive?(server_pid) @@ -112,30 +111,35 @@ defmodule PhoenixPubSubNatsTest do assert Process.alive?(non_linked_pid2) end - test "#{inspect @adapter} broadcast/3 and broadcast!/3 publishes message to each subscriber", context do - assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list == [] + test "#{inspect(@adapter)} broadcast/3 and broadcast!/3 publishes message to each subscriber", + context do + assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list() == [] PubSub.subscribe(context[:server], self(), "topic9") - assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list == [self()] + assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list() == [self()] :ok = PubSub.broadcast(context[:server], "topic9", :ping) assert_receive :ping :ok = PubSub.broadcast!(context[:server], "topic9", :ping) assert_receive :ping - assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list == [self()] + assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list() == [self()] end - test "#{inspect @adapter} broadcast!/3 and broadcast_from!/4 raise if broadcast fails", context do + test "#{inspect(@adapter)} broadcast!/3 and broadcast_from!/4 raise if broadcast fails", + context do PubSub.subscribe(context[:server], self(), "topic9") - assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list == [self()] + assert PubSub.subscribers(context[:server], "topic9") |> Enum.to_list() == [self()] + assert_raise PubSub.BroadcastError, fn -> PubSub.broadcast!(context[:server], "topic9", :ping, FailedBroadcaster) end + assert_raise PubSub.BroadcastError, fn -> PubSub.broadcast_from!(context[:server], self(), "topic9", :ping, FailedBroadcaster) end + refute_receive :ping end - test "#{inspect @adapter} broadcast_from/4 and broadcast_from!/4 skips sender", context do + test "#{inspect(@adapter)} broadcast_from/4 and broadcast_from!/4 skips sender", context do PubSub.subscribe(context[:server], self(), "topic11") PubSub.broadcast_from(context[:server], self(), "topic11", :ping) refute_receive :ping @@ -144,13 +148,14 @@ defmodule PhoenixPubSubNatsTest do refute_receive :ping end - test "#{inspect @adapter} processes automatically removed from topic when killed", context do + test "#{inspect(@adapter)} processes automatically removed from topic when killed", context do pid = spawn_pid() assert PubSub.subscribe(context[:server], pid, "topic12") - assert PubSub.subscribers(context[:server], "topic12") |> Enum.to_list == [pid] - Process.exit pid, :kill - :timer.sleep 50 # wait until adapter removes dead pid - assert PubSub.subscribers(context[:server], "topic12") |> Enum.to_list == [] + assert PubSub.subscribers(context[:server], "topic12") |> Enum.to_list() == [pid] + Process.exit(pid, :kill) + # wait until adapter removes dead pid + :timer.sleep(50) + assert PubSub.subscribers(context[:server], "topic12") |> Enum.to_list() == [] end end end