Skip to content

Commit

Permalink
Merge pull request #1 from elbow-jason/try2
Browse files Browse the repository at this point in the history
Fix concurrent async jobs not returning correctly.
  • Loading branch information
Jason Goldberger authored Aug 17, 2016
2 parents 96ed233 + 052c430 commit 486e53e
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 29 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ If [available in Hex](https://hex.pm/docs/publish), the package can be installed

```elixir
def deps do
[{:jobbit, "~> 0.2.0"}]
[{:jobbit, "~> 0.3.0"}]
end
```

Expand Down
65 changes: 39 additions & 26 deletions lib/jobbit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ defmodule Jobbit do
owner: nil,
]

def async(fun) do
async(:erlang, :apply, [fun, []])
def async(func) do
async(:erlang, :apply, [func, []])
end
def async(mod, func, args) do
def async(mod, func_atom, args) do
owner = self
pid = spawn(fn -> run(owner, fn -> apply(mod, func, args) end) end)
pid = spawn(fn -> run(owner, make_func(mod, func_atom, args)) end)
ref = Process.monitor(pid)
send(pid, {owner, ref})
%Jobbit{
Expand All @@ -22,28 +22,37 @@ defmodule Jobbit do
}
end

def await(task, timeout \\ 5000)
def await(%Jobbit{owner: owner}, _) when owner != self do
def await(job, timeout \\ 5000, seen \\ [])
def await(%Jobbit{owner: owner}, _, _) when owner != self do
raise "Invalid Jobbit owner"
end
def await(%Jobbit{ref: ref}, timeout) do
receive do
{^ref, reply} ->
Process.demonitor(ref, [:flush])
{:ok, reply}
{:DOWN, ^ref, _, _, reason} ->
{:error, reason}
{other_ref, reply} when other_ref |> is_reference ->
Logger.error("Unknown Reference:\n\tExpected: #{inspect ref}\n\tGot: #{inspect other_ref}")
Process.demonitor(ref, [:flush])
{:error, {:failed_job, reply}}
x ->
Logger.error("Unknown Error #{inspect x}")
{:error, {:unknown_failure, x}}
after
timeout ->
Process.demonitor(ref, [:flush])
{:error, :timeout}
def await(%Jobbit{ref: ref} = job, timeout, seen) do
if Enum.member?(seen, ref) do
{:error, :internal_error}
else
receive do
{^ref, reply} ->
Process.demonitor(ref, [:flush])
{:ok, reply}
{:DOWN, ^ref, _, _, {reason, _details}} ->
{:error, reason}
{:DOWN, ^ref, _, _, :normal} ->
await(job, timeout, seen)
{:DOWN, ^ref, _, _, reason} ->
{:error, reason}
{other_ref, reply} when other_ref |> is_reference ->
send self, {other_ref, reply}
await(job, timeout, [other_ref|seen])
{:DOWN, other_ref, some_atom, pid, reason} when other_ref |> is_reference ->
send self, {:DOWN, other_ref, some_atom, pid, reason}
await(job, timeout, [other_ref|seen])
x ->
{:error, :internal_error}
after
timeout ->
Process.demonitor(ref, [:flush])
{:error, :timeout}
end
end
end

Expand All @@ -57,8 +66,12 @@ defmodule Jobbit do
end
end

defp invalid_owner_error(err, owner) do
Logger.error("Jobbit - Invalid Owner: Expected {#{inspect owner}, ref}. Got #{inspect err}")
defp make_func(mod, func_atom, args) do
fn -> apply(mod, func_atom, args) end
end

defp invalid_owner_error(err, master) do
Logger.error("Jobbit - Invalid :owner\n\tExpected {#{inspect master}, ref}\n\tGot #{inspect err}")
exit(:shutdown)
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Jobbit.Mixfile do
def project do
[
app: :jobbit,
version: "0.2.0",
version: "0.3.0",
elixir: "~> 1.3",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
Expand Down
24 changes: 23 additions & 1 deletion test/jobbit_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,33 @@ defmodule JobbitTest do
end

# despite red text appearing this test passes
test "async await does not crash parent process when an exception occurs" do
test "async/await does not crash parent process when an exception occurs" do
error_message = "This is an intentional exception for testing purposes"
result = Jobbit.async(fn -> raise error_message end) |> Jobbit.await
assert result |> is_tuple
assert result |> elem(0) == :error
end

test "async/await works for multiple jobs and keeps them in order" do
result = 1..5
|> Enum.map(fn num ->
Jobbit.async(fn ->
time = (33 * num)
:timer.sleep(300 - time)
time
end)
end)
|> Enum.map(fn job -> Jobbit.await(job) end)
assert result == [ok: 33, ok: 66, ok: 99, ok: 132, ok: 165]
# make sure there are no lingering messages...
# if so we may be leaking mem...
thing = receive do
x -> x
after
1 ->
nil
end
assert thing == nil
end

end

0 comments on commit 486e53e

Please sign in to comment.