This commit is contained in:
@@ -1,108 +1,129 @@
|
||||
defmodule ElixirAiWeb.AdminLive do
|
||||
import ElixirAi.PubsubTopics
|
||||
use ElixirAiWeb, :live_view
|
||||
require Logger
|
||||
|
||||
@refresh_ms 1_000
|
||||
|
||||
def mount(_params, _session, socket) do
|
||||
if connected?(socket) do
|
||||
:net_kernel.monitor_nodes(true)
|
||||
:pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
|
||||
schedule_refresh()
|
||||
end
|
||||
socket =
|
||||
if connected?(socket) do
|
||||
:net_kernel.monitor_nodes(true)
|
||||
# Join before monitoring so our own join doesn't trigger a spurious refresh.
|
||||
:pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
|
||||
{pg_ref, _} = :pg.monitor_scope(ElixirAi.LiveViewPG)
|
||||
{runner_pg_ref, _} = :pg.monitor_scope(ElixirAi.RunnerPG)
|
||||
{singleton_pg_ref, _} = :pg.monitor_scope(ElixirAi.SingletonPG)
|
||||
|
||||
{:ok, assign(socket, cluster_info: gather_info())}
|
||||
socket
|
||||
|> assign(pg_ref: pg_ref)
|
||||
|> assign(runner_pg_ref: runner_pg_ref)
|
||||
|> assign(singleton_pg_ref: singleton_pg_ref)
|
||||
else
|
||||
assign(socket, pg_ref: nil, runner_pg_ref: nil, singleton_pg_ref: nil)
|
||||
end
|
||||
|
||||
{:ok,
|
||||
socket
|
||||
|> assign(nodes: gather_node_statuses())
|
||||
|> assign(singleton_locations: gather_singleton_locations())
|
||||
|> assign(chat_runners: gather_chat_runners())
|
||||
|> assign(liveviews: gather_liveviews())}
|
||||
end
|
||||
|
||||
def handle_info({:nodeup, _node}, socket) do
|
||||
{:noreply, assign(socket, cluster_info: gather_info())}
|
||||
{:noreply, assign(socket, nodes: gather_node_statuses())}
|
||||
end
|
||||
|
||||
def handle_info({:nodedown, _node}, socket) do
|
||||
{:noreply, assign(socket, cluster_info: gather_info())}
|
||||
{:noreply, assign(socket, nodes: gather_node_statuses())}
|
||||
end
|
||||
|
||||
def handle_info(:refresh, socket) do
|
||||
schedule_refresh()
|
||||
{:noreply, assign(socket, cluster_info: gather_info())}
|
||||
def handle_info(:refresh_singletons, socket) do
|
||||
{:noreply, assign(socket, singleton_locations: gather_singleton_locations())}
|
||||
end
|
||||
|
||||
defp schedule_refresh, do: Process.send_after(self(), :refresh, @refresh_ms)
|
||||
def handle_info({ref, change, _group, _pids}, %{assigns: %{singleton_pg_ref: ref}} = socket)
|
||||
when is_reference(ref) and change in [:join, :leave] do
|
||||
{:noreply, assign(socket, singleton_locations: gather_singleton_locations())}
|
||||
end
|
||||
|
||||
defp gather_info do
|
||||
import ElixirAi.PubsubTopics
|
||||
def handle_info({ref, change, _group, _pids}, %{assigns: %{pg_ref: ref}} = socket)
|
||||
when is_reference(ref) and change in [:join, :leave] do
|
||||
{:noreply, assign(socket, liveviews: gather_liveviews())}
|
||||
end
|
||||
|
||||
def handle_info({ref, change, _group, _pids}, %{assigns: %{runner_pg_ref: ref}} = socket)
|
||||
when is_reference(ref) and change in [:join, :leave] do
|
||||
{:noreply, assign(socket, chat_runners: gather_chat_runners())}
|
||||
end
|
||||
|
||||
defp gather_node_statuses do
|
||||
all_nodes = [Node.self() | Node.list()]
|
||||
configured = ElixirAi.ClusterSingleton.configured_singletons()
|
||||
|
||||
node_statuses =
|
||||
Enum.map(all_nodes, fn node ->
|
||||
status =
|
||||
if node == Node.self() do
|
||||
try do
|
||||
ElixirAi.ClusterSingleton.status()
|
||||
catch
|
||||
_, _ -> :unreachable
|
||||
end
|
||||
else
|
||||
case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do
|
||||
{:badrpc, _} -> :unreachable
|
||||
result -> result
|
||||
end
|
||||
Enum.map(all_nodes, fn node ->
|
||||
status =
|
||||
if node == Node.self() do
|
||||
try do
|
||||
ElixirAi.ClusterSingleton.status()
|
||||
catch
|
||||
_, _ -> :unreachable
|
||||
end
|
||||
|
||||
{node, status}
|
||||
end)
|
||||
|
||||
singleton_locations =
|
||||
Enum.map(configured, fn module ->
|
||||
location =
|
||||
case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do
|
||||
[{pid, _}] -> node(pid)
|
||||
_ -> nil
|
||||
else
|
||||
case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do
|
||||
{:badrpc, _} -> :unreachable
|
||||
result -> result
|
||||
end
|
||||
end
|
||||
|
||||
{module, location}
|
||||
end)
|
||||
{node, status}
|
||||
end)
|
||||
end
|
||||
|
||||
# All ChatRunner entries in the distributed registry, keyed by conversation name.
|
||||
# Each entry is a {name, node, pid, supervisor_node} tuple.
|
||||
chat_runners =
|
||||
Horde.DynamicSupervisor.which_children(ElixirAi.ChatRunnerSupervisor)
|
||||
defp gather_singleton_locations do
|
||||
running =
|
||||
:pg.which_groups(ElixirAi.SingletonPG)
|
||||
|> Enum.flat_map(fn
|
||||
{_, pid, _, _} when is_pid(pid) ->
|
||||
case Horde.Registry.select(ElixirAi.ChatRegistry, [
|
||||
{{:"$1", pid, :"$2"}, [], [{{:"$1", pid, :"$2"}}]}
|
||||
]) do
|
||||
[{name, ^pid, _}] when is_binary(name) -> [{name, node(pid), pid}]
|
||||
{:singleton, module} ->
|
||||
case :pg.get_members(ElixirAi.SingletonPG, {:singleton, module}) do
|
||||
[pid | _] -> [{module, node(pid)}]
|
||||
_ -> []
|
||||
end
|
||||
|
||||
_ ->
|
||||
[]
|
||||
end)
|
||||
|> Enum.sort_by(&elem(&1, 0))
|
||||
|> Map.new()
|
||||
|
||||
# :pg is cluster-wide — one local call returns members from all nodes.
|
||||
# Processes are automatically removed from their group when they die.
|
||||
liveviews =
|
||||
:pg.which_groups(ElixirAi.LiveViewPG)
|
||||
|> Enum.flat_map(fn
|
||||
{:liveview, view} ->
|
||||
:pg.get_members(ElixirAi.LiveViewPG, {:liveview, view})
|
||||
|> Enum.map(fn pid -> {view, node(pid)} end)
|
||||
ElixirAi.ClusterSingleton.configured_singletons()
|
||||
|> Enum.map(fn module -> {module, Map.get(running, module)} end)
|
||||
end
|
||||
|
||||
_ ->
|
||||
[]
|
||||
end)
|
||||
# All ChatRunner entries via :pg membership, keyed by conversation name.
|
||||
# Each entry is a {name, node, pid} tuple.
|
||||
defp gather_chat_runners do
|
||||
:pg.which_groups(ElixirAi.RunnerPG)
|
||||
|> Enum.flat_map(fn
|
||||
{:runner, name} ->
|
||||
:pg.get_members(ElixirAi.RunnerPG, {:runner, name})
|
||||
|> Enum.map(fn pid -> {name, node(pid), pid} end)
|
||||
|
||||
%{
|
||||
nodes: node_statuses,
|
||||
configured_singletons: configured,
|
||||
singleton_locations: singleton_locations,
|
||||
chat_runners: chat_runners,
|
||||
liveviews: liveviews
|
||||
}
|
||||
_ ->
|
||||
[]
|
||||
end)
|
||||
|> Enum.sort_by(&elem(&1, 0))
|
||||
end
|
||||
|
||||
# :pg is cluster-wide — one local call returns members from all nodes.
|
||||
# Processes are automatically removed from their group when they die.
|
||||
defp gather_liveviews do
|
||||
:pg.which_groups(ElixirAi.LiveViewPG)
|
||||
|> Enum.flat_map(fn
|
||||
{:liveview, view} ->
|
||||
:pg.get_members(ElixirAi.LiveViewPG, {:liveview, view})
|
||||
|> Enum.map(fn pid -> {view, node(pid)} end)
|
||||
|
||||
_ ->
|
||||
[]
|
||||
end)
|
||||
end
|
||||
|
||||
def render(assigns) do
|
||||
@@ -111,13 +132,13 @@ defmodule ElixirAiWeb.AdminLive do
|
||||
<h1 class="text-lg font-semibold text-seafoam-200 tracking-wide">Cluster Admin</h1>
|
||||
|
||||
<div class="grid gap-4 grid-cols-1 lg:grid-cols-2 xl:grid-cols-3">
|
||||
<%= for {node, status} <- @cluster_info.nodes do %>
|
||||
<%= for {node, status} <- @nodes do %>
|
||||
<% node_singletons =
|
||||
Enum.filter(@cluster_info.singleton_locations, fn {_, loc} -> loc == node end) %>
|
||||
Enum.filter(@singleton_locations, fn {_, loc} -> loc == node end) %>
|
||||
<% node_runners =
|
||||
Enum.filter(@cluster_info.chat_runners, fn {_, rnode, _} -> rnode == node end) %>
|
||||
Enum.filter(@chat_runners, fn {_, rnode, _} -> rnode == node end) %>
|
||||
<% node_liveviews =
|
||||
@cluster_info.liveviews
|
||||
@liveviews
|
||||
|> Enum.filter(fn {_, n} -> n == node end)
|
||||
|> Enum.group_by(fn {view, _} -> view end) %>
|
||||
|
||||
@@ -126,7 +147,9 @@ defmodule ElixirAiWeb.AdminLive do
|
||||
<div class="flex items-center gap-2">
|
||||
<span class="font-mono text-sm font-semibold text-seafoam-200">{node}</span>
|
||||
<%= if node == Node.self() do %>
|
||||
<span class="text-xs bg-seafoam-800/50 text-seafoam-400 px-1.5 py-0.5 rounded">self</span>
|
||||
<span class="text-xs bg-seafoam-800/50 text-seafoam-400 px-1.5 py-0.5 rounded">
|
||||
self
|
||||
</span>
|
||||
<% end %>
|
||||
</div>
|
||||
<.status_badge status={status} />
|
||||
@@ -191,7 +214,7 @@ defmodule ElixirAiWeb.AdminLive do
|
||||
</div>
|
||||
|
||||
<% unlocated =
|
||||
Enum.filter(@cluster_info.singleton_locations, fn {_, loc} -> is_nil(loc) end) %>
|
||||
Enum.filter(@singleton_locations, fn {_, loc} -> is_nil(loc) end) %>
|
||||
<%= if unlocated != [] do %>
|
||||
<section>
|
||||
<h2 class="text-xs font-semibold uppercase tracking-widest text-red-500 mb-2">
|
||||
@@ -207,7 +230,9 @@ defmodule ElixirAiWeb.AdminLive do
|
||||
</section>
|
||||
<% end %>
|
||||
|
||||
<p class="text-xs text-seafoam-800">Refreshes every 1s or on node events.</p>
|
||||
<p class="text-xs text-seafoam-800">
|
||||
Nodes, singletons, liveviews & runners all refresh on membership changes.
|
||||
</p>
|
||||
</div>
|
||||
"""
|
||||
end
|
||||
183
lib/elixir_ai_web/features/chat/stream_handler.ex
Normal file
183
lib/elixir_ai_web/features/chat/stream_handler.ex
Normal file
@@ -0,0 +1,183 @@
|
||||
defmodule ElixirAi.ChatRunner.StreamHandler do
|
||||
require Logger
|
||||
import ElixirAi.ChatRunner.OutboundHelpers
|
||||
|
||||
def handle({:start_new_ai_response, id}, state) do
|
||||
starting_response = %{id: id, reasoning_content: "", content: "", tool_calls: []}
|
||||
broadcast_ui(state.name, {:start_ai_response_stream, starting_response})
|
||||
{:noreply, %{state | streaming_response: starting_response}}
|
||||
end
|
||||
|
||||
def handle({:ai_reasoning_chunk, _id, reasoning_content}, state) do
|
||||
broadcast_ui(state.name, {:reasoning_chunk_content, reasoning_content})
|
||||
|
||||
{:noreply,
|
||||
%{
|
||||
state
|
||||
| streaming_response: %{
|
||||
state.streaming_response
|
||||
| reasoning_content: state.streaming_response.reasoning_content <> reasoning_content
|
||||
}
|
||||
}}
|
||||
end
|
||||
|
||||
def handle({:ai_text_chunk, _id, text_content}, state) do
|
||||
broadcast_ui(state.name, {:text_chunk_content, text_content})
|
||||
|
||||
{:noreply,
|
||||
%{
|
||||
state
|
||||
| streaming_response: %{
|
||||
state.streaming_response
|
||||
| content: state.streaming_response.content <> text_content
|
||||
}
|
||||
}}
|
||||
end
|
||||
|
||||
def handle({:ai_text_stream_finish, _id}, state) do
|
||||
Logger.info(
|
||||
"AI stream finished for id #{state.streaming_response.id}, broadcasting end of AI response"
|
||||
)
|
||||
|
||||
final_message = %{
|
||||
role: :assistant,
|
||||
content: state.streaming_response.content,
|
||||
reasoning_content: state.streaming_response.reasoning_content,
|
||||
tool_calls: state.streaming_response.tool_calls
|
||||
}
|
||||
|
||||
broadcast_ui(state.name, {:end_ai_response, final_message})
|
||||
store_message(state.name, final_message)
|
||||
|
||||
{:noreply,
|
||||
%{
|
||||
state
|
||||
| streaming_response: nil,
|
||||
messages: state.messages ++ [final_message]
|
||||
}}
|
||||
end
|
||||
|
||||
def handle(
|
||||
{:ai_tool_call_start, _id, {tool_name, tool_args_start, tool_index, tool_call_id}},
|
||||
state
|
||||
) do
|
||||
Logger.info("AI started tool call #{tool_name}")
|
||||
|
||||
new_streaming_response = %{
|
||||
state.streaming_response
|
||||
| tool_calls:
|
||||
state.streaming_response.tool_calls ++
|
||||
[
|
||||
%{
|
||||
name: tool_name,
|
||||
arguments: tool_args_start,
|
||||
index: tool_index,
|
||||
id: tool_call_id
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
{:noreply, %{state | streaming_response: new_streaming_response}}
|
||||
end
|
||||
|
||||
def handle({:ai_tool_call_middle, _id, {tool_args_diff, tool_index}}, state) do
|
||||
new_streaming_response = %{
|
||||
state.streaming_response
|
||||
| tool_calls:
|
||||
Enum.map(state.streaming_response.tool_calls, fn
|
||||
%{arguments: existing_args, index: ^tool_index} = tool_call ->
|
||||
%{tool_call | arguments: existing_args <> tool_args_diff}
|
||||
|
||||
other ->
|
||||
other
|
||||
end)
|
||||
}
|
||||
|
||||
{:noreply, %{state | streaming_response: new_streaming_response}}
|
||||
end
|
||||
|
||||
def handle({:ai_tool_call_end, id}, state) do
|
||||
tool_request_message = %{
|
||||
role: :assistant,
|
||||
content: state.streaming_response.content,
|
||||
reasoning_content: state.streaming_response.reasoning_content,
|
||||
tool_calls: state.streaming_response.tool_calls
|
||||
}
|
||||
|
||||
broadcast_ui(state.name, {:tool_request_message, tool_request_message})
|
||||
|
||||
{failed_call_messages, pending_call_ids} =
|
||||
Enum.reduce(state.streaming_response.tool_calls, {[], []}, fn tool_call,
|
||||
{failed, pending} ->
|
||||
with {:ok, decoded_args} <- Jason.decode(tool_call.arguments),
|
||||
tool when not is_nil(tool) <-
|
||||
Enum.find(state.server_tools ++ state.liveview_tools ++ state.page_tools, fn t ->
|
||||
t.name == tool_call.name
|
||||
end) do
|
||||
tool.run_function.(id, tool_call.id, decoded_args)
|
||||
{failed, [tool_call.id | pending]}
|
||||
else
|
||||
{:error, e} ->
|
||||
error_msg = "Failed to decode tool arguments: #{inspect(e)}"
|
||||
Logger.error("Tool call #{tool_call.name} failed: #{error_msg}")
|
||||
{[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending}
|
||||
|
||||
nil ->
|
||||
error_msg = "No tool definition found for #{tool_call.name}"
|
||||
Logger.error(error_msg)
|
||||
{[%{role: :tool, content: error_msg, tool_call_id: tool_call.id} | failed], pending}
|
||||
end
|
||||
end)
|
||||
|
||||
store_message(state.name, [tool_request_message] ++ failed_call_messages)
|
||||
|
||||
{:noreply,
|
||||
%{
|
||||
state
|
||||
| messages: state.messages ++ [tool_request_message] ++ failed_call_messages,
|
||||
pending_tool_calls: pending_call_ids
|
||||
}}
|
||||
end
|
||||
|
||||
def handle({:tool_response, _id, tool_call_id, result}, state) do
|
||||
new_message = %{role: :tool, content: inspect(result), tool_call_id: tool_call_id}
|
||||
|
||||
broadcast_ui(state.name, {:one_tool_finished, new_message})
|
||||
store_message(state.name, new_message)
|
||||
|
||||
new_pending_tool_calls =
|
||||
Enum.filter(state.pending_tool_calls, fn id -> id != tool_call_id end)
|
||||
|
||||
new_streaming_response =
|
||||
case new_pending_tool_calls do
|
||||
[] -> nil
|
||||
_ -> state.streaming_response
|
||||
end
|
||||
|
||||
if new_pending_tool_calls == [] do
|
||||
broadcast_ui(state.name, :tool_calls_finished)
|
||||
|
||||
ElixirAi.ChatUtils.request_ai_response(
|
||||
self(),
|
||||
messages_with_system_prompt(state.messages ++ [new_message], state.system_prompt),
|
||||
state.server_tools ++ state.liveview_tools ++ state.page_tools,
|
||||
state.provider,
|
||||
state.tool_choice
|
||||
)
|
||||
end
|
||||
|
||||
{:noreply,
|
||||
%{
|
||||
state
|
||||
| pending_tool_calls: new_pending_tool_calls,
|
||||
streaming_response: new_streaming_response,
|
||||
messages: state.messages ++ [new_message]
|
||||
}}
|
||||
end
|
||||
|
||||
def handle({:ai_request_error, reason}, state) do
|
||||
Logger.error("AI request error: #{inspect(reason)}")
|
||||
broadcast_ui(state.name, {:ai_request_error, reason})
|
||||
{:noreply, %{state | streaming_response: nil, pending_tool_calls: []}}
|
||||
end
|
||||
end
|
||||
16
lib/elixir_ai_web/utils/live_view_pg.ex
Normal file
16
lib/elixir_ai_web/utils/live_view_pg.ex
Normal file
@@ -0,0 +1,16 @@
|
||||
defmodule ElixirAi.LiveViewPG do
|
||||
@moduledoc """
|
||||
Named :pg scope for tracking LiveView processes across the cluster.
|
||||
Each LiveView joins {:liveview, ViewModule} on connect; :pg syncs membership
|
||||
automatically and removes dead processes without any additional cleanup.
|
||||
"""
|
||||
|
||||
def child_spec(_opts) do
|
||||
%{
|
||||
id: __MODULE__,
|
||||
start: {:pg, :start_link, [__MODULE__]},
|
||||
type: :worker,
|
||||
restart: :permanent
|
||||
}
|
||||
end
|
||||
end
|
||||
15
lib/elixir_ai_web/utils/page_tools_pg.ex
Normal file
15
lib/elixir_ai_web/utils/page_tools_pg.ex
Normal file
@@ -0,0 +1,15 @@
|
||||
defmodule ElixirAi.PageToolsPG do
|
||||
@moduledoc """
|
||||
Named :pg scope for tracking LiveViews that implement AiControllable.
|
||||
Group key is `{:page, voice_session_id}` — one group per browser session.
|
||||
"""
|
||||
|
||||
def child_spec(_opts) do
|
||||
%{
|
||||
id: __MODULE__,
|
||||
start: {:pg, :start_link, [__MODULE__]},
|
||||
type: :worker,
|
||||
restart: :permanent
|
||||
}
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user