From c747f1d4ced2351a980fac52d9e8cfaeb0ca2bcd Mon Sep 17 00:00:00 2001 From: Alex Mickelson Date: Fri, 6 Mar 2026 13:39:32 -0700 Subject: [PATCH] identified performance issues with streamed markdown --- lib/elixir_ai/ai_utils/chat_utils.ex | 3 +- lib/elixir_ai/ai_utils/stream_line_utils.ex | 26 +++-- lib/elixir_ai/application.ex | 6 +- lib/elixir_ai/chat_runner.ex | 122 +++++++++----------- lib/elixir_ai/conversation_manager.ex | 49 ++++++++ lib/elixir_ai_web/live/chat_live.ex | 64 +++++++--- lib/elixir_ai_web/live/home_live.ex | 80 +++++++++++++ lib/elixir_ai_web/router.ex | 3 +- 8 files changed, 257 insertions(+), 96 deletions(-) create mode 100644 lib/elixir_ai/conversation_manager.ex create mode 100644 lib/elixir_ai_web/live/home_live.ex diff --git a/lib/elixir_ai/ai_utils/chat_utils.ex b/lib/elixir_ai/ai_utils/chat_utils.ex index 1e6c9c1..7fa96b9 100644 --- a/lib/elixir_ai/ai_utils/chat_utils.ex +++ b/lib/elixir_ai/ai_utils/chat_utils.ex @@ -68,7 +68,8 @@ defmodule ElixirAi.ChatUtils do {:cont, acc} end ) do - {:ok, _} -> + {:ok, _response} -> + # Logger.info("AI request completed with response #{inspect(response)}") :ok {:error, reason} -> diff --git a/lib/elixir_ai/ai_utils/stream_line_utils.ex b/lib/elixir_ai/ai_utils/stream_line_utils.ex index 760f374..fe01d12 100644 --- a/lib/elixir_ai/ai_utils/stream_line_utils.ex +++ b/lib/elixir_ai/ai_utils/stream_line_utils.ex @@ -34,10 +34,15 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do end # last streamed response - def handle_stream_line(server, %{ - "choices" => [%{"finish_reason" => "stop"}], - "id" => id - }) do + def handle_stream_line( + server, + %{ + "choices" => [%{"finish_reason" => "stop"}], + "id" => id + } = msg + ) do + Logger.info("Received end of AI response stream for id #{id} with message: #{inspect(msg)}") + send( server, {:ai_text_stream_finish, id} @@ -113,11 +118,14 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do end # end tool call - def handle_stream_line(server, %{ - "choices" => [%{"finish_reason" => "tool_calls"}], - "id" => id - }) do - # Logger.info("Received tool call end") + def handle_stream_line( + server, + %{ + "choices" => [%{"finish_reason" => "tool_calls"}], + "id" => id + } = message + ) do + Logger.info("Received tool_calls_finished with message: #{inspect(message)}") send(server, {:ai_tool_call_end, id}) end diff --git a/lib/elixir_ai/application.ex b/lib/elixir_ai/application.ex index 82299a8..69d6245 100644 --- a/lib/elixir_ai/application.ex +++ b/lib/elixir_ai/application.ex @@ -8,9 +8,11 @@ defmodule ElixirAi.Application do ElixirAiWeb.Telemetry, {DNSCluster, query: Application.get_env(:elixir_ai, :dns_cluster_query) || :ignore}, {Phoenix.PubSub, name: ElixirAi.PubSub}, - ElixirAi.ChatRunner, ElixirAi.ToolTesting, - ElixirAiWeb.Endpoint + ElixirAiWeb.Endpoint, + {Registry, keys: :unique, name: ElixirAi.ChatRegistry}, + {DynamicSupervisor, name: ElixirAi.ChatRunnerSupervisor, strategy: :one_for_one}, + ElixirAi.ConversationManager ] opts = [strategy: :one_for_one, name: ElixirAi.Supervisor] diff --git a/lib/elixir_ai/chat_runner.ex b/lib/elixir_ai/chat_runner.ex index 1ccfa2c..bd2d53e 100644 --- a/lib/elixir_ai/chat_runner.ex +++ b/lib/elixir_ai/chat_runner.ex @@ -3,49 +3,51 @@ defmodule ElixirAi.ChatRunner do use GenServer import ElixirAi.ChatUtils - @topic "ai_chat" + defp via(name), do: {:via, Registry, {ElixirAi.ChatRegistry, name}} + defp topic(name), do: "ai_chat:#{name}" - def new_user_message(text_content) do - GenServer.cast(__MODULE__, {:user_message, text_content}) + def new_user_message(name, text_content) do + GenServer.cast(via(name), {:user_message, text_content}) end - @spec get_conversation() :: any() - def get_conversation do - GenServer.call(__MODULE__, :get_conversation) + @spec get_conversation(String.t()) :: any() + def get_conversation(name) do + GenServer.call(via(name), :get_conversation) end - def start_link(_opts) do - GenServer.start_link( - __MODULE__, - %{ - messages: [], - streaming_response: nil, - pending_tool_calls: [], - tools: tools() - }, - name: __MODULE__ - ) + def get_streaming_response(name) do + GenServer.call(via(name), :get_streaming_response) end - def init(state) do - {:ok, state} + def start_link(name: name) do + GenServer.start_link(__MODULE__, name, name: via(name)) end - def tools do + def init(name) do + {:ok, %{ + name: name, + messages: [], + streaming_response: nil, + pending_tool_calls: [], + tools: tools(self()) + }} + end + + def tools(server) do [ ai_tool( name: "store_thing", description: "store a key value pair in memory", function: &ElixirAi.ToolTesting.hold_thing/1, parameters: ElixirAi.ToolTesting.hold_thing_params(), - server: __MODULE__ + server: server ), ai_tool( name: "read_thing", description: "read a key value pair that was previously stored with store_thing", function: &ElixirAi.ToolTesting.get_thing/1, parameters: ElixirAi.ToolTesting.get_thing_params(), - server: __MODULE__ + server: server ), ai_tool( name: "set_background_color", @@ -53,14 +55,14 @@ defmodule ElixirAi.ChatRunner do "set the background color of the chat interface, accepts specified tailwind colors", function: &ElixirAi.ToolTesting.set_background_color/1, parameters: ElixirAi.ToolTesting.set_background_color_params(), - server: __MODULE__ + server: server ) ] end def handle_cast({:user_message, text_content}, state) do new_message = %{role: :user, content: text_content} - broadcast({:user_chat_message, new_message}) + broadcast(state.name, {:user_chat_message, new_message}) new_state = %{state | messages: state.messages ++ [new_message]} request_ai_response(self(), new_state.messages, state.tools) @@ -69,7 +71,7 @@ defmodule ElixirAi.ChatRunner do def handle_info({:start_new_ai_response, id}, state) do starting_response = %{id: id, reasoning_content: "", content: "", tool_calls: []} - broadcast({:start_ai_response_stream, starting_response}) + broadcast(state.name, {:start_ai_response_stream, starting_response}) {:noreply, %{state | streaming_response: starting_response}} end @@ -87,7 +89,7 @@ defmodule ElixirAi.ChatRunner do end def handle_info({:ai_reasoning_chunk, _id, reasoning_content}, state) do - broadcast({:reasoning_chunk_content, reasoning_content}) + broadcast(state.name, {:reasoning_chunk_content, reasoning_content}) {:noreply, %{ @@ -100,7 +102,7 @@ defmodule ElixirAi.ChatRunner do end def handle_info({:ai_text_chunk, _id, text_content}, state) do - broadcast({:text_chunk_content, text_content}) + broadcast(state.name, {:text_chunk_content, text_content}) {:noreply, %{ @@ -124,7 +126,7 @@ defmodule ElixirAi.ChatRunner do tool_calls: state.streaming_response.tool_calls } - broadcast({:end_ai_response, final_message}) + broadcast(state.name, {:end_ai_response, final_message}) {:noreply, %{ @@ -182,17 +184,6 @@ defmodule ElixirAi.ChatRunner do def handle_info({:ai_tool_call_end, id}, state) do Logger.info("ending tool call with tools: #{inspect(state.streaming_response.tool_calls)}") - parsed_tool_calls = - Enum.map(state.streaming_response.tool_calls, fn tool_call -> - case Jason.decode(tool_call.arguments) do - {:ok, decoded_args} -> - {:ok, tool_call, decoded_args} - - {:error, e} -> - {:error, tool_call, "Failed to decode tool arguments: #{inspect(e)}"} - end - end) - tool_request_message = %{ role: :assistant, content: state.streaming_response.content, @@ -200,38 +191,27 @@ defmodule ElixirAi.ChatRunner do tool_calls: state.streaming_response.tool_calls } - broadcast({:tool_request_message, tool_request_message}) + broadcast(state.name, {:tool_request_message, tool_request_message}) - failed_call_messages = - parsed_tool_calls - |> Enum.filter(fn - {:error, _tool_call, _error_msg} -> true - _ -> false - end) - |> Enum.map(fn {:error, tool_call, error_msg} -> - Logger.error("Tool call #{tool_call.name} failed with error: #{error_msg}") - %{role: :tool, content: error_msg, tool_call_id: tool_call.id} - end) - pending_call_ids = - parsed_tool_calls - |> Enum.filter(fn - {:ok, _tool_call, _decoded_args} -> true - _ -> false - end) - |> Enum.map(fn {:ok, tool_call, decoded_args} -> - case Enum.find(state.tools, fn t -> t.name == tool_call.name end) do + {failed_call_messages, pending_call_ids} = + Enum.reduce(state.streaming_response.tool_calls, {[], []}, fn tool_call, {failed, pending} -> + with {:ok, decoded_args} <- Jason.decode(tool_call.arguments), + tool when not is_nil(tool) <- Enum.find(state.tools, fn t -> t.name == tool_call.name end) do + tool.run_function.(id, tool_call.id, decoded_args) + {failed, [tool_call.id | pending]} + else + {:error, e} -> + error_msg = "Failed to decode tool arguments: #{inspect(e)}" + Logger.error("Tool call #{tool_call.name} failed: #{error_msg}") + {[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending} + nil -> - Logger.error("No tool definition found for #{tool_call.name}") - nil - - tool -> - tool.run_function.(id, tool_call.id, decoded_args) - - tool_call.id + error_msg = "No tool definition found for #{tool_call.name}" + Logger.error(error_msg) + {[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending} end end) - |> Enum.filter(& &1) {:noreply, %{ @@ -244,7 +224,7 @@ defmodule ElixirAi.ChatRunner do def handle_info({:tool_response, _id, tool_call_id, result}, state) do new_message = %{role: :tool, content: inspect(result), tool_call_id: tool_call_id} - broadcast({:one_tool_finished, new_message}) + broadcast(state.name, {:one_tool_finished, new_message}) new_pending_tool_calls = Enum.filter(state.pending_tool_calls, fn id -> id != tool_call_id end) @@ -259,7 +239,7 @@ defmodule ElixirAi.ChatRunner do end if new_pending_tool_calls == [] do - broadcast(:tool_calls_finished) + broadcast(state.name, :tool_calls_finished) request_ai_response(self(), state.messages ++ [new_message], state.tools) end @@ -276,5 +256,9 @@ defmodule ElixirAi.ChatRunner do {:reply, state, state} end - defp broadcast(msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, @topic, msg) + def handle_call(:get_streaming_response, _from, state) do + {:reply, state.streaming_response, state} + end + + defp broadcast(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, topic(name), msg) end diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex new file mode 100644 index 0000000..82da392 --- /dev/null +++ b/lib/elixir_ai/conversation_manager.ex @@ -0,0 +1,49 @@ +defmodule ElixirAi.ConversationManager do + use GenServer + + def start_link(_opts), do: GenServer.start_link(__MODULE__, [], name: __MODULE__) + def init(names), do: {:ok, names} + + def create_conversation(name) do + GenServer.call(__MODULE__, {:create, name}) + end + + def open_conversation(name) do + GenServer.call(__MODULE__, {:open, name}) + end + + def list_conversations do + GenServer.call(__MODULE__, :list) + end + + def handle_call({:create, name}, _from, names) do + if name in names do + {:reply, {:error, :already_exists}, names} + else + {:reply, start_runner(name), [name | names]} + end + end + + def handle_call({:open, name}, _from, names) do + if name in names do + {:reply, start_runner(name), names} + else + {:reply, {:error, :not_found}, names} + end + end + def handle_call(:list, _from, names) do + {:reply, names, names} + end + + defp start_runner(name) do + case DynamicSupervisor.start_child( + ElixirAi.ChatRunnerSupervisor, + {ElixirAi.ChatRunner, name: name} + ) do + {:ok, pid} -> {:ok, pid} + {:error, {:already_started, pid}} -> {:ok, pid} + error -> error + end + end + +end diff --git a/lib/elixir_ai_web/live/chat_live.ex b/lib/elixir_ai_web/live/chat_live.ex index 7a24538..2aebb61 100644 --- a/lib/elixir_ai_web/live/chat_live.ex +++ b/lib/elixir_ai_web/live/chat_live.ex @@ -4,8 +4,7 @@ defmodule ElixirAiWeb.ChatLive do import ElixirAiWeb.Spinner import ElixirAiWeb.ChatMessage alias ElixirAi.ChatRunner - - @topic "ai_chat" + alias ElixirAi.ConversationManager def valid_background_colors do [ @@ -19,23 +18,35 @@ defmodule ElixirAiWeb.ChatLive do ] end - def mount(_params, _session, socket) do - if connected?(socket), do: Phoenix.PubSub.subscribe(ElixirAi.PubSub, @topic) - conversation = ChatRunner.get_conversation() + def mount(%{"name" => name}, _session, socket) do + case ConversationManager.open_conversation(name) do + {:ok, _pid} -> + if connected?(socket), + do: Phoenix.PubSub.subscribe(ElixirAi.PubSub, "ai_chat:#{name}") - {:ok, - socket - |> assign(user_input: "") - |> assign(messages: conversation.messages) - |> assign(streaming_response: nil) - |> assign(background_color: "bg-cyan-950/30")} + conversation = ChatRunner.get_conversation(name) + + {:ok, + socket + |> assign(conversation_name: name) + |> assign(user_input: "") + |> assign(messages: conversation.messages) + |> assign(streaming_response: conversation.streaming_response) + |> assign(background_color: "bg-cyan-950/30")} + + {:error, :not_found} -> + {:ok, push_navigate(socket, to: "/")} + end end def render(assigns) do ~H"""
-
- Live Chat +
+ <.link navigate={~p"/"} class="text-cyan-700 hover:text-cyan-400 transition-colors"> + ← + + {@conversation_name}
user_input}, socket) when user_input != "" do - ChatRunner.new_user_message(user_input) + ChatRunner.new_user_message(socket.assigns.conversation_name, user_input) {:noreply, assign(socket, user_input: "")} end @@ -104,6 +115,15 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, streaming_response: starting_response)} end + # chunk arrived before :start_ai_response_stream — fetch snapshot from runner and apply + def handle_info( + {:reasoning_chunk_content, reasoning_content}, + %{assigns: %{streaming_response: nil}} = socket + ) do + base = get_snapshot(socket) |> Map.update!(:reasoning_content, &(&1 <> reasoning_content)) + {:noreply, assign(socket, streaming_response: base)} + end + def handle_info({:reasoning_chunk_content, reasoning_content}, socket) do updated_response = %{ socket.assigns.streaming_response @@ -114,6 +134,14 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, streaming_response: updated_response)} end + def handle_info( + {:text_chunk_content, text_content}, + %{assigns: %{streaming_response: nil}} = socket + ) do + base = get_snapshot(socket) |> Map.update!(:content, &(&1 <> text_content)) + {:noreply, assign(socket, streaming_response: base)} + end + def handle_info({:text_chunk_content, text_content}, socket) do updated_response = %{ socket.assigns.streaming_response @@ -158,4 +186,12 @@ defmodule ElixirAiWeb.ChatLive do Logger.info("setting background color to #{color}") {:noreply, assign(socket, background_color: color)} end + + defp get_snapshot(socket) do + ChatRunner.get_streaming_response(socket.assigns.conversation_name) + |> case do + nil -> %{id: nil, content: "", reasoning_content: "", tool_calls: []} + snapshot -> snapshot + end + end end diff --git a/lib/elixir_ai_web/live/home_live.ex b/lib/elixir_ai_web/live/home_live.ex new file mode 100644 index 0000000..7b8f06d --- /dev/null +++ b/lib/elixir_ai_web/live/home_live.ex @@ -0,0 +1,80 @@ +defmodule ElixirAiWeb.HomeLive do + use ElixirAiWeb, :live_view + alias ElixirAi.ConversationManager + + def mount(_params, _session, socket) do + {:ok, + socket + |> assign(conversations: ConversationManager.list_conversations()) + |> assign(new_name: "") + |> assign(error: nil)} + end + + def render(assigns) do + ~H""" +
+

Conversations

+ +
    + <%= if @conversations == [] do %> +
  • No conversations yet.
  • + <% end %> + <%= for name <- @conversations do %> +
  • + <.link + navigate={~p"/chat/#{name}"} + class="block px-4 py-2 rounded-lg border border-cyan-900/40 bg-cyan-950/20 text-cyan-300 hover:border-cyan-700 hover:bg-cyan-950/40 transition-colors text-sm" + > + {name} + +
  • + <% end %> +
+ +
+ + +
+ + <%= if @error do %> +

{@error}

+ <% end %> +
+ """ + end + + @spec handle_event(<<_::48>>, map(), any()) :: {:noreply, any()} + def handle_event("create", %{"name" => name}, socket) do + name = String.trim(name) + + if name == "" do + {:noreply, assign(socket, error: "Name can't be blank")} + else + case ConversationManager.create_conversation(name) do + {:ok, _pid} -> + {:noreply, + socket + |> push_navigate(to: ~p"/chat/#{name}") + |> assign(error: nil)} + + {:error, :already_exists} -> + {:noreply, assign(socket, error: "A conversation with that name already exists")} + + _ -> + {:noreply, assign(socket, error: "Failed to create conversation")} + end + end + end +end diff --git a/lib/elixir_ai_web/router.ex b/lib/elixir_ai_web/router.ex index 1726911..d091cb3 100644 --- a/lib/elixir_ai_web/router.ex +++ b/lib/elixir_ai_web/router.ex @@ -17,7 +17,8 @@ defmodule ElixirAiWeb.Router do scope "/", ElixirAiWeb do pipe_through :browser - get "/", PageController, :home + live "/", HomeLive + live "/chat/:name", ChatLive end # Other scopes may use custom stacks.