Skip to content

Commit

Permalink
feat: Support Invoke
Browse files Browse the repository at this point in the history
Add support for `step.invoke`

---------
Co-authored-by: Darwin D Wu <[email protected]>
  • Loading branch information
darwin67 authored May 28, 2024
1 parent 3d0de75 commit 70a08f2
Show file tree
Hide file tree
Showing 12 changed files with 227 additions and 7 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ All notable changes to this project will be documented in this file.

## [unreleased]

### Features

- Feat: Add support for invoke

### Miscellaneous Tasks

- Chore: replace nix shell with flake
- Chore: replace nix shell with flake (#81)

## [0.2.1] - 2024-05-15

Expand Down
1 change: 1 addition & 0 deletions flake.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
nativeBuildInputs = with pkgs; [
elixir
erlang_27
nodejs_18 # need this to install inngest-cli for now

# LSPs
elixir-ls
Expand Down
19 changes: 19 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
defmodule Inngest.Error do
@moduledoc """
A generic Inngest Error.
Used to format errors into a structure that can be parsed by the UI
"""

defstruct [:error, stack: nil]
end

defimpl Jason.Encoder, for: Inngest.Error do
def encode(value, opts) do
error = Map.get(value, :error)
stacktrace = Exception.format(:error, error, Map.get(value, :stack))

%{name: error.__struct__, message: error.message, stack: stacktrace}
|> Jason.Encode.map(opts)
end
end

defmodule Inngest.NonRetriableError do
@moduledoc """
Error signaling to not retry
Expand Down
3 changes: 2 additions & 1 deletion lib/inngest/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ defmodule Inngest.SdkResponse do
status = if retry, do: 500, else: 400

encoded =
case Exception.format(:error, error, stacktrace) |> Jason.encode() do
case %Inngest.Error{error: error, stack: stacktrace}
|> Jason.encode() do
{:ok, encoded} -> encoded
{:error, _} -> "Failed to encode error: #{error}"
end
Expand Down
2 changes: 1 addition & 1 deletion lib/inngest/router/invoke.ex
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ defmodule Inngest.Router.Invoke do
retry in Inngest.RetryAfterError ->
delay = Map.get(retry, :seconds)

SdkResponse.from_result({:error, retry.message},
SdkResponse.from_result({:error, retry},
retry: delay,
stacktrace: __STACKTRACE__
)
Expand Down
49 changes: 49 additions & 0 deletions lib/inngest/step_tool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,55 @@ defmodule Inngest.StepTool do
end
end

@spec invoke(Context.t(), binary(), map()) :: map()
def invoke(%{steps: steps} = ctx, step_id, opts) do
op = UnhashedOp.new(ctx, "InvokeFunction", step_id, opts)
hashed_id = UnhashedOp.hash(op)

case Map.get(steps, hashed_id) do
nil ->
func = Map.get(opts, :function)
data = Map.get(opts, :data)
timeout = Map.get(opts, :timeout)
v = Map.get(opts, :v)

generator_otps =
if Map.has_key?(opts, :timeout) do
%{
function_id: func.slug(),
payload: %{data: data, v: v},
timeout: timeout
}
else
%{
function_id: func.slug(),
payload: %{data: data, v: v}
}
end

throw(%GeneratorOpCode{
id: hashed_id,
name: step_id,
display_name: step_id,
op: op.op,
opts: generator_otps
})

%{
"error" => %{
"name" => "InngestInvokeTimeoutError",
"error" => message,
"message" => _
}
} ->
raise Inngest.NonRetriableError, message: message

# return value if found
val ->
val
end
end

def send_event(%{steps: steps} = ctx, step_id, events) do
op = UnhashedOp.new(ctx, "Step", step_id)
hashed_id = UnhashedOp.hash(op)
Expand Down
8 changes: 6 additions & 2 deletions test/inngest/function/cases/failure_handler_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ defmodule Inngest.Function.Cases.RetriableTest do
"data" => [
%{
"run_id" => _run_id,
"output" => stacktrace,
"output" => %{
"message" => message,
"name" => "Elixir.Inngest.RetryAfterError",
"stack" => _
},
"status" => "Failed"
}
]
}} = DevServer.run_ids(event_id)

assert stacktrace =~ "YOLO!!!"
assert message == "YOLO!!!"

{:ok, %{"data" => events}} = DevServer.list_events()

Expand Down
25 changes: 25 additions & 0 deletions test/inngest/function/cases/invoke_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
defmodule Inngest.Function.Cases.InvokeTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer
import Inngest.Test.Helper

@default_sleep 5_000

@tag :integration
test "should run successfully" do
event_id = send_test_event("test/invoke.caller")
Process.sleep(@default_sleep)

assert {:ok,
%{
"data" => [
%{
"output" => %{"data" => "INVOKED!"},
"run_id" => _,
"status" => "Completed"
}
]
}} = DevServer.run_ids(event_id)
end
end
36 changes: 36 additions & 0 deletions test/inngest/function/cases/invoke_timeout_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Inngest.Function.Cases.InvokeTimeoutTest do
use ExUnit.Case, async: true

alias Inngest.Test.DevServer
import Inngest.Test.Helper

@default_sleep 5_000

@tag :integration
test "should fail with timeout error" do
event_id = send_test_event("test/invoke.timeout.caller")
Process.sleep(@default_sleep)

assert {
:ok,
%{
"data" => [
%{
"output" => %{
"name" => error,
"message" => message,
"stack" => _
},
"run_id" => _,
"status" => "Failed"
}
]
}
} = DevServer.run_ids(event_id)

assert error == "Elixir.Inngest.NonRetriableError"

assert message ==
"InngestInvokeTimeoutError: Timed out waiting for invoked function to complete"
end
end
8 changes: 6 additions & 2 deletions test/inngest/function/cases/no_retry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ defmodule Inngest.Function.Cases.NoRetryTest do
%{
"data" => [
%{
"output" => stacktrace,
"output" => %{
"message" => message,
"name" => "Elixir.Inngest.NonRetriableError",
"stack" => _
},
"run_id" => _run_id,
"status" => "Failed"
}
]
}} = DevServer.run_ids(event_id)

assert stacktrace =~ "not retrying!"
assert message == "not retrying!"
end
end
38 changes: 38 additions & 0 deletions test/support/cases/invoke_fn.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
defmodule Inngest.Test.Case.InvokeCallerFn do
@moduledoc false

use Inngest.Function

@func %FnOpts{id: "invoke-caller", name: "Invoke Caller"}
@trigger %Trigger{event: "test/invoke.caller"}

@impl true
def exec(ctx, %{step: step} = _args) do
_ = step.run(ctx, "step-1", fn -> %{hello: "world"} end)

res =
step.invoke(ctx, "caller", %{
function: Inngest.Test.Case.InvokedFn,
data: %{yolo: true},
timeout: "10s"
})

{:ok, res}
end
end

defmodule Inngest.Test.Case.InvokedFn do
@moduledoc false

use Inngest.Function

@func %FnOpts{id: "invoke-target", name: "Invoked"}
@trigger %Trigger{event: "test/invoked"}

@impl true
def exec(ctx, %{step: step} = _args) do
_ = step.run(ctx, "invoked", fn -> "YO!" end)

{:ok, "INVOKED!"}
end
end
39 changes: 39 additions & 0 deletions test/support/cases/invoke_timeout_fn.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Inngest.Test.Case.InvokeTimeoutCallerFn do
@moduledoc false

use Inngest.Function

@func %FnOpts{id: "invoke-timeout-caller", name: "Invoke Timeout Caller"}
@trigger %Trigger{event: "test/invoke.timeout.caller"}

@impl true
def exec(ctx, %{step: step} = _args) do
_ = step.run(ctx, "step-1", fn -> %{hello: "world"} end)

_ =
step.invoke(ctx, "caller", %{
function: Inngest.Test.Case.InvokedLongFn,
data: %{yolo: true},
timeout: "1s"
})

{:ok, "TIMED OUT"}
end
end

defmodule Inngest.Test.Case.InvokedLongFn do
@moduledoc false

use Inngest.Function

@func %FnOpts{id: "invoke-long-target", name: "Invoked Long"}
@trigger %Trigger{event: "test/invoked.long"}

@impl true
def exec(ctx, %{step: step} = _args) do
_ = step.sleep(ctx, "sleep", "5s")
_ = step.run(ctx, "invoked", fn -> "YO!" end)

{:ok, "INVOKED!"}
end
end

0 comments on commit 70a08f2

Please sign in to comment.