defmodule ElixirAiWeb.AdminLive do
  @moduledoc """
  Cluster admin LiveView.

  Renders every known node (`Node.self/0` plus `Node.list/0`) together with the
  singletons, chat runners and LiveViews located on it. Once the socket is
  connected, the view subscribes to node up/down events and to membership
  changes of the `ElixirAi.LiveViewPG`, `ElixirAi.RunnerPG` and
  `ElixirAi.SingletonPG` `:pg` scopes, and refreshes the affected assigns when
  those messages arrive.
  """

  import ElixirAi.PubsubTopics
  use ElixirAiWeb, :live_view
  require Logger

  @impl true
  def mount(_params, _session, socket) do
    socket =
      if connected?(socket) do
        :net_kernel.monitor_nodes(true)
        # Join before monitoring so our own join doesn't trigger a spurious refresh.
        :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
        {pg_ref, _} = :pg.monitor_scope(ElixirAi.LiveViewPG)
        {runner_pg_ref, _} = :pg.monitor_scope(ElixirAi.RunnerPG)
        {singleton_pg_ref, _} = :pg.monitor_scope(ElixirAi.SingletonPG)

        assign(socket,
          pg_ref: pg_ref,
          runner_pg_ref: runner_pg_ref,
          singleton_pg_ref: singleton_pg_ref
        )
      else
        # Disconnected (static) render: no monitors, so no refs to match on.
        assign(socket, pg_ref: nil, runner_pg_ref: nil, singleton_pg_ref: nil)
      end

    {:ok,
     socket
     |> refresh_cluster_state()
     |> assign(chat_runners: gather_chat_runners())
     |> assign(liveviews: gather_liveviews())}
  end

  # Node membership changed (messages enabled by :net_kernel.monitor_nodes/1).
  @impl true
  def handle_info({event, _node}, socket) when event in [:nodeup, :nodedown] do
    {:noreply, refresh_cluster_state(socket)}
  end

  # Explicit request to re-read singleton placement.
  def handle_info(:refresh_singletons, socket) do
    {:noreply, refresh_cluster_state(socket)}
  end

  # The three clauses below handle :pg scope-monitor messages. The reference in
  # the message is matched against the one stored at mount time to tell the
  # scopes apart; `is_reference/1` keeps the `nil` refs of a disconnected mount
  # from ever matching.
  def handle_info({ref, change, _group, _pids}, %{assigns: %{singleton_pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    # Node statuses are derived from singleton placement, so both assigns must
    # be refreshed together or the per-node badge goes stale.
    {:noreply, refresh_cluster_state(socket)}
  end

  def handle_info({ref, change, _group, _pids}, %{assigns: %{pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    {:noreply, assign(socket, liveviews: gather_liveviews())}
  end

  def handle_info({ref, change, _group, _pids}, %{assigns: %{runner_pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    {:noreply, assign(socket, chat_runners: gather_chat_runners())}
  end

  # Reads singleton placement once and assigns both `:singleton_locations` and
  # the `:nodes` list derived from it, so the two always come from the same
  # snapshot.
  defp refresh_cluster_state(socket) do
    located = gather_singleton_locations()

    assign(socket,
      singleton_locations: located,
      nodes: gather_node_statuses(located)
    )
  end

  # Builds `[{node, :running | :not_running}]` for this node and all connected
  # nodes. A node is `:running` when at least one `{singleton, location}` entry
  # in `located` points at it.
  defp gather_node_statuses(located) do
    occupied = MapSet.new(located, fn {_singleton, loc} -> loc end)

    Enum.map([Node.self() | Node.list()], fn n ->
      status = if MapSet.member?(occupied, n), do: :running, else: :not_running
      {n, status}
    end)
  end

  defp gather_singleton_locations do
    ElixirAi.ClusterSingletonLauncher.singleton_locations()
  end

  # All ChatRunner entries via :pg membership, keyed by conversation name.
  # Each entry is a {name, node, pid} tuple; the result is sorted by name.
  # Groups that are not shaped `{:runner, name}` are skipped.
  defp gather_chat_runners do
    runners =
      for {:runner, name} = group <- :pg.which_groups(ElixirAi.RunnerPG),
          pid <- :pg.get_members(ElixirAi.RunnerPG, group) do
        {name, node(pid), pid}
      end

    Enum.sort_by(runners, &elem(&1, 0))
  end

  # :pg is cluster-wide — one local call returns members from all nodes.
  # Processes are automatically removed from their group when they die.
  # Returns one `{view_module, node}` tuple per LiveView process; groups that
  # are not shaped `{:liveview, view}` are skipped.
  defp gather_liveviews do
    for {:liveview, view} = group <- :pg.which_groups(ElixirAi.LiveViewPG),
        pid <- :pg.get_members(ElixirAi.LiveViewPG, group) do
      {view, node(pid)}
    end
  end

  @impl true
  def render(assigns) do
    ~H"""

    Cluster Admin

    <%= for {node, status} <- @nodes do %>
      <% node_singletons = Enum.filter(@singleton_locations, fn {_, loc} -> loc == node end) %>
      <% node_runners = Enum.filter(@chat_runners, fn {_, rnode, _} -> rnode == node end) %>
      <% node_liveviews =
        @liveviews
        |> Enum.filter(fn {_, n} -> n == node end)
        |> Enum.group_by(fn {view, _} -> view end) %>
      {node} <%= if node == Node.self() do %> self <% end %>
      <.status_badge status={status} />
      <%= if node_singletons != [] do %>

        Singletons

        <%= for {module, _} <- node_singletons do %>
          {inspect(module)}
        <% end %>
      <% end %>
      <%= if node_runners != [] do %>

        Chat Runners {length(node_runners)}

        <%= for {name, _, _} <- node_runners do %>
          {name}
        <% end %>
      <% end %>
      <%= if node_liveviews != %{} do %>

        LiveViews

        <%= for {view, instances} <- node_liveviews do %>
          {short_module(view)} ×{length(instances)}
        <% end %>
      <% end %>
      <%= if node_singletons == [] and node_runners == [] and node_liveviews == %{} do %>

        No active processes

      <% end %>
    <% end %>
    <% unlocated = Enum.filter(@singleton_locations, fn {_, loc} -> is_nil(loc) end) %>
    <%= if unlocated != [] do %>

      Singletons Not Running

      <%= for {module, _} <- unlocated do %> {inspect(module)} <% end %>
    <% end %>

    Nodes, singletons, liveviews & runners all refresh on membership changes.

    """
  end

  # Last segment of a module name as a string, e.g. `Foo.Bar.Baz` -> "Baz".
  defp short_module(module) when is_atom(module) do
    module
    |> Atom.to_string()
    |> String.replace_prefix("Elixir.", "")
    |> String.split(".")
    |> List.last()
  end

  # Renders a node status label. `:running` / `:not_running` are the statuses
  # produced by `gather_node_statuses/1`; the remaining named clauses are kept
  # for backward compatibility, and any other value is shown via `inspect/1`.
  defp status_badge(assigns) do
    ~H"""
    <%= case @status do %>
      <% :running -> %>
        running
      <% :not_running -> %>
        not running
      <% :started -> %>
        started
      <% :pending -> %>
        pending
      <% :unreachable -> %>
        unreachable
      <% other -> %>
        {inspect(other)}
    <% end %>
    """
  end
end