From 8d886e67ecaea9a74799c5f90f614f3fd3dc0ced Mon Sep 17 00:00:00 2001 From: Jason Goldberger Date: Tue, 16 Aug 2016 22:03:40 -0700 Subject: [PATCH 1/2] fixed multiple async job issues --- lib/jobbit.ex | 65 ++++++++++++++++++++++++++------------------ test/jobbit_test.exs | 24 +++++++++++++++- 2 files changed, 62 insertions(+), 27 deletions(-) diff --git a/lib/jobbit.ex b/lib/jobbit.ex index 38b90db..b1aea95 100644 --- a/lib/jobbit.ex +++ b/lib/jobbit.ex @@ -7,12 +7,12 @@ defmodule Jobbit do owner: nil, ] - def async(fun) do - async(:erlang, :apply, [fun, []]) + def async(func) do + async(:erlang, :apply, [func, []]) end - def async(mod, func, args) do + def async(mod, func_atom, args) do owner = self - pid = spawn(fn -> run(owner, fn -> apply(mod, func, args) end) end) + pid = spawn(fn -> run(owner, make_func(mod, func_atom, args)) end) ref = Process.monitor(pid) send(pid, {owner, ref}) %Jobbit{ @@ -22,28 +22,37 @@ defmodule Jobbit do } end - def await(task, timeout \\ 5000) - def await(%Jobbit{owner: owner}, _) when owner != self do + def await(job, timeout \\ 5000, seen \\ []) + def await(%Jobbit{owner: owner}, _, _) when owner != self do raise "Invalid Jobbit owner" end - def await(%Jobbit{ref: ref}, timeout) do - receive do - {^ref, reply} -> - Process.demonitor(ref, [:flush]) - {:ok, reply} - {:DOWN, ^ref, _, _, reason} -> - {:error, reason} - {other_ref, reply} when other_ref |> is_reference -> - Logger.error("Unknown Reference:\n\tExpected: #{inspect ref}\n\tGot: #{inspect other_ref}") - Process.demonitor(ref, [:flush]) - {:error, {:failed_job, reply}} - x -> - Logger.error("Unknown Error #{inspect x}") - {:error, {:unknown_failure, x}} - after - timeout -> - Process.demonitor(ref, [:flush]) - {:error, :timeout} + def await(%Jobbit{ref: ref} = job, timeout, seen) do + if Enum.member?(seen, ref) do + {:error, :internal_error} + else + receive do + {^ref, reply} -> + Process.demonitor(ref, [:flush]) + {:ok, reply} + {:DOWN, ^ref, _, _, {reason, _details}} -> + {:error, reason} + {:DOWN, ^ref, _, _, :normal} -> + await(job, timeout, seen) + {:DOWN, ^ref, _, _, reason} -> + {:error, reason} + {other_ref, reply} when other_ref |> is_reference -> + send self, {other_ref, reply} + await(job, timeout, [other_ref|seen]) + {:DOWN, other_ref, some_atom, pid, reason} when other_ref |> is_reference -> + send self, {:DOWN, other_ref, some_atom, pid, reason} + await(job, timeout, [other_ref|seen]) + x -> + {:error, :internal_error} + after + timeout -> + Process.demonitor(ref, [:flush]) + {:error, :timeout} + end end end @@ -57,8 +66,12 @@ defmodule Jobbit do end end - defp invalid_owner_error(err, owner) do - Logger.error("Jobbit - Invalid Owner: Expected {#{inspect owner}, ref}. Got #{inspect err}") + defp make_func(mod, func_atom, args) do + fn -> apply(mod, func_atom, args) end + end + + defp invalid_owner_error(err, master) do + Logger.error("Jobbit - Invalid :owner\n\tExpected {#{inspect master}, ref}\n\tGot #{inspect err}") exit(:shutdown) end diff --git a/test/jobbit_test.exs b/test/jobbit_test.exs index 4a1dd37..2ff3cd5 100644 --- a/test/jobbit_test.exs +++ b/test/jobbit_test.exs @@ -11,11 +11,33 @@ defmodule JobbitTest do end # despite red text appearing this test passes - test "async await does not crash parent process when an exception occurs" do + test "async/await does not crash parent process when an exception occurs" do error_message = "This is an intentional exception for testing purposes" result = Jobbit.async(fn -> raise error_message end) |> Jobbit.await assert result |> is_tuple assert result |> elem(0) == :error end + test "async/await works for multiple jobs and keeps them in order" do + result = 1..5 + |> Enum.map(fn num -> + Jobbit.async(fn -> + time = (33 * num) + :timer.sleep(300 - time) + time + end) + end) + |> Enum.map(fn job -> Jobbit.await(job) end) + assert result == [ok: 33, ok: 66, ok: 99, ok: 132, ok: 165] + # make sure there are no lingering messages... + # if so we may be leaking mem... + thing = receive do + x -> x + after + 1 -> + nil + end + assert thing == nil + end + end From 052c430042ce80f25262a5f006a1378a75af5546 Mon Sep 17 00:00:00 2001 From: Jason Goldberger Date: Tue, 16 Aug 2016 22:15:42 -0700 Subject: [PATCH 2/2] updated readme and bumped minor version --- README.md | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index ffef781..efdd2fd 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ If [available in Hex](https://hex.pm/docs/publish), the package can be installed ```elixir def deps do - [{:jobbit, "~> 0.2.0"}] + [{:jobbit, "~> 0.3.0"}] end ``` diff --git a/mix.exs b/mix.exs index 04d0722..a460a49 100644 --- a/mix.exs +++ b/mix.exs @@ -4,7 +4,7 @@ defmodule Jobbit.Mixfile do def project do [ app: :jobbit, - version: "0.2.0", + version: "0.3.0", elixir: "~> 1.3", build_embedded: Mix.env == :prod, start_permanent: Mix.env == :prod,