From aee7aa7b16737775442d8a2fe02ade2d7988e6fa Mon Sep 17 00:00:00 2001 From: Alex Mickelson Date: Fri, 6 Mar 2026 11:01:57 -0700 Subject: [PATCH] async tool results --- lib/elixir_ai/ai_utils/chat_utils.ex | 13 ++- lib/elixir_ai/ai_utils/stream_line_utils.ex | 2 +- lib/elixir_ai/chat_runner.ex | 106 ++++++++++++++----- lib/elixir_ai/tool_testing.ex | 10 +- lib/elixir_ai_web/components/chat_message.ex | 2 +- lib/elixir_ai_web/live/chat_live.ex | 48 ++++++--- 6 files changed, 126 insertions(+), 55 deletions(-) diff --git a/lib/elixir_ai/ai_utils/chat_utils.ex b/lib/elixir_ai/ai_utils/chat_utils.ex index e266f9f..1e6c9c1 100644 --- a/lib/elixir_ai/ai_utils/chat_utils.ex +++ b/lib/elixir_ai/ai_utils/chat_utils.ex @@ -6,7 +6,8 @@ defmodule ElixirAi.ChatUtils do name: name, description: description, function: function, - parameters: parameters + parameters: parameters, + server: server ) do schema = %{ "type" => "function", @@ -25,10 +26,18 @@ defmodule ElixirAi.ChatUtils do } } + run_function = fn current_message_id, tool_call_id, args -> + Task.start(fn -> + result = function.(args) + send(server, {:tool_response, current_message_id, tool_call_id, result}) + end) + end + %{ name: name, definition: schema, - function: function + # function: function, + run_function: run_function } end diff --git a/lib/elixir_ai/ai_utils/stream_line_utils.ex b/lib/elixir_ai/ai_utils/stream_line_utils.ex index 564bff5..760f374 100644 --- a/lib/elixir_ai/ai_utils/stream_line_utils.ex +++ b/lib/elixir_ai/ai_utils/stream_line_utils.ex @@ -40,7 +40,7 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do }) do send( server, - {:ai_stream_finish, id} + {:ai_text_stream_finish, id} ) end diff --git a/lib/elixir_ai/chat_runner.ex b/lib/elixir_ai/chat_runner.ex index e4e2fbe..1ccfa2c 100644 --- a/lib/elixir_ai/chat_runner.ex +++ b/lib/elixir_ai/chat_runner.ex @@ -20,6 +20,7 @@ defmodule ElixirAi.ChatRunner do %{ messages: [], streaming_response: nil, + pending_tool_calls: [], tools: tools() }, name: __MODULE__ @@ -36,20 +37,23 @@ defmodule ElixirAi.ChatRunner do name: "store_thing", description: "store a key value pair in memory", function: &ElixirAi.ToolTesting.hold_thing/1, - parameters: ElixirAi.ToolTesting.hold_thing_params() + parameters: ElixirAi.ToolTesting.hold_thing_params(), + server: __MODULE__ ), ai_tool( name: "read_thing", description: "read a key value pair that was previously stored with store_thing", function: &ElixirAi.ToolTesting.get_thing/1, - parameters: ElixirAi.ToolTesting.get_thing_params() + parameters: ElixirAi.ToolTesting.get_thing_params(), + server: __MODULE__ ), ai_tool( name: "set_background_color", description: "set the background color of the chat interface, accepts specified tailwind colors", function: &ElixirAi.ToolTesting.set_background_color/1, - parameters: ElixirAi.ToolTesting.set_background_color_params() + parameters: ElixirAi.ToolTesting.set_background_color_params(), + server: __MODULE__ ) ] end @@ -108,13 +112,11 @@ defmodule ElixirAi.ChatRunner do }} end - def handle_info({:ai_stream_finish, _id}, state) do + def handle_info({:ai_text_stream_finish, _id}, state) do Logger.info( "AI stream finished for id #{state.streaming_response.id}, broadcasting end of AI response" ) - broadcast(:end_ai_response) - final_message = %{ role: :assistant, content: state.streaming_response.content, @@ -122,6 +124,8 @@ defmodule ElixirAi.ChatRunner do tool_calls: state.streaming_response.tool_calls } + broadcast({:end_ai_response, final_message}) + {:noreply, %{ state @@ -175,20 +179,17 @@ defmodule ElixirAi.ChatRunner do {:noreply, %{state | streaming_response: new_streaming_response}} end - def handle_info({:ai_tool_call_end, _id}, state) do + def handle_info({:ai_tool_call_end, id}, state) do Logger.info("ending tool call with tools: #{inspect(state.streaming_response.tool_calls)}") - tool_calls = + parsed_tool_calls = Enum.map(state.streaming_response.tool_calls, fn tool_call -> case Jason.decode(tool_call.arguments) do {:ok, decoded_args} -> - tool = state.tools |> Enum.find(fn t -> t.name == tool_call.name end) - - res = tool.function.(decoded_args) - Map.put(tool_call, :result, res) + {:ok, tool_call, decoded_args} {:error, e} -> - Map.put(tool_call, :error, "Failed to decode tool arguments: #{inspect(e)}") + {:error, tool_call, "Failed to decode tool arguments: #{inspect(e)}"} end end) @@ -196,26 +197,79 @@ defmodule ElixirAi.ChatRunner do role: :assistant, content: state.streaming_response.content, reasoning_content: state.streaming_response.reasoning_content, - tool_calls: tool_calls + tool_calls: state.streaming_response.tool_calls } - result_messages = - Enum.map(tool_calls, fn call -> - if Map.has_key?(call, :result) do - %{role: :tool, content: "#{inspect(call.result)}", tool_call_id: call.id} - else - %{role: :tool, content: "Error in #{call.name}: #{call.error}", tool_call_id: call.id} - end + broadcast({:tool_request_message, tool_request_message}) + + failed_call_messages = + parsed_tool_calls + |> Enum.filter(fn + {:error, _tool_call, _error_msg} -> true + _ -> false + end) + |> Enum.map(fn {:error, tool_call, error_msg} -> + Logger.error("Tool call #{tool_call.name} failed with error: #{error_msg}") + %{role: :tool, content: error_msg, tool_call_id: tool_call.id} end) - new_messages = [tool_request_message] ++ result_messages + pending_call_ids = + parsed_tool_calls + |> Enum.filter(fn + {:ok, _tool_call, _decoded_args} -> true + _ -> false + end) + |> Enum.map(fn {:ok, tool_call, decoded_args} -> + case Enum.find(state.tools, fn t -> t.name == tool_call.name end) do + nil -> + Logger.error("No tool definition found for #{tool_call.name}") + nil - Logger.info("All tool calls finished, broadcasting updated tool calls with results") - broadcast({:tool_calls_finished, new_messages}) + tool -> + tool.run_function.(id, tool_call.id, decoded_args) - request_ai_response(self(), state.messages ++ new_messages, state.tools) + tool_call.id + end + end) + |> Enum.filter(& &1) - {:noreply, %{state | messages: state.messages ++ new_messages, streaming_response: nil}} + {:noreply, + %{ + state + | messages: state.messages ++ [tool_request_message] ++ failed_call_messages, + pending_tool_calls: pending_call_ids + }} + end + + def handle_info({:tool_response, _id, tool_call_id, result}, state) do + new_message = %{role: :tool, content: inspect(result), tool_call_id: tool_call_id} + + broadcast({:one_tool_finished, new_message}) + + new_pending_tool_calls = + Enum.filter(state.pending_tool_calls, fn id -> id != tool_call_id end) + + new_streaming_response = + case new_pending_tool_calls do + [] -> + nil + + _ -> + state.streaming_response + end + + if new_pending_tool_calls == [] do + broadcast(:tool_calls_finished) + request_ai_response(self(), state.messages ++ [new_message], state.tools) + end + + {:noreply, + %{ + state + | pending_tool_calls: new_pending_tool_calls, + streaming_response: new_streaming_response, + messages: state.messages ++ [new_message] + }} end def handle_call(:get_conversation, _from, state) do diff --git a/lib/elixir_ai/tool_testing.ex b/lib/elixir_ai/tool_testing.ex index c1fff49..e379d8b 100644 --- a/lib/elixir_ai/tool_testing.ex +++ b/lib/elixir_ai/tool_testing.ex @@ -44,15 +44,7 @@ defmodule ElixirAi.ToolTesting do end def set_background_color_params do - valid_tailwind_colors = [ - "bg-cyan-950/30", - "bg-red-950/30", - "bg-green-950/30", - "bg-blue-950/30", - "bg-yellow-950/30", - "bg-purple-950/30", - "bg-pink-950/30" - ] + valid_tailwind_colors = ElixirAiWeb.ChatLive.valid_background_colors() %{ "type" => "object", diff --git a/lib/elixir_ai_web/components/chat_message.ex b/lib/elixir_ai_web/components/chat_message.ex index 5f2cff4..5591788 100644 --- a/lib/elixir_ai_web/components/chat_message.ex +++ b/lib/elixir_ai_web/components/chat_message.ex @@ -42,7 +42,7 @@ defmodule ElixirAiWeb.ChatMessage do def assistant_message(assigns) do assigns = assigns - |> assign(:_reasoning_id, "reasoning-#{:erlang.phash2(assigns.content)}") + |> assign(:_reasoning_id, "reasoning-#{:erlang.phash2({assigns.content, assigns.reasoning_content, assigns.tool_calls})}") |> assign(:_expanded, false) ~H""" diff --git a/lib/elixir_ai_web/live/chat_live.ex b/lib/elixir_ai_web/live/chat_live.ex index 13b2d10..7a24538 100644 --- a/lib/elixir_ai_web/live/chat_live.ex +++ b/lib/elixir_ai_web/live/chat_live.ex @@ -7,6 +7,18 @@ defmodule ElixirAiWeb.ChatLive do @topic "ai_chat" + def valid_background_colors do + [ + "bg-cyan-950/30", + "bg-red-950/30", + "bg-green-950/30", + "bg-blue-950/30", + "bg-yellow-950/30", + "bg-purple-950/30", + "bg-pink-950/30" + ] + end + def mount(_params, _session, socket) do if connected?(socket), do: Phoenix.PubSub.subscribe(ElixirAi.PubSub, @topic) conversation = ChatRunner.get_conversation() @@ -111,35 +123,39 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, streaming_response: updated_response)} end - def handle_info({:tool_calls_finished, tool_messages}, socket) do - Logger.info("Received tool_calls_finished with #{inspect(tool_messages)}") + def handle_info(:tool_calls_finished, socket) do + Logger.info("Received tool_calls_finished") {:noreply, socket - |> update(:messages, &(&1 ++ tool_messages)) |> assign(streaming_response: nil)} end - # tool_calls_finished already cleared streaming_response and committed messages — ignore - def handle_info(:end_ai_response, %{assigns: %{streaming_response: nil}} = socket) do - {:noreply, socket} - end - - def handle_info(:end_ai_response, socket) do - final_response = %{ - role: :assistant, - content: socket.assigns.streaming_response.content, - reasoning_content: socket.assigns.streaming_response.reasoning_content, - tool_calls: socket.assigns.streaming_response.tool_calls - } + def handle_info({:tool_request_message, tool_request_message}, socket) do + Logger.info("tool request message: #{inspect(tool_request_message)}") {:noreply, socket - |> update(:messages, &(&1 ++ [final_response])) + |> update(:messages, &(&1 ++ [tool_request_message]))} + end + + def handle_info({:one_tool_finished, tool_response}, socket) do + Logger.info("Received one_tool_finished with #{inspect(tool_response)}") + + {:noreply, + socket + |> update(:messages, &(&1 ++ [tool_response]))} + end + + def handle_info({:end_ai_response, final_message}, socket) do + {:noreply, + socket + |> update(:messages, &(&1 ++ [final_message])) |> assign(streaming_response: nil)} end def handle_info({:set_background_color, color}, socket) do + Logger.info("setting background color to #{color}") {:noreply, assign(socket, background_color: color)} end end