From 85eb8bcefaba671a6f8fa8a4bf4e6d43dc508871 Mon Sep 17 00:00:00 2001 From: Alex Mickelson Date: Fri, 20 Mar 2026 12:16:02 -0600 Subject: [PATCH] better resume --- assets/js/app.js | 2 +- lib/elixir_ai/conversation_manager.ex | 3 +- lib/elixir_ai_web/chat/chat_live.ex | 38 ++++++++++++++++++++++ test/elixir_ai_web/live/chat_live_test.exs | 2 +- 4 files changed, 42 insertions(+), 3 deletions(-) diff --git a/assets/js/app.js b/assets/js/app.js index 4faaf95..9d38d2f 100644 --- a/assets/js/app.js +++ b/assets/js/app.js @@ -76,7 +76,7 @@ Hooks.ScrollBottom = { }); this.observer.observe(this.el, { childList: true, subtree: true }); this.handleEvent("scroll_to_bottom", () => { - this.scrollToBottom(); + requestAnimationFrame(() => this.scrollToBottom()); }); }, updated() { diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex index 51de04c..d0943c8 100644 --- a/lib/elixir_ai/conversation_manager.ex +++ b/lib/elixir_ai/conversation_manager.ex @@ -167,12 +167,13 @@ defmodule ElixirAi.ConversationManager do # Returns the full conversation state using the pid directly, bypassing the # Horde registry (which may not have synced yet on the calling node). + # Also includes the runner pid so the caller can make further direct calls. defp reply_with_conversation(name, state) do case start_and_subscribe(name, state) do {:ok, pid, new_subscriptions, new_runners} -> new_state = %{state | subscriptions: new_subscriptions, runners: new_runners} conversation = GenServer.call(pid, :get_conversation) - {:reply, {:ok, conversation}, new_state} + {:reply, {:ok, Map.put(conversation, :runner_pid, pid)}, new_state} {:error, _reason} = error -> {:reply, error, state} diff --git a/lib/elixir_ai_web/chat/chat_live.ex b/lib/elixir_ai_web/chat/chat_live.ex index 1f739ea..4aafcee 100644 --- a/lib/elixir_ai_web/chat/chat_live.ex +++ b/lib/elixir_ai_web/chat/chat_live.ex @@ -14,11 +14,13 @@ defmodule ElixirAiWeb.ChatLive do if connected?(socket) do Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name)) :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self()) + send(self(), :sync_streaming) end {:ok, socket |> assign(conversation_name: name) + |> assign(runner_pid: Map.get(conversation, :runner_pid)) |> assign(user_input: "") |> assign(messages: conversation.messages) |> assign(streaming_response: conversation.streaming_response) @@ -128,6 +130,35 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, streaming_response: nil, ai_error: nil)} end + # Fetches the authoritative streaming snapshot directly from the runner pid, + # bypassing the Horde registry. Sent to self immediately after subscribing on + # connect so it is the first message processed — before any PubSub chunks. + def handle_info(:sync_streaming, %{assigns: %{runner_pid: pid}} = socket) + when is_pid(pid) do + case GenServer.call(pid, :get_streaming_response) do + nil -> + {:noreply, assign(socket, streaming_response: nil)} + + %{content: content, reasoning_content: reasoning_content} = snapshot -> + socket = + socket + |> assign(streaming_response: snapshot) + |> then(fn s -> + if content != "", do: push_event(s, "md_chunk", %{chunk: content}), else: s + end) + |> then(fn s -> + if reasoning_content != "", + do: push_event(s, "reasoning_chunk", %{chunk: reasoning_content}), + else: s + end) + |> push_event("scroll_to_bottom", %{}) + + {:noreply, socket} + end + end + + def handle_info(:sync_streaming, socket), do: {:noreply, socket} + def handle_info({:user_chat_message, message}, socket) do {:noreply, socket @@ -235,6 +266,13 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, background_color: color)} end + defp get_snapshot(%{assigns: %{runner_pid: pid}} = _socket) when is_pid(pid) do + case GenServer.call(pid, :get_streaming_response) do + nil -> %{id: nil, content: "", reasoning_content: "", tool_calls: []} + snapshot -> snapshot + end + end + defp get_snapshot(socket) do ChatRunner.get_streaming_response(socket.assigns.conversation_name) |> case do diff --git a/test/elixir_ai_web/live/chat_live_test.exs b/test/elixir_ai_web/live/chat_live_test.exs index 556edc1..5fe2c88 100644 --- a/test/elixir_ai_web/live/chat_live_test.exs +++ b/test/elixir_ai_web/live/chat_live_test.exs @@ -4,7 +4,7 @@ defmodule ElixirAiWeb.ChatLiveTest do setup do stub(ElixirAi.ConversationManager, :open_conversation, fn _name -> - {:ok, %{messages: [], streaming_response: nil, provider: nil}} + {:ok, %{messages: [], streaming_response: nil, provider: nil, runner_pid: nil}} end) :ok