Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

for elixir 1.6 #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["*.{ex,exs}", "{lib,test}/**/*.{ex,exs}"]
]
23 changes: 23 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where third-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
phoenix_pubsub_nats-*.tar
12 changes: 1 addition & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,10 @@ Add `phoenix_pubsub_nats` as a dependency in your `mix.exs` file.

```elixir
def deps do
[{:phoenix_pubsub_nats, git: "https://github.com/mtokioka/phoenix_pubsub_nats.git"}]
[{:phoenix_pubsub_nats, git: "https://github.com/xflagstudio/phoenix_pubsub_nats.git"}]
end
```

You should also update your application list to include `:phoenix_pubsub_nats`:

```elixir
def application do
[applications: [:phoenix_pubsub_nats]]
end

```

Edit your Phoenix application Endpoint configuration:

config :my_app, MyApp.Endpoint,
Expand Down Expand Up @@ -50,4 +41,3 @@ The following options are supported:
{:noreply, socket}
end
```

97 changes: 63 additions & 34 deletions lib/phoenix/pubsub/nats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Phoenix.PubSub.Nats do
options = opts[:options] || []
hosts = options[:hosts] || ["localhost"]
shard_num = length(hosts)
host_ring = HashRing.new
host_ring = HashRing.new()
host_ring = HashRing.add_nodes(host_ring, hosts)

# to make state smaller
Expand All @@ -31,38 +31,63 @@ defmodule Phoenix.PubSub.Nats do

## TODO: set various options from config
nats_opt = %{
tcp_opts: [:binary, nodelay: true],
tcp_opts: [:binary, nodelay: true]
}

pub_conn_pools = hosts |> Enum.map(fn(host) ->
conn_pool_name = create_pool_name(pub_conn_pool_base, host)
supervisor(Phoenix.PubSub.NatsPubConnSupervisor, [conn_pool_name, pub_pool_size, [Map.merge(nats_opt, extract_host(host))]], id: conn_pool_name)
end)
conn_pools = hosts |> Enum.map(fn(host) ->
conn_pool_name = create_pool_name(conn_pool_base, host)
conn_pool_opts = [
name: {:local, conn_pool_name},
worker_module: Phoenix.PubSub.NatsConn,
size: sub_pool_size,
strategy: :fifo,
max_overflow: 0
]
:poolboy.child_spec(conn_pool_name, conn_pool_opts, [Map.merge(nats_opt, extract_host(host))])
end)
pub_conn_pools =
hosts
|> Enum.map(fn host ->
conn_pool_name = create_pool_name(pub_conn_pool_base, host)

supervisor(
Phoenix.PubSub.NatsPubConnSupervisor,
[conn_pool_name, pub_pool_size, [Map.merge(nats_opt, extract_host(host))]],
id: conn_pool_name
)
end)

conn_pools =
hosts
|> Enum.map(fn host ->
conn_pool_name = create_pool_name(conn_pool_base, host)

conn_pool_opts = [
name: {:local, conn_pool_name},
worker_module: Phoenix.PubSub.NatsConn,
size: sub_pool_size,
strategy: :fifo,
max_overflow: 0
]

:poolboy.child_spec(conn_pool_name, conn_pool_opts, [
Map.merge(nats_opt, extract_host(host))
])
end)

dispatch_rules = [
{:broadcast, Phoenix.PubSub.NatsServer, [name]},
{:subscribe, Phoenix.PubSub.NatsServer, [name]},
{:unsubscribe, Phoenix.PubSub.NatsServer, [name]},
]

children = pub_conn_pools ++ conn_pools ++ [
supervisor(Phoenix.PubSub.LocalSupervisor, [name, 1, dispatch_rules]),
worker(Phoenix.PubSub.NatsServer,
[name, pub_conn_pool_base, pub_pool_size, conn_pool_base,
options ++ [shard_num: shard_num, host_ring: host_ring]])
{:broadcast, Phoenix.PubSub.NatsServer, [name]},
{:subscribe, Phoenix.PubSub.NatsServer, [name]},
{:unsubscribe, Phoenix.PubSub.NatsServer, [name]}
]
supervise children, strategy: :one_for_one

children =
pub_conn_pools ++
conn_pools ++
[
supervisor(Phoenix.PubSub.LocalSupervisor, [name, 1, dispatch_rules]),
worker(
Phoenix.PubSub.NatsServer,
[
name,
pub_conn_pool_base,
pub_pool_size,
conn_pool_base,
options ++ [shard_num: shard_num, host_ring: host_ring]
]
)
]

supervise(children, strategy: :one_for_one)
end

def target_shard_host(host_ring, topic) do
Expand All @@ -85,27 +110,31 @@ defmodule Phoenix.PubSub.Nats do

defp extract_host(host_config) do
split = String.split(host_config, ":")

if Enum.count(split) == 1 do
%{host: to_char_list(List.first(split))}
%{host: to_charlist(List.first(split))}
else
%{host: to_char_list(List.first(split)), port: String.to_integer(List.last(split))}
%{host: to_charlist(List.first(split)), port: String.to_integer(List.last(split))}
end
end

def with_conn(pool_name, fun) when is_function(fun, 1) do
case get_conn(pool_name, 0, @pool_size) do
{:ok, conn} -> fun.(conn)
{:ok, conn} -> fun.(conn)
{:error, reason} -> {:error, reason}
end
end

defp get_conn(pool_name, retry_count, max_retry_count) do
case :poolboy.transaction(pool_name, &GenServer.call(&1, :conn)) do
{:ok, conn} -> {:ok, conn}
{:ok, conn} ->
{:ok, conn}

{:error, _reason} when retry_count < max_retry_count ->
get_conn(pool_name, retry_count + 1, max_retry_count)
{:error, reason} -> {:error, reason}

{:error, reason} ->
{:error, reason}
end
end

end
5 changes: 4 additions & 1 deletion lib/phoenix/pubsub/nats_conn.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule Phoenix.PubSub.NatsConn do
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end

def start_link(opts, name) do
GenServer.start_link(__MODULE__, opts, name: name)
end
Expand All @@ -37,14 +38,15 @@ defmodule Phoenix.PubSub.NatsConn do
case Gnat.start_link(state.opts) do
{:ok, pid} ->
{:noreply, %{state | conn: pid, status: :connected}}

{:error, _reason} ->
:timer.send_after(@reconnect_after_ms, :connect)
{:noreply, state}
end
end

def handle_info({:EXIT, _ref, _reason}, %{conn: _pid, status: :connected} = state) do
Logger.error "lost Nats connection. Attempting to reconnect..."
Logger.error("lost Nats connection. Attempting to reconnect...")
:timer.send_after(@reconnect_after_ms, :connect)
{:noreply, %{state | conn: nil, status: :disconnected}}
end
Expand All @@ -56,6 +58,7 @@ defmodule Phoenix.PubSub.NatsConn do
_, _ -> :ok
end
end

def terminate(_reason, _state) do
:ok
end
Expand Down
14 changes: 8 additions & 6 deletions lib/phoenix/pubsub/nats_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ defmodule Phoenix.PubSub.NatsConsumer do
if link, do: Process.link(pid)

case Nats.with_conn(conn_pool, fn conn ->
{:ok, ref} = Gnat.sub(conn, self(), topic)
Process.monitor(conn)
Process.monitor(pid)
{:ok, conn, ref}
end) do
{:ok, ref} = Gnat.sub(conn, self(), topic)
Process.monitor(conn)
Process.monitor(pid)
{:ok, conn, ref}
end) do
{:ok, conn, ref} ->
{:ok, %{conn: conn, pid: pid, sub_ref: ref, node_ref: node_ref}}

{:error, :disconnected} ->
{:stop, :disconnected}
end
Expand All @@ -40,9 +41,11 @@ defmodule Phoenix.PubSub.NatsConsumer do

def handle_info({:msg, %{body: payload, topic: _, reply_to: _}}, state) do
{remote_node_ref, from_pid, msg} = :erlang.binary_to_term(payload)

if from_pid == :none or remote_node_ref != state.node_ref or from_pid != state.pid do
send(state.pid, msg)
end

{:noreply, state}
end

Expand All @@ -62,5 +65,4 @@ defmodule Phoenix.PubSub.NatsConsumer do
_, _ -> :ok
end
end

end
11 changes: 6 additions & 5 deletions lib/phoenix/pubsub/nats_pub_conn_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ defmodule Phoenix.PubSub.NatsPubConnSupervisor do
end

def init([server, pool_size, opts]) do
children = for shard <- 0..(pool_size - 1) do
name = Nats.create_pub_conn_name(server, shard)
worker(Phoenix.PubSub.NatsConn, [opts, name], id: name)
end
children =
for shard <- 0..(pool_size - 1) do
name = Nats.create_pub_conn_name(server, shard)
worker(Phoenix.PubSub.NatsConn, [opts, name], id: name)
end

supervise(children, strategy: :one_for_one)
end

end
Loading