Skip to content

Commit

Permalink
Fix bug handling error response in tool submission
Browse files Browse the repository at this point in the history
If OpenAI returns an error response when we submit tool output, we will
retry up to 2 more times and then publish a failure event.
  • Loading branch information
jwilger committed Mar 2, 2024
1 parent c8bd228 commit d1c08c0
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 10 deletions.
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ case config_env() do
config :gpt_agent, :rate_limit_retry_delay, 30_000
config :gpt_agent, :rate_limit_max_retries, 10

config :gpt_agent, :tool_output_retry_delay, 1_000

:test ->
config :open_ai_client, :openai_api_key, "test"
config :open_ai_client, :openai_organization_id, "test"
Expand All @@ -20,6 +22,8 @@ case config_env() do
config :gpt_agent, :rate_limit_retry_delay, 100
config :gpt_agent, :rate_limit_max_retries, 2

config :gpt_agent, :tool_output_retry_delay, 0

_ ->
nil
end
44 changes: 35 additions & 9 deletions lib/gpt_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ defmodule GptAgent do
RunFailed,
RunStarted,
ToolCallOutputRecorded,
ToolCallOutputSubmissionFailed,
ToolCallRequested,
UserMessageAdded
}

# two minutes
@timeout_ms 120_000

@rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 0)
@rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 0)
@rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 10)
@rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 30_000)

@tool_output_retry_delay Application.compile_env(:gpt_agent, :tool_output_retry_delay, 1000)

typedstruct do
field :assistant_id, Types.assistant_id(), enforce: true
Expand Down Expand Up @@ -308,23 +311,46 @@ defmodule GptAgent do
end
end

# Bodiless head: external callers start with zero recorded failures.
defp possibly_send_outputs_to_openai(state, failure_count \\ 0)

# Give-up clause: after 3 failed submission attempts, stop retrying and
# publish a ToolCallOutputSubmissionFailed event so subscribers can react.
defp possibly_send_outputs_to_openai(state, failure_count) when failure_count >= 3 do
  log("Failed to send tool outputs to OpenAI after 3 attempts, giving up", :warning)

  state
  |> publish_event(
    ToolCallOutputSubmissionFailed.new!(
      thread_id: state.thread_id,
      run_id: state.run_id
    )
  )
end

# Submission clause: the run is active, every requested tool call has an
# output recorded, and there is at least one output to send. Submits the
# collected outputs to OpenAI; on an error response, sleeps for
# @tool_output_retry_delay ms and retries (bounded by the clause above).
defp possibly_send_outputs_to_openai(
       %__MODULE__{running?: true, tool_calls: [], tool_outputs: [_ | _]} = state,
       failure_count
     ) do
  log("Sending tool outputs to OpenAI")

  try do
    # Assertive match: anything other than a live thread.run response
    # (error body, cancelled/failed run) raises MatchError and is retried.
    {:ok, %{body: %{"object" => "thread.run", "cancelled_at" => nil, "failed_at" => nil}}} =
      OpenAiClient.post(
        "/v1/threads/#{state.thread_id}/runs/#{state.run_id}/submit_tool_outputs",
        json: %{tool_outputs: state.tool_outputs},
        receive_timeout: receive_timeout_ms(state)
      )
  rescue
    exception ->
      log("Failed to send tool outputs to OpenAI: #{inspect(exception)}", :warning)
      :timer.sleep(@tool_output_retry_delay)
      # The retry's (or give-up clause's) resulting state is this
      # function's return value.
      possibly_send_outputs_to_openai(state, failure_count + 1)
  else
    # The `else` clause runs only when the try body succeeded. Previously
    # this code sat *after* the try expression, so it also ran after the
    # retry/give-up path: it scheduled a status check and cleared
    # tool_outputs even when submission had permanently failed, discarding
    # the failure-path state returned by the recursive call.
    _ ->
      Process.send_after(self(), {:check_run_status, state.run_id}, heartbeat_interval_ms())

      %{state | tool_outputs: []}
  end
end

# Fallback: not ready to submit (run not active, tool calls still
# outstanding, or no outputs recorded yet) — state is returned unchanged.
defp possibly_send_outputs_to_openai(%__MODULE__{} = state, _failure_count), do: state

@impl true
def handle_call(:run_in_progress?, _caller, %__MODULE__{} = state) do
Expand Down
14 changes: 14 additions & 0 deletions lib/gpt_agent/events/tool_call_output_submission_failed.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule GptAgent.Events.ToolCallOutputSubmissionFailed do
  @moduledoc """
  Published when submitting tool call output to OpenAI failed after
  exhausting all retry attempts for the given run.
  """

  use GptAgent.Types
  alias GptAgent.Types

  # Derives JSON encoding so the event can be serialized when broadcast.
  @derive Jason.Encoder
  typedstruct enforce: true do
    # Thread whose run the failed tool output submission belonged to.
    field :thread_id, Types.thread_id()
    # Run for which the tool output submission ultimately failed.
    field :run_id, Types.run_id()
  end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
def project do
[
app: :gpt_agent,
version: "9.2.2",
version: "9.2.3",
elixir: "~> 1.16",
start_permanent: Mix.env() == :prod,
aliases: aliases(),
Expand Down
104 changes: 104 additions & 0 deletions test/gpt_agent_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ defmodule GptAgentTest do
RunFailed,
RunStarted,
ToolCallOutputRecorded,
ToolCallOutputSubmissionFailed,
ToolCallRequested,
UserMessageAdded
}
Expand Down Expand Up @@ -1635,6 +1636,109 @@ defmodule GptAgentTest do

assert_receive {^pid, %RunCompleted{}}, 5_000
end

    # The retry path logs warnings on every failed attempt; capture them so
    # test output stays clean.
    @tag capture_log: true
    test "error response from OpenAI will be retried up to 3 times",
         %{
           bypass: bypass,
           assistant_id: assistant_id,
           thread_id: thread_id,
           run_id: run_id
         } do
      {:ok, pid} =
        GptAgent.connect(thread_id: thread_id, last_message_id: nil, assistant_id: assistant_id)

      tool_id = UUID.uuid4()

      # Stub the run-status endpoint to report a run in "requires_action"
      # with one pending tool call, so the agent publishes a
      # ToolCallRequested event and waits for its output.
      Bypass.stub(bypass, "GET", "/v1/threads/#{thread_id}/runs/#{run_id}", fn conn ->
        conn
        |> Plug.Conn.put_resp_content_type("application/json")
        |> Plug.Conn.resp(
          200,
          Jason.encode!(%{
            "id" => run_id,
            "object" => "thread.run",
            "created_at" => 1_699_075_072,
            "assistant_id" => assistant_id,
            "thread_id" => thread_id,
            "status" => "requires_action",
            "required_action" => %{
              "type" => "submit_tool_outputs",
              "submit_tool_outputs" => %{
                "tool_calls" => [
                  %{
                    "id" => tool_id,
                    "type" => "function",
                    "function" => %{"name" => "tool_1", "arguments" => ~s({"foo":"bar","baz":1})}
                  }
                ]
              }
            },
            "started_at" => 1_699_075_072,
            "expires_at" => nil,
            "cancelled_at" => nil,
            "failed_at" => nil,
            "completed_at" => 1_699_075_073,
            "last_error" => nil,
            "model" => "gpt-4-1106-preview",
            "instructions" => nil,
            "tools" => [],
            "file_ids" => [],
            "metadata" => %{}
          })
        )
      end)

      # Kick off a run; the stubbed status above drives it into the
      # tool-call-requested state.
      :ok = GptAgent.add_user_message(pid, Faker.Lorem.sentence())

      assert_receive {^pid,
                      %ToolCallRequested{
                        id: ^tool_id,
                        thread_id: ^thread_id,
                        run_id: ^run_id,
                        name: "tool_1",
                        arguments: %{"foo" => "bar", "baz" => 1}
                      }},
                     5_000

      # Counter tracking how many times the submit endpoint is hit, so we
      # can assert the agent stops after its 3rd attempt.
      {:ok, call_counter_pid} = Agent.start_link(fn -> 0 end)

      Bypass.expect(
        bypass,
        "POST",
        "/v1/threads/#{thread_id}/runs/#{run_id}/submit_tool_outputs",
        fn conn ->
          # Fails the test if a 4th submission attempt ever arrives.
          assert Agent.get_and_update(call_counter_pid, &{&1, &1 + 1}) < 3

          {:ok, body, conn} = Plug.Conn.read_body(conn)
          body = Jason.decode!(body)
          assert Map.keys(body) == ["tool_outputs"]
          assert length(body["tool_outputs"]) == 1

          assert %{"tool_call_id" => tool_id, "output" => ~s({"another":"answer"})} in body[
                   "tool_outputs"
                 ]

          # Always answer with a 400 error so every attempt (initial + 2
          # retries) fails, exercising the give-up path.
          conn
          |> Plug.Conn.put_resp_content_type("application/json")
          |> Plug.Conn.resp(
            400,
            Jason.encode!(%{
              "error" => %{
                "code" => nil,
                "message" => "Runs in status \"queued\" do not accept tool outputs.",
                "param" => nil,
                "type" => "invalid_request_error"
              }
            })
          )
        end
      )

      :ok = GptAgent.submit_tool_output(pid, tool_id, %{another: "answer"})

      # After 3 failed attempts the agent must publish the failure event.
      assert_receive {^pid, %ToolCallOutputSubmissionFailed{}}, 5_000
    end
end

describe "run_in_progress?/1" do
Expand Down

0 comments on commit d1c08c0

Please sign in to comment.