From 70a08f27a7b78e40c1c0d50e121751e0fd8ce12a Mon Sep 17 00:00:00 2001 From: Darwin <5746693+darwin67@users.noreply.github.com> Date: Mon, 27 May 2024 23:41:14 -0700 Subject: [PATCH] feat: Support Invoke Add support for `step.invoke` --------- Co-authored-by: Darwin D Wu --- CHANGELOG.md | 6 ++- flake.nix | 1 + lib/inngest/error.ex | 19 +++++++ lib/inngest/response.ex | 3 +- lib/inngest/router/invoke.ex | 2 +- lib/inngest/step_tool.ex | 49 +++++++++++++++++++ .../function/cases/failure_handler_test.exs | 8 ++- test/inngest/function/cases/invoke_test.exs | 25 ++++++++++ .../function/cases/invoke_timeout_test.exs | 36 ++++++++++++++ test/inngest/function/cases/no_retry_test.exs | 8 ++- test/support/cases/invoke_fn.ex | 38 ++++++++++++++ test/support/cases/invoke_timeout_fn.ex | 39 +++++++++++++++ 12 files changed, 227 insertions(+), 7 deletions(-) create mode 100644 test/inngest/function/cases/invoke_test.exs create mode 100644 test/inngest/function/cases/invoke_timeout_test.exs create mode 100644 test/support/cases/invoke_fn.ex create mode 100644 test/support/cases/invoke_timeout_fn.ex diff --git a/CHANGELOG.md b/CHANGELOG.md index 9482c7e..5c050d9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,13 @@ All notable changes to this project will be documented in this file. ## [unreleased] +### Features + +- Feat: Add support for invoke + ### Miscellaneous Tasks -- Chore: replace nix shell with flake +- Chore: replace nix shell with flake (#81) ## [0.2.1] - 2024-05-15 diff --git a/flake.nix b/flake.nix index 846e075..30a314e 100644 --- a/flake.nix +++ b/flake.nix @@ -14,6 +14,7 @@ nativeBuildInputs = with pkgs; [ elixir erlang_27 + nodejs_18 # need this to install inngest-cli for now # LSPs elixir-ls diff --git a/lib/inngest/error.ex b/lib/inngest/error.ex index f917cbe..eb9879f 100644 --- a/lib/inngest/error.ex +++ b/lib/inngest/error.ex @@ -1,3 +1,22 @@ +defmodule Inngest.Error do + @moduledoc """ + A generic Inngest Error. + Used to format errors into a structure that can be parsed by the UI + """ + + defstruct [:error, stack: nil] +end + +defimpl Jason.Encoder, for: Inngest.Error do + def encode(value, opts) do + error = Map.get(value, :error) + stacktrace = Exception.format(:error, error, Map.get(value, :stack)) + + %{name: error.__struct__, message: error.message, stack: stacktrace} + |> Jason.Encode.map(opts) + end +end + defmodule Inngest.NonRetriableError do @moduledoc """ Error signaling to not retry diff --git a/lib/inngest/response.ex b/lib/inngest/response.ex index 9ad1c43..5ef320f 100644 --- a/lib/inngest/response.ex +++ b/lib/inngest/response.ex @@ -54,7 +54,8 @@ defmodule Inngest.SdkResponse do status = if retry, do: 500, else: 400 encoded = - case Exception.format(:error, error, stacktrace) |> Jason.encode() do + case %Inngest.Error{error: error, stack: stacktrace} + |> Jason.encode() do {:ok, encoded} -> encoded {:error, _} -> "Failed to encode error: #{error}" end diff --git a/lib/inngest/router/invoke.ex b/lib/inngest/router/invoke.ex index 80df954..f7b0e2d 100644 --- a/lib/inngest/router/invoke.ex +++ b/lib/inngest/router/invoke.ex @@ -97,7 +97,7 @@ defmodule Inngest.Router.Invoke do retry in Inngest.RetryAfterError -> delay = Map.get(retry, :seconds) - SdkResponse.from_result({:error, retry.message}, + SdkResponse.from_result({:error, retry}, retry: delay, stacktrace: __STACKTRACE__ ) diff --git a/lib/inngest/step_tool.ex b/lib/inngest/step_tool.ex index b17be20..4460af5 100644 --- a/lib/inngest/step_tool.ex +++ b/lib/inngest/step_tool.ex @@ -117,6 +117,55 @@ defmodule Inngest.StepTool do end end + @spec invoke(Context.t(), binary(), map()) :: map() + def invoke(%{steps: steps} = ctx, step_id, opts) do + op = UnhashedOp.new(ctx, "InvokeFunction", step_id, opts) + hashed_id = UnhashedOp.hash(op) + + case Map.get(steps, hashed_id) do + nil -> + func = Map.get(opts, :function) + data = Map.get(opts, :data) + timeout = Map.get(opts, :timeout) + v = Map.get(opts, :v) + + generator_otps = + if Map.has_key?(opts, :timeout) do + %{ + function_id: func.slug(), + payload: %{data: data, v: v}, + timeout: timeout + } + else + %{ + function_id: func.slug(), + payload: %{data: data, v: v} + } + end + + throw(%GeneratorOpCode{ + id: hashed_id, + name: step_id, + display_name: step_id, + op: op.op, + opts: generator_otps + }) + + %{ + "error" => %{ + "name" => "InngestInvokeTimeoutError", + "error" => message, + "message" => _ + } + } -> + raise Inngest.NonRetriableError, message: message + + # return value if found + val -> + val + end + end + def send_event(%{steps: steps} = ctx, step_id, events) do op = UnhashedOp.new(ctx, "Step", step_id) hashed_id = UnhashedOp.hash(op) diff --git a/test/inngest/function/cases/failure_handler_test.exs b/test/inngest/function/cases/failure_handler_test.exs index 0b1b6b5..1ff476b 100644 --- a/test/inngest/function/cases/failure_handler_test.exs +++ b/test/inngest/function/cases/failure_handler_test.exs @@ -17,13 +17,17 @@ defmodule Inngest.Function.Cases.RetriableTest do "data" => [ %{ "run_id" => _run_id, - "output" => stacktrace, + "output" => %{ + "message" => message, + "name" => "Elixir.Inngest.RetryAfterError", + "stack" => _ + }, "status" => "Failed" } ] }} = DevServer.run_ids(event_id) - assert stacktrace =~ "YOLO!!!" + assert message == "YOLO!!!" {:ok, %{"data" => events}} = DevServer.list_events() diff --git a/test/inngest/function/cases/invoke_test.exs b/test/inngest/function/cases/invoke_test.exs new file mode 100644 index 0000000..3ba4622 --- /dev/null +++ b/test/inngest/function/cases/invoke_test.exs @@ -0,0 +1,25 @@ +defmodule Inngest.Function.Cases.InvokeTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + @default_sleep 5_000 + + @tag :integration + test "should run successfully" do + event_id = send_test_event("test/invoke.caller") + Process.sleep(@default_sleep) + + assert {:ok, + %{ + "data" => [ + %{ + "output" => %{"data" => "INVOKED!"}, + "run_id" => _, + "status" => "Completed" + } + ] + }} = DevServer.run_ids(event_id) + end +end diff --git a/test/inngest/function/cases/invoke_timeout_test.exs b/test/inngest/function/cases/invoke_timeout_test.exs new file mode 100644 index 0000000..39d6cbf --- /dev/null +++ b/test/inngest/function/cases/invoke_timeout_test.exs @@ -0,0 +1,36 @@ +defmodule Inngest.Function.Cases.InvokeTimeoutTest do + use ExUnit.Case, async: true + + alias Inngest.Test.DevServer + import Inngest.Test.Helper + + @default_sleep 5_000 + + @tag :integration + test "should fail with timeout error" do + event_id = send_test_event("test/invoke.timeout.caller") + Process.sleep(@default_sleep) + + assert { + :ok, + %{ + "data" => [ + %{ + "output" => %{ + "name" => error, + "message" => message, + "stack" => _ + }, + "run_id" => _, + "status" => "Failed" + } + ] + } + } = DevServer.run_ids(event_id) + + assert error == "Elixir.Inngest.NonRetriableError" + + assert message == + "InngestInvokeTimeoutError: Timed out waiting for invoked function to complete" + end +end diff --git a/test/inngest/function/cases/no_retry_test.exs b/test/inngest/function/cases/no_retry_test.exs index 4cd8372..2441f73 100644 --- a/test/inngest/function/cases/no_retry_test.exs +++ b/test/inngest/function/cases/no_retry_test.exs @@ -15,13 +15,17 @@ defmodule Inngest.Function.Cases.NoRetryTest do %{ "data" => [ %{ - "output" => stacktrace, + "output" => %{ + "message" => message, + "name" => "Elixir.Inngest.NonRetriableError", + "stack" => _ + }, "run_id" => _run_id, "status" => "Failed" } ] }} = DevServer.run_ids(event_id) - assert stacktrace =~ "not retrying!" + assert message == "not retrying!" end end diff --git a/test/support/cases/invoke_fn.ex b/test/support/cases/invoke_fn.ex new file mode 100644 index 0000000..bb496e4 --- /dev/null +++ b/test/support/cases/invoke_fn.ex @@ -0,0 +1,38 @@ +defmodule Inngest.Test.Case.InvokeCallerFn do + @moduledoc false + + use Inngest.Function + + @func %FnOpts{id: "invoke-caller", name: "Invoke Caller"} + @trigger %Trigger{event: "test/invoke.caller"} + + @impl true + def exec(ctx, %{step: step} = _args) do + _ = step.run(ctx, "step-1", fn -> %{hello: "world"} end) + + res = + step.invoke(ctx, "caller", %{ + function: Inngest.Test.Case.InvokedFn, + data: %{yolo: true}, + timeout: "10s" + }) + + {:ok, res} + end +end + +defmodule Inngest.Test.Case.InvokedFn do + @moduledoc false + + use Inngest.Function + + @func %FnOpts{id: "invoke-target", name: "Invoked"} + @trigger %Trigger{event: "test/invoked"} + + @impl true + def exec(ctx, %{step: step} = _args) do + _ = step.run(ctx, "invoked", fn -> "YO!" end) + + {:ok, "INVOKED!"} + end +end diff --git a/test/support/cases/invoke_timeout_fn.ex b/test/support/cases/invoke_timeout_fn.ex new file mode 100644 index 0000000..7bd02c7 --- /dev/null +++ b/test/support/cases/invoke_timeout_fn.ex @@ -0,0 +1,39 @@ +defmodule Inngest.Test.Case.InvokeTimeoutCallerFn do + @moduledoc false + + use Inngest.Function + + @func %FnOpts{id: "invoke-timeout-caller", name: "Invoke Timeout Caller"} + @trigger %Trigger{event: "test/invoke.timeout.caller"} + + @impl true + def exec(ctx, %{step: step} = _args) do + _ = step.run(ctx, "step-1", fn -> %{hello: "world"} end) + + _ = + step.invoke(ctx, "caller", %{ + function: Inngest.Test.Case.InvokedLongFn, + data: %{yolo: true}, + timeout: "1s" + }) + + {:ok, "TIMED OUT"} + end +end + +defmodule Inngest.Test.Case.InvokedLongFn do + @moduledoc false + + use Inngest.Function + + @func %FnOpts{id: "invoke-long-target", name: "Invoked Long"} + @trigger %Trigger{event: "test/invoked.long"} + + @impl true + def exec(ctx, %{step: step} = _args) do + _ = step.sleep(ctx, "sleep", "5s") + _ = step.run(ctx, "invoked", fn -> "YO!" end) + + {:ok, "INVOKED!"} + end +end