diff --git a/lib/elixir_ai/ai_tools.ex b/lib/elixir_ai/ai_tools.ex index 9e9a234..e89822a 100644 --- a/lib/elixir_ai/ai_tools.ex +++ b/lib/elixir_ai/ai_tools.ex @@ -130,20 +130,16 @@ defmodule ElixirAi.AiTools do # Private # --------------------------------------------------------------------------- - defp dispatch_to_liveview(server, tool_name, args) do - case GenServer.call(server, :get_liveview_pid) do - nil -> + pids = GenServer.call(server, :get_liveview_pids) + + case pids do + [] -> {:ok, "no browser session active, #{tool_name} skipped"} - liveview_pid -> - send(liveview_pid, {:liveview_tool_call, tool_name, args, self()}) - - receive do - {:liveview_tool_result, result} -> result - after - 5_000 -> {:ok, "browser session timed out, #{tool_name} skipped"} - end + _ -> + Enum.each(pids, &send(&1, {:liveview_tool_call, tool_name, args})) + {:ok, "#{tool_name} sent to #{length(pids)} session(s)"} end end end diff --git a/lib/elixir_ai/chat_runner.ex b/lib/elixir_ai/chat_runner.ex index 9584cc5..47215e1 100644 --- a/lib/elixir_ai/chat_runner.ex +++ b/lib/elixir_ai/chat_runner.ex @@ -23,8 +23,8 @@ defmodule ElixirAi.ChatRunner do GenServer.call(via(name), {:register_liveview_pid, liveview_pid}) end - def deregister_liveview_pid(name) do - GenServer.call(via(name), :deregister_liveview_pid) + def deregister_liveview_pid(name, liveview_pid) when is_pid(liveview_pid) do + GenServer.call(via(name), {:deregister_liveview_pid, liveview_pid}) end @spec get_conversation(String.t()) :: any() @@ -102,8 +102,7 @@ defmodule ElixirAi.ChatRunner do server_tools: server_tools, liveview_tools: liveview_tools, provider: provider, - liveview_pid: nil, - liveview_monitor_ref: nil + liveview_pids: %{} }} end @@ -352,6 +351,17 @@ defmodule ElixirAi.ChatRunner do {:noreply, %{state | streaming_response: nil, pending_tool_calls: []}} end + def handle_info({:DOWN, ref, :process, pid, _reason}, state) do + case Map.get(state.liveview_pids, pid) do + ^ref -> + Logger.info("ChatRunner #{state.name}: LiveView #{inspect(pid)} disconnected") + {:noreply, %{state | liveview_pids: Map.delete(state.liveview_pids, pid)}} + + _ -> + {:noreply, state} + end + end + def handle_call(:get_conversation, _from, state) do {:reply, state, state} end @@ -360,20 +370,24 @@ defmodule ElixirAi.ChatRunner do {:reply, state.streaming_response, state} end - def handle_call(:get_liveview_pid, _from, state) do - {:reply, state.liveview_pid, state} + def handle_call(:get_liveview_pids, _from, state) do + {:reply, Map.keys(state.liveview_pids), state} end def handle_call({:register_liveview_pid, liveview_pid}, _from, state) do - # Clear any previous monitor - if state.liveview_monitor_ref, do: Process.demonitor(state.liveview_monitor_ref, [:flush]) ref = Process.monitor(liveview_pid) - {:reply, :ok, %{state | liveview_pid: liveview_pid, liveview_monitor_ref: ref}} + {:reply, :ok, %{state | liveview_pids: Map.put(state.liveview_pids, liveview_pid, ref)}} end - def handle_call(:deregister_liveview_pid, _from, state) do - if state.liveview_monitor_ref, do: Process.demonitor(state.liveview_monitor_ref, [:flush]) - {:reply, :ok, %{state | liveview_pid: nil, liveview_monitor_ref: nil}} + def handle_call({:deregister_liveview_pid, liveview_pid}, _from, state) do + case Map.pop(state.liveview_pids, liveview_pid) do + {nil, _} -> + {:reply, :ok, state} + + {ref, new_pids} -> + Process.demonitor(ref, [:flush]) + {:reply, :ok, %{state | liveview_pids: new_pids}} + end end def handle_call({:set_tool_choice, tool_choice}, _from, state) do @@ -395,11 +409,6 @@ defmodule ElixirAi.ChatRunner do }} end - def handle_info({:DOWN, ref, :process, _pid, _reason}, %{liveview_monitor_ref: ref} = state) do - Logger.info("ChatRunner #{state.name}: LiveView disconnected, clearing liveview_pid") - {:noreply, %{state | liveview_pid: nil, liveview_monitor_ref: nil}} - end - defp broadcast_ui(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, chat_topic(name), msg) diff --git a/lib/elixir_ai_web/chat/chat_live.ex b/lib/elixir_ai_web/chat/chat_live.ex index 418d5a1..e12a9bd 100644 --- a/lib/elixir_ai_web/chat/chat_live.ex +++ b/lib/elixir_ai_web/chat/chat_live.ex @@ -14,6 +14,7 @@ defmodule ElixirAiWeb.ChatLive do if connected?(socket) do Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name)) :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self()) + ChatRunner.register_liveview_pid(name, self()) send(self(), :sync_streaming) end @@ -258,11 +259,31 @@ defmodule ElixirAiWeb.ChatLive do {:noreply, assign(socket, ai_error: error_message, streaming_response: nil)} end + def handle_info({:liveview_tool_call, "set_background_color", %{"color" => color}}, socket) do + {:noreply, assign(socket, background_color: color)} + end + + def handle_info({:liveview_tool_call, "navigate_to", %{"path" => path}}, socket) do + {:noreply, push_navigate(socket, to: path)} + end + + def handle_info({:liveview_tool_call, _tool_name, _args}, socket) do + {:noreply, socket} + end + def handle_info({:set_background_color, color}, socket) do Logger.info("setting background color to #{color}") {:noreply, assign(socket, background_color: color)} end + def terminate(_reason, %{assigns: %{conversation_name: name}} = socket) do + if connected?(socket) do + ChatRunner.deregister_liveview_pid(name, self()) + end + + :ok + end + defp get_snapshot(%{assigns: %{runner_pid: pid}} = _socket) when is_pid(pid) do case GenServer.call(pid, :get_streaming_response) do nil -> %{id: nil, content: "", reasoning_content: "", tool_calls: []}