diff --git a/lib/inngest/function.ex b/lib/inngest/function.ex index 99dd1f1..f890367 100644 --- a/lib/inngest/function.ex +++ b/lib/inngest/function.ex @@ -76,11 +76,6 @@ defmodule Inngest.Function do """ @callback exec(Context.t(), Input.t()) :: {:ok, any()} | {:error, any()} - @doc """ - The method to be callbed when the Inngest function fails - """ - @callback on_failure(Context.t(), Input.t()) :: {:ok, any()} | {:error, any()} - defmacro __using__(_opts) do quote location: :keep do alias Inngest.{Client, Trigger} @@ -120,31 +115,63 @@ defmodule Inngest.Function do |> List.first() end - @impl true - def on_failure(_ctx, _input), do: {:ok, "noop"} - - def step(path), - do: %{ - step: %Step{ - id: :step, - name: "step", - runtime: %Step.RunTime{ - url: "#{Config.app_host() <> path}?fnId=#{slug()}&step=step" - }, - retries: %Step.Retry{ - attempts: retries() - } - } - } + def slugs() do + failure = if failure_handler_defined?(__MODULE__), do: [failure_slug()], else: [] + [slug()] ++ failure + end def serve(path) do - %{ - id: slug(), - name: name(), - triggers: [trigger()], - steps: step(path), - mod: __MODULE__ - } + handler = + if failure_handler_defined?(__MODULE__) do + id = failure_slug() + + [ + %{ + id: id, + name: "#{name()} (failure)", + triggers: [ + %Trigger{ + event: "inngest/function.failed", + expression: "event.data.function_id == \"#{slug()}\"" + } + ], + steps: %{ + step: %Step{ + id: :step, + name: "step", + runtime: %Step.RunTime{ + url: "#{Config.app_host() <> path}?fnId=#{id}&step=step" + }, + retries: %Step.Retry{ + attempts: 0 + } + } + } + } + ] + else + [] + end + + [ + %{ + id: slug(), + name: name(), + triggers: [trigger()], + steps: %{ + step: %Step{ + id: :step, + name: "step", + runtime: %Step.RunTime{ + url: "#{Config.app_host() <> path}?fnId=#{slug()}&step=step" + }, + retries: %Step.Retry{ + attempts: retries() + } + } + } + } + ] ++ handler end defp retries() do @@ -156,6 +183,12 @@ defmodule Inngest.Function do retry -> retry end end + + defp failure_handler_defined?(mod) do + mod.__info__(:functions) |> Keyword.get(:handle_failure) == 2 + end + + defp failure_slug(), do: "#{slug()}-failure" end end diff --git a/lib/inngest/router/helper.ex b/lib/inngest/router/helper.ex index e8c7a1c..1d5587a 100644 --- a/lib/inngest/router/helper.ex +++ b/lib/inngest/router/helper.ex @@ -3,15 +3,6 @@ defmodule Inngest.Router.Helper do alias Inngest.Config - @spec func_map(list(), binary()) :: map() - def func_map(funcs, path) do - funcs - |> Enum.reduce(%{}, fn func, x -> - slug = func.slug() - Map.put(x, slug, func.serve(path)) - end) - end - def load_functions(params) do if Config.path_runtime_eval() do %{funcs: funcs} = load_functions_from_path(params) diff --git a/lib/inngest/router/invoke.ex b/lib/inngest/router/invoke.ex index 6bcfa4d..a23b3e7 100644 --- a/lib/inngest/router/invoke.ex +++ b/lib/inngest/router/invoke.ex @@ -35,14 +35,15 @@ defmodule Inngest.Router.Invoke do end defp exec( - %{request_path: path, private: %{raw_body: [body]}} = conn, + %{private: %{raw_body: [body]}} = conn, %{"event" => event, "events" => events, "ctx" => ctx, "fnId" => fn_slug} = params ) do func = params |> load_functions() - |> func_map(path) - |> Map.get(fn_slug) + |> Enum.find(fn func -> + Enum.member?(func.slugs(), fn_slug) + end) ctx = %Inngest.Function.Context{ attempt: Map.get(ctx, "attempt", 0), @@ -83,7 +84,11 @@ defmodule Inngest.Router.Invoke do defp invoke(func, ctx, input) do try do - func.mod.exec(ctx, input) |> SdkResponse.from_result([]) + if is_failure?(input) do + func.handle_failure(ctx, input) |> SdkResponse.from_result([]) + else + func.exec(ctx, input) |> SdkResponse.from_result([]) + end rescue non_retry in Inngest.NonRetriableError -> SdkResponse.from_result({:error, non_retry}, retry: false, stacktrace: __STACKTRACE__) @@ -140,4 +145,7 @@ defmodule Inngest.Router.Invoke do {:error, error} end end + + defp is_failure?(%{event: %{name: "inngest/function.failed"}} = _input), do: true + defp is_failure?(_), do: false end diff --git a/lib/inngest/router/register.ex b/lib/inngest/router/register.ex index 863e979..df9ea88 100644 --- a/lib/inngest/router/register.ex +++ b/lib/inngest/router/register.ex @@ -22,7 +22,7 @@ defmodule Inngest.Router.Register do funcs = params |> load_functions() - |> func_map(path) + |> Enum.flat_map(& &1.serve(path)) {status, resp} = case register(path, funcs, framework: framework) do @@ -54,7 +54,7 @@ defmodule Inngest.Router.Register do sdk: Config.sdk_version(), framework: framework, appName: Config.app_name(), - functions: functions |> Enum.map(fn {_, v} -> v.mod.serve(path) end) + functions: functions } key = Inngest.Signature.hashed_signing_key(Config.signing_key()) diff --git a/test/inngest/function/cases/failure_handler_test.exs b/test/inngest/function/cases/failure_handler_test.exs new file mode 100644 index 0000000..5c06ef8 --- /dev/null +++ b/test/inngest/function/cases/failure_handler_test.exs @@ -0,0 +1,51 @@ +defmodule Inngest.Function.Cases.RetriableTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + @default_sleep 10_000 + @event_name "test/plug.retriable" + + @tag :integration + test "should fail after retrying and failure is handled" do + event_id = send_test_event(@event_name) + Process.sleep(@default_sleep) + + assert {:ok, + %{ + "data" => [ + %{ + "run_id" => _run_id, + "output" => %{ + "error" => "invalid status code: 500", + "message" => stacktrace + }, + "status" => "Failed" + } + ] + }} = DevServer.run_ids(event_id) + + assert stacktrace =~ "YOLO!!!" + + {:ok, %{"data" => events}} = DevServer.list_events() + + assert %{"id" => failed_id} = + events + |> Enum.find(fn evt -> + Map.get(evt, "name") == "inngest/function.failed" && + get_in(evt, ["data", "event", "name"]) == @event_name + end) + + assert {:ok, + %{ + "data" => [ + %{ + "run_id" => _, + "output" => "error handled", + "status" => "Completed" + } + ] + }} = DevServer.run_ids(failed_id) + end +end diff --git a/test/inngest/function/cases/retriable_test.exs b/test/inngest/function/cases/retriable_test.exs deleted file mode 100644 index 3de337b..0000000 --- a/test/inngest/function/cases/retriable_test.exs +++ /dev/null @@ -1,30 +0,0 @@ -defmodule Inngest.Function.Cases.RetriableTest do - use ExUnit.Case, async: true - - alias Inngest.Test.DevServer - import Inngest.Test.Helper - - @default_sleep 10_000 - - @tag :integration - test "should fail after retrying" do - event_id = send_test_event("test/plug.retriable") - Process.sleep(@default_sleep) - - assert {:ok, - %{ - "data" => [ - %{ - "run_id" => _run_id, - "output" => %{ - "error" => "invalid status code: 500", - "message" => stacktrace - }, - "status" => "Failed" - } - ] - }} = DevServer.run_ids(event_id) - - assert stacktrace =~ "YOLO!!!" - end -end diff --git a/test/inngest/function_test.exs b/test/inngest/function_test.exs index cbbfa32..6a90693 100644 --- a/test/inngest/function_test.exs +++ b/test/inngest/function_test.exs @@ -29,36 +29,40 @@ defmodule Inngest.FunctionTest do describe "serve/1" do test "event function should return approprivate map" do - assert %{ - id: "test-event", - name: "App / Email: Awesome Event Func", - triggers: [ - %Trigger{event: "my/awesome.event"} - ], - steps: %{ - step: %{ - id: _, - name: _, - runtime: %{ - type: "http", - url: _ - }, - retries: %{ - attempts: _ + assert [ + %{ + id: "test-event", + name: "App / Email: Awesome Event Func", + triggers: [ + %Trigger{event: "my/awesome.event"} + ], + steps: %{ + step: %{ + id: _, + name: _, + runtime: %{ + type: "http", + url: _ + }, + retries: %{ + attempts: _ + } } } } - } = TestEventFn.serve("/api/inngest") + ] = TestEventFn.serve("/api/inngest") end test "cron function should return appropriate map" do - assert %{ - id: "test-cron", - name: "Awesome Cron Func", - triggers: [ - %Trigger{cron: "TZ=America/Los_Angeles * * * * *"} - ] - } = TestCronFn.serve("/api/inngest") + assert [ + %{ + id: "test-cron", + name: "Awesome Cron Func", + triggers: [ + %Trigger{cron: "TZ=America/Los_Angeles * * * * *"} + ] + } + ] = TestCronFn.serve("/api/inngest") end end diff --git a/test/inngest/router/helper_test.exs b/test/inngest/router/helper_test.exs index aff0ab6..fd345c9 100644 --- a/test/inngest/router/helper_test.exs +++ b/test/inngest/router/helper_test.exs @@ -3,33 +3,6 @@ defmodule Inngest.Router.HelperTest do alias Inngest.Router.Helper - describe "func_map/2" do - test "should return a function map" do - path = "/api/inngest" - funcs = [Inngest.TestEventFn] - - assert %{ - "test-event" => %{ - id: "test-event", - mod: Inngest.TestEventFn, - steps: %{ - step: %Inngest.Function.Step{ - id: :step, - name: "step" - } - }, - triggers: [ - %Inngest.Trigger{ - event: "my/awesome.event", - expression: nil, - cron: nil - } - ] - } - } = Helper.func_map(funcs, path) - end - end - describe "load_functions_from_path/1" do @path "test/support/**/*.ex" @paths ["dev/**/*.ex", @path] diff --git a/test/support/cases/retriable_error.ex b/test/support/cases/retriable_error.ex index 2e54883..35232d6 100644 --- a/test/support/cases/retriable_error.ex +++ b/test/support/cases/retriable_error.ex @@ -24,4 +24,13 @@ defmodule Inngest.Test.Case.RetriableError do {:ok, "completed"} end + + def handle_failure(ctx, %{step: step} = _args) do + _ = + step.run(ctx, "handle-failure", fn -> + "CATCH ERROR!!!" + end) + + {:ok, "error handled"} + end end diff --git a/test/support/dev_server.ex b/test/support/dev_server.ex index dd55691..db68eea 100644 --- a/test/support/dev_server.ex +++ b/test/support/dev_server.ex @@ -40,6 +40,12 @@ defmodule Inngest.Test.DevServer do |> parse_resp() end + def list_events() do + client() + |> Tesla.get("/v1/events") + |> parse_resp() + end + defp client() do middleware = [ {Tesla.Middleware.BaseUrl, @base_url},