This commit is contained in:
@@ -76,7 +76,7 @@ Hooks.ScrollBottom = {
|
|||||||
});
|
});
|
||||||
this.observer.observe(this.el, { childList: true, subtree: true });
|
this.observer.observe(this.el, { childList: true, subtree: true });
|
||||||
this.handleEvent("scroll_to_bottom", () => {
|
this.handleEvent("scroll_to_bottom", () => {
|
||||||
this.scrollToBottom();
|
requestAnimationFrame(() => this.scrollToBottom());
|
||||||
});
|
});
|
||||||
},
|
},
|
||||||
updated() {
|
updated() {
|
||||||
|
|||||||
@@ -167,12 +167,13 @@ defmodule ElixirAi.ConversationManager do
|
|||||||
|
|
||||||
# Returns the full conversation state using the pid directly, bypassing the
|
# Returns the full conversation state using the pid directly, bypassing the
|
||||||
# Horde registry (which may not have synced yet on the calling node).
|
# Horde registry (which may not have synced yet on the calling node).
|
||||||
|
# Also includes the runner pid so the caller can make further direct calls.
|
||||||
defp reply_with_conversation(name, state) do
|
defp reply_with_conversation(name, state) do
|
||||||
case start_and_subscribe(name, state) do
|
case start_and_subscribe(name, state) do
|
||||||
{:ok, pid, new_subscriptions, new_runners} ->
|
{:ok, pid, new_subscriptions, new_runners} ->
|
||||||
new_state = %{state | subscriptions: new_subscriptions, runners: new_runners}
|
new_state = %{state | subscriptions: new_subscriptions, runners: new_runners}
|
||||||
conversation = GenServer.call(pid, :get_conversation)
|
conversation = GenServer.call(pid, :get_conversation)
|
||||||
{:reply, {:ok, conversation}, new_state}
|
{:reply, {:ok, Map.put(conversation, :runner_pid, pid)}, new_state}
|
||||||
|
|
||||||
{:error, _reason} = error ->
|
{:error, _reason} = error ->
|
||||||
{:reply, error, state}
|
{:reply, error, state}
|
||||||
|
|||||||
@@ -14,11 +14,13 @@ defmodule ElixirAiWeb.ChatLive do
|
|||||||
if connected?(socket) do
|
if connected?(socket) do
|
||||||
Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name))
|
Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name))
|
||||||
:pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
|
:pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
|
||||||
|
send(self(), :sync_streaming)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:ok,
|
{:ok,
|
||||||
socket
|
socket
|
||||||
|> assign(conversation_name: name)
|
|> assign(conversation_name: name)
|
||||||
|
|> assign(runner_pid: Map.get(conversation, :runner_pid))
|
||||||
|> assign(user_input: "")
|
|> assign(user_input: "")
|
||||||
|> assign(messages: conversation.messages)
|
|> assign(messages: conversation.messages)
|
||||||
|> assign(streaming_response: conversation.streaming_response)
|
|> assign(streaming_response: conversation.streaming_response)
|
||||||
@@ -128,6 +130,35 @@ defmodule ElixirAiWeb.ChatLive do
|
|||||||
{:noreply, assign(socket, streaming_response: nil, ai_error: nil)}
|
{:noreply, assign(socket, streaming_response: nil, ai_error: nil)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Fetches the authoritative streaming snapshot directly from the runner pid,
|
||||||
|
# bypassing the Horde registry. Sent to self immediately after subscribing on
|
||||||
|
# connect so it is the first message processed — before any PubSub chunks.
|
||||||
|
def handle_info(:sync_streaming, %{assigns: %{runner_pid: pid}} = socket)
|
||||||
|
when is_pid(pid) do
|
||||||
|
case GenServer.call(pid, :get_streaming_response) do
|
||||||
|
nil ->
|
||||||
|
{:noreply, assign(socket, streaming_response: nil)}
|
||||||
|
|
||||||
|
%{content: content, reasoning_content: reasoning_content} = snapshot ->
|
||||||
|
socket =
|
||||||
|
socket
|
||||||
|
|> assign(streaming_response: snapshot)
|
||||||
|
|> then(fn s ->
|
||||||
|
if content != "", do: push_event(s, "md_chunk", %{chunk: content}), else: s
|
||||||
|
end)
|
||||||
|
|> then(fn s ->
|
||||||
|
if reasoning_content != "",
|
||||||
|
do: push_event(s, "reasoning_chunk", %{chunk: reasoning_content}),
|
||||||
|
else: s
|
||||||
|
end)
|
||||||
|
|> push_event("scroll_to_bottom", %{})
|
||||||
|
|
||||||
|
{:noreply, socket}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info(:sync_streaming, socket), do: {:noreply, socket}
|
||||||
|
|
||||||
def handle_info({:user_chat_message, message}, socket) do
|
def handle_info({:user_chat_message, message}, socket) do
|
||||||
{:noreply,
|
{:noreply,
|
||||||
socket
|
socket
|
||||||
@@ -235,6 +266,13 @@ defmodule ElixirAiWeb.ChatLive do
|
|||||||
{:noreply, assign(socket, background_color: color)}
|
{:noreply, assign(socket, background_color: color)}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp get_snapshot(%{assigns: %{runner_pid: pid}} = _socket) when is_pid(pid) do
|
||||||
|
case GenServer.call(pid, :get_streaming_response) do
|
||||||
|
nil -> %{id: nil, content: "", reasoning_content: "", tool_calls: []}
|
||||||
|
snapshot -> snapshot
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
defp get_snapshot(socket) do
|
defp get_snapshot(socket) do
|
||||||
ChatRunner.get_streaming_response(socket.assigns.conversation_name)
|
ChatRunner.get_streaming_response(socket.assigns.conversation_name)
|
||||||
|> case do
|
|> case do
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ defmodule ElixirAiWeb.ChatLiveTest do
|
|||||||
|
|
||||||
setup do
|
setup do
|
||||||
stub(ElixirAi.ConversationManager, :open_conversation, fn _name ->
|
stub(ElixirAi.ConversationManager, :open_conversation, fn _name ->
|
||||||
{:ok, %{messages: [], streaming_response: nil, provider: nil}}
|
{:ok, %{messages: [], streaming_response: nil, provider: nil, runner_pid: nil}}
|
||||||
end)
|
end)
|
||||||
|
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
Reference in New Issue
Block a user