diff --git a/config/config.exs b/config/config.exs
index 744668c..289573f 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -11,6 +11,8 @@ case config_env() do
     config :gpt_agent, :rate_limit_retry_delay, 30_000
     config :gpt_agent, :rate_limit_max_retries, 10
 
+    config :gpt_agent, :tool_output_retry_delay, 1_000
+
   :test ->
     config :open_ai_client, :openai_api_key, "test"
     config :open_ai_client, :openai_organization_id, "test"
@@ -20,6 +22,8 @@ case config_env() do
     config :gpt_agent, :rate_limit_retry_delay, 100
     config :gpt_agent, :rate_limit_max_retries, 2
 
+    config :gpt_agent, :tool_output_retry_delay, 0
+
   _ ->
     nil
 end
diff --git a/lib/gpt_agent.ex b/lib/gpt_agent.ex
index fd87abf..74a39a2 100644
--- a/lib/gpt_agent.ex
+++ b/lib/gpt_agent.ex
@@ -22,6 +22,7 @@ defmodule GptAgent do
     RunFailed,
     RunStarted,
     ToolCallOutputRecorded,
+    ToolCallOutputSubmissionFailed,
     ToolCallRequested,
     UserMessageAdded
   }
@@ -29,8 +30,10 @@ defmodule GptAgent do
   # two minutes
   @timeout_ms 120_000
 
-  @rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 0)
-  @rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 0)
+  @rate_limit_max_retries Application.compile_env(:gpt_agent, :rate_limit_max_retries, 10)
+  @rate_limit_retry_delay Application.compile_env(:gpt_agent, :rate_limit_retry_delay, 30_000)
+
+  @tool_output_retry_delay Application.compile_env(:gpt_agent, :tool_output_retry_delay, 1000)
 
   typedstruct do
     field :assistant_id, Types.assistant_id(), enforce: true
@@ -308,23 +311,46 @@ defmodule GptAgent do
     end
   end
 
+  defp possibly_send_outputs_to_openai(state, failure_count \\ 0)
+
+  defp possibly_send_outputs_to_openai(state, failure_count) when failure_count >= 3 do
+    log("Failed to send tool outputs to OpenAI after 3 attempts, giving up", :warning)
+
+    state
+    |> publish_event(
+      ToolCallOutputSubmissionFailed.new!(
+        thread_id: state.thread_id,
+        run_id: state.run_id
+      )
+    )
+  end
+
   defp possibly_send_outputs_to_openai(
-         %__MODULE__{running?: true, tool_calls: [], tool_outputs: [_ | _]} = state
+         %__MODULE__{running?: true, tool_calls: [], tool_outputs: [_ | _]} = state,
+         failure_count
        ) do
     log("Sending tool outputs to OpenAI")
 
-    {:ok, %{body: %{"object" => "thread.run", "cancelled_at" => nil, "failed_at" => nil}}} =
-      OpenAiClient.post("/v1/threads/#{state.thread_id}/runs/#{state.run_id}/submit_tool_outputs",
-        json: %{tool_outputs: state.tool_outputs},
-        receive_timeout: receive_timeout_ms(state)
-      )
+    try do
+      {:ok, %{body: %{"object" => "thread.run", "cancelled_at" => nil, "failed_at" => nil}}} =
+        OpenAiClient.post(
+          "/v1/threads/#{state.thread_id}/runs/#{state.run_id}/submit_tool_outputs",
+          json: %{tool_outputs: state.tool_outputs},
+          receive_timeout: receive_timeout_ms(state)
+        )
+    rescue
+      exception ->
+        log("Failed to send tool outputs to OpenAI: #{inspect(exception)}", :warning)
+        :timer.sleep(@tool_output_retry_delay)
+        possibly_send_outputs_to_openai(state, failure_count + 1)
+    end
 
     Process.send_after(self(), {:check_run_status, state.run_id}, heartbeat_interval_ms())
 
     %{state | tool_outputs: []}
   end
 
-  defp possibly_send_outputs_to_openai(%__MODULE__{} = state), do: state
+  defp possibly_send_outputs_to_openai(%__MODULE__{} = state, _failure_count), do: state
 
   @impl true
   def handle_call(:run_in_progress?, _caller, %__MODULE__{} = state) do
diff --git a/lib/gpt_agent/events/tool_call_output_submission_failed.ex b/lib/gpt_agent/events/tool_call_output_submission_failed.ex
new file mode 100644
index 0000000..5a9dd98
--- /dev/null
+++ b/lib/gpt_agent/events/tool_call_output_submission_failed.ex
@@ -0,0 +1,14 @@
+defmodule GptAgent.Events.ToolCallOutputSubmissionFailed do
+  @moduledoc """
+  The GPT Agent failed to submit tool call outputs to OpenAI
+  """
+
+  use GptAgent.Types
+  alias GptAgent.Types
+
+  @derive Jason.Encoder
+  typedstruct enforce: true do
+    field :thread_id, Types.thread_id()
+    field :run_id, Types.run_id()
+  end
+end
diff --git a/mix.exs b/mix.exs
index 6bb7bde..c145111 100644
--- a/mix.exs
+++ b/mix.exs
@@ -4,7 +4,7 @@ defmodule GptAgent.MixProject do
   def project do
     [
       app: :gpt_agent,
-      version: "9.2.2",
+      version: "9.2.3",
       elixir: "~> 1.16",
       start_permanent: Mix.env() == :prod,
       aliases: aliases(),
diff --git a/test/gpt_agent_test.exs b/test/gpt_agent_test.exs
index d6e660c..79a4ed7 100644
--- a/test/gpt_agent_test.exs
+++ b/test/gpt_agent_test.exs
@@ -16,6 +16,7 @@ defmodule GptAgentTest do
     RunFailed,
     RunStarted,
     ToolCallOutputRecorded,
+    ToolCallOutputSubmissionFailed,
     ToolCallRequested,
     UserMessageAdded
   }
@@ -1635,6 +1636,109 @@ defmodule GptAgentTest do
 
       assert_receive {^pid, %RunCompleted{}}, 5_000
     end
+
+    @tag capture_log: true
+    test "error response from OpenAI will be retried up to 3 times",
+         %{
+           bypass: bypass,
+           assistant_id: assistant_id,
+           thread_id: thread_id,
+           run_id: run_id
+         } do
+      {:ok, pid} =
+        GptAgent.connect(thread_id: thread_id, last_message_id: nil, assistant_id: assistant_id)
+
+      tool_id = UUID.uuid4()
+
+      Bypass.stub(bypass, "GET", "/v1/threads/#{thread_id}/runs/#{run_id}", fn conn ->
+        conn
+        |> Plug.Conn.put_resp_content_type("application/json")
+        |> Plug.Conn.resp(
+          200,
+          Jason.encode!(%{
+            "id" => run_id,
+            "object" => "thread.run",
+            "created_at" => 1_699_075_072,
+            "assistant_id" => assistant_id,
+            "thread_id" => thread_id,
+            "status" => "requires_action",
+            "required_action" => %{
+              "type" => "submit_tool_outputs",
+              "submit_tool_outputs" => %{
+                "tool_calls" => [
+                  %{
+                    "id" => tool_id,
+                    "type" => "function",
+                    "function" => %{"name" => "tool_1", "arguments" => ~s({"foo":"bar","baz":1})}
+                  }
+                ]
+              }
+            },
+            "started_at" => 1_699_075_072,
+            "expires_at" => nil,
+            "cancelled_at" => nil,
+            "failed_at" => nil,
+            "completed_at" => 1_699_075_073,
+            "last_error" => nil,
+            "model" => "gpt-4-1106-preview",
+            "instructions" => nil,
+            "tools" => [],
+            "file_ids" => [],
+            "metadata" => %{}
+          })
+        )
+      end)
+
+      :ok = GptAgent.add_user_message(pid, Faker.Lorem.sentence())
+
+      assert_receive {^pid,
+                      %ToolCallRequested{
+                        id: ^tool_id,
+                        thread_id: ^thread_id,
+                        run_id: ^run_id,
+                        name: "tool_1",
+                        arguments: %{"foo" => "bar", "baz" => 1}
+                      }},
+                     5_000
+
+      {:ok, call_counter_pid} = Agent.start_link(fn -> 0 end)
+
+      Bypass.expect(
+        bypass,
+        "POST",
+        "/v1/threads/#{thread_id}/runs/#{run_id}/submit_tool_outputs",
+        fn conn ->
+          assert Agent.get_and_update(call_counter_pid, &{&1, &1 + 1}) < 3
+
+          {:ok, body, conn} = Plug.Conn.read_body(conn)
+          body = Jason.decode!(body)
+          assert Map.keys(body) == ["tool_outputs"]
+          assert length(body["tool_outputs"]) == 1
+
+          assert %{"tool_call_id" => tool_id, "output" => ~s({"another":"answer"})} in body[
+                   "tool_outputs"
+                 ]
+
+          conn
+          |> Plug.Conn.put_resp_content_type("application/json")
+          |> Plug.Conn.resp(
+            400,
+            Jason.encode!(%{
+              "error" => %{
+                "code" => nil,
+                "message" => "Runs in status \"queued\" do not accept tool outputs.",
+                "param" => nil,
+                "type" => "invalid_request_error"
+              }
+            })
+          )
+        end
+      )
+
+      :ok = GptAgent.submit_tool_output(pid, tool_id, %{another: "answer"})
+
+      assert_receive {^pid, %ToolCallOutputSubmissionFailed{}}, 5_000
+    end
   end
 
   describe "run_in_progress?/1" do