identified performance issues with streamed markdown

This commit is contained in:
2026-03-06 13:39:32 -07:00
parent aee7aa7b16
commit c747f1d4ce
8 changed files with 257 additions and 96 deletions

View File

@@ -68,7 +68,8 @@ defmodule ElixirAi.ChatUtils do
{:cont, acc}
end
) do
{:ok, _} ->
{:ok, _response} ->
# Logger.info("AI request completed with response #{inspect(response)}")
:ok
{:error, reason} ->

View File

@@ -34,10 +34,15 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do
end
# last streamed response
def handle_stream_line(server, %{
"choices" => [%{"finish_reason" => "stop"}],
"id" => id
}) do
def handle_stream_line(
server,
%{
"choices" => [%{"finish_reason" => "stop"}],
"id" => id
} = msg
) do
Logger.info("Received end of AI response stream for id #{id} with message: #{inspect(msg)}")
send(
server,
{:ai_text_stream_finish, id}
@@ -113,11 +118,14 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do
end
# end tool call
def handle_stream_line(server, %{
"choices" => [%{"finish_reason" => "tool_calls"}],
"id" => id
}) do
# Logger.info("Received tool call end")
def handle_stream_line(
server,
%{
"choices" => [%{"finish_reason" => "tool_calls"}],
"id" => id
} = message
) do
Logger.info("Received tool_calls_finished with message: #{inspect(message)}")
send(server, {:ai_tool_call_end, id})
end

View File

@@ -8,9 +8,11 @@ defmodule ElixirAi.Application do
ElixirAiWeb.Telemetry,
{DNSCluster, query: Application.get_env(:elixir_ai, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: ElixirAi.PubSub},
ElixirAi.ChatRunner,
ElixirAi.ToolTesting,
ElixirAiWeb.Endpoint
ElixirAiWeb.Endpoint,
{Registry, keys: :unique, name: ElixirAi.ChatRegistry},
{DynamicSupervisor, name: ElixirAi.ChatRunnerSupervisor, strategy: :one_for_one},
ElixirAi.ConversationManager
]
opts = [strategy: :one_for_one, name: ElixirAi.Supervisor]

View File

@@ -3,49 +3,51 @@ defmodule ElixirAi.ChatRunner do
use GenServer
import ElixirAi.ChatUtils
@topic "ai_chat"
defp via(name), do: {:via, Registry, {ElixirAi.ChatRegistry, name}}
defp topic(name), do: "ai_chat:#{name}"
def new_user_message(text_content) do
GenServer.cast(__MODULE__, {:user_message, text_content})
def new_user_message(name, text_content) do
GenServer.cast(via(name), {:user_message, text_content})
end
@spec get_conversation() :: any()
def get_conversation do
GenServer.call(__MODULE__, :get_conversation)
@spec get_conversation(String.t()) :: any()
def get_conversation(name) do
GenServer.call(via(name), :get_conversation)
end
def start_link(_opts) do
GenServer.start_link(
__MODULE__,
%{
messages: [],
streaming_response: nil,
pending_tool_calls: [],
tools: tools()
},
name: __MODULE__
)
def get_streaming_response(name) do
GenServer.call(via(name), :get_streaming_response)
end
def init(state) do
{:ok, state}
def start_link(name: name) do
GenServer.start_link(__MODULE__, name, name: via(name))
end
def tools do
def init(name) do
{:ok, %{
name: name,
messages: [],
streaming_response: nil,
pending_tool_calls: [],
tools: tools(self())
}}
end
def tools(server) do
[
ai_tool(
name: "store_thing",
description: "store a key value pair in memory",
function: &ElixirAi.ToolTesting.hold_thing/1,
parameters: ElixirAi.ToolTesting.hold_thing_params(),
server: __MODULE__
server: server
),
ai_tool(
name: "read_thing",
description: "read a key value pair that was previously stored with store_thing",
function: &ElixirAi.ToolTesting.get_thing/1,
parameters: ElixirAi.ToolTesting.get_thing_params(),
server: __MODULE__
server: server
),
ai_tool(
name: "set_background_color",
@@ -53,14 +55,14 @@ defmodule ElixirAi.ChatRunner do
"set the background color of the chat interface, accepts specified tailwind colors",
function: &ElixirAi.ToolTesting.set_background_color/1,
parameters: ElixirAi.ToolTesting.set_background_color_params(),
server: __MODULE__
server: server
)
]
end
def handle_cast({:user_message, text_content}, state) do
new_message = %{role: :user, content: text_content}
broadcast({:user_chat_message, new_message})
broadcast(state.name, {:user_chat_message, new_message})
new_state = %{state | messages: state.messages ++ [new_message]}
request_ai_response(self(), new_state.messages, state.tools)
@@ -69,7 +71,7 @@ defmodule ElixirAi.ChatRunner do
def handle_info({:start_new_ai_response, id}, state) do
starting_response = %{id: id, reasoning_content: "", content: "", tool_calls: []}
broadcast({:start_ai_response_stream, starting_response})
broadcast(state.name, {:start_ai_response_stream, starting_response})
{:noreply, %{state | streaming_response: starting_response}}
end
@@ -87,7 +89,7 @@ defmodule ElixirAi.ChatRunner do
end
def handle_info({:ai_reasoning_chunk, _id, reasoning_content}, state) do
broadcast({:reasoning_chunk_content, reasoning_content})
broadcast(state.name, {:reasoning_chunk_content, reasoning_content})
{:noreply,
%{
@@ -100,7 +102,7 @@ defmodule ElixirAi.ChatRunner do
end
def handle_info({:ai_text_chunk, _id, text_content}, state) do
broadcast({:text_chunk_content, text_content})
broadcast(state.name, {:text_chunk_content, text_content})
{:noreply,
%{
@@ -124,7 +126,7 @@ defmodule ElixirAi.ChatRunner do
tool_calls: state.streaming_response.tool_calls
}
broadcast({:end_ai_response, final_message})
broadcast(state.name, {:end_ai_response, final_message})
{:noreply,
%{
@@ -182,17 +184,6 @@ defmodule ElixirAi.ChatRunner do
def handle_info({:ai_tool_call_end, id}, state) do
Logger.info("ending tool call with tools: #{inspect(state.streaming_response.tool_calls)}")
parsed_tool_calls =
Enum.map(state.streaming_response.tool_calls, fn tool_call ->
case Jason.decode(tool_call.arguments) do
{:ok, decoded_args} ->
{:ok, tool_call, decoded_args}
{:error, e} ->
{:error, tool_call, "Failed to decode tool arguments: #{inspect(e)}"}
end
end)
tool_request_message = %{
role: :assistant,
content: state.streaming_response.content,
@@ -200,38 +191,27 @@ defmodule ElixirAi.ChatRunner do
tool_calls: state.streaming_response.tool_calls
}
broadcast({:tool_request_message, tool_request_message})
broadcast(state.name, {:tool_request_message, tool_request_message})
failed_call_messages =
parsed_tool_calls
|> Enum.filter(fn
{:error, _tool_call, _error_msg} -> true
_ -> false
end)
|> Enum.map(fn {:error, tool_call, error_msg} ->
Logger.error("Tool call #{tool_call.name} failed with error: #{error_msg}")
%{role: :tool, content: error_msg, tool_call_id: tool_call.id}
end)
pending_call_ids =
parsed_tool_calls
|> Enum.filter(fn
{:ok, _tool_call, _decoded_args} -> true
_ -> false
end)
|> Enum.map(fn {:ok, tool_call, decoded_args} ->
case Enum.find(state.tools, fn t -> t.name == tool_call.name end) do
{failed_call_messages, pending_call_ids} =
Enum.reduce(state.streaming_response.tool_calls, {[], []}, fn tool_call, {failed, pending} ->
with {:ok, decoded_args} <- Jason.decode(tool_call.arguments),
tool when not is_nil(tool) <- Enum.find(state.tools, fn t -> t.name == tool_call.name end) do
tool.run_function.(id, tool_call.id, decoded_args)
{failed, [tool_call.id | pending]}
else
{:error, e} ->
error_msg = "Failed to decode tool arguments: #{inspect(e)}"
Logger.error("Tool call #{tool_call.name} failed: #{error_msg}")
{[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending}
nil ->
Logger.error("No tool definition found for #{tool_call.name}")
nil
tool ->
tool.run_function.(id, tool_call.id, decoded_args)
tool_call.id
error_msg = "No tool definition found for #{tool_call.name}"
Logger.error(error_msg)
{[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending}
end
end)
|> Enum.filter(& &1)
{:noreply,
%{
@@ -244,7 +224,7 @@ defmodule ElixirAi.ChatRunner do
def handle_info({:tool_response, _id, tool_call_id, result}, state) do
new_message = %{role: :tool, content: inspect(result), tool_call_id: tool_call_id}
broadcast({:one_tool_finished, new_message})
broadcast(state.name, {:one_tool_finished, new_message})
new_pending_tool_calls =
Enum.filter(state.pending_tool_calls, fn id -> id != tool_call_id end)
@@ -259,7 +239,7 @@ defmodule ElixirAi.ChatRunner do
end
if new_pending_tool_calls == [] do
broadcast(:tool_calls_finished)
broadcast(state.name, :tool_calls_finished)
request_ai_response(self(), state.messages ++ [new_message], state.tools)
end
@@ -276,5 +256,9 @@ defmodule ElixirAi.ChatRunner do
{:reply, state, state}
end
defp broadcast(msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, @topic, msg)
def handle_call(:get_streaming_response, _from, state) do
{:reply, state.streaming_response, state}
end
defp broadcast(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, topic(name), msg)
end

View File

@@ -0,0 +1,49 @@
defmodule ElixirAi.ConversationManager do
use GenServer
def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
def init(names), do: {:ok, names}
def create_conversation(name) do
GenServer.call(__MODULE__, {:create, name})
end
def open_conversation(name) do
GenServer.call(__MODULE__, {:open, name})
end
def list_conversations do
GenServer.call(__MODULE__, :list)
end
def handle_call({:create, name}, _from, names) do
if name in names do
{:reply, {:error, :already_exists}, names}
else
{:reply, start_runner(name), [name | names]}
end
end
def handle_call({:open, name}, _from, names) do
if name in names do
{:reply, start_runner(name), names}
else
{:reply, {:error, :not_found}, names}
end
end
def handle_call(:list, _from, names) do
{:reply, names, names}
end
defp start_runner(name) do
case DynamicSupervisor.start_child(
ElixirAi.ChatRunnerSupervisor,
{ElixirAi.ChatRunner, name: name}
) do
{:ok, pid} -> {:ok, pid}
{:error, {:already_started, pid}} -> {:ok, pid}
error -> error
end
end
end