defmodule ElixirAiWeb.AdminLive do
  import ElixirAi.PubsubTopics
  use ElixirAiWeb, :live_view
  require Logger

  # Admin dashboard LiveView. Keeps four assigns up to date:
  #   * :nodes               - [{node, :running | :not_running}]
  #   * :singleton_locations - whatever ClusterSingletonLauncher.singleton_locations/0 returns
  #   * :chat_runners        - [{name, node, pid}] sorted by name
  #   * :liveviews           - [{view, node}]
  # Refreshes are driven by node up/down events and :pg scope monitor messages.

  # When the socket is connected, subscribes to node events and to membership
  # changes of the three :pg scopes, remembering each monitor reference so the
  # handle_info/2 clauses can tell the scopes apart. Always loads an initial
  # snapshot of every panel.
  def mount(_params, _session, socket) do
    socket =
      if connected?(socket) do
        :net_kernel.monitor_nodes(true)
        # Join before monitoring so our own join doesn't trigger a spurious refresh.
        :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self())
        {pg_ref, _} = :pg.monitor_scope(ElixirAi.LiveViewPG)
        {runner_pg_ref, _} = :pg.monitor_scope(ElixirAi.RunnerPG)
        {singleton_pg_ref, _} = :pg.monitor_scope(ElixirAi.SingletonPG)

        socket
        |> assign(pg_ref: pg_ref)
        |> assign(runner_pg_ref: runner_pg_ref)
        |> assign(singleton_pg_ref: singleton_pg_ref)
      else
        assign(socket, pg_ref: nil, runner_pg_ref: nil, singleton_pg_ref: nil)
      end

    {:ok,
     socket
     |> assign_singleton_state()
     |> assign(chat_runners: gather_chat_runners())
     |> assign(liveviews: gather_liveviews())}
  end

  # Cluster topology changed: recompute the per-node status list.
  def handle_info({event, _node}, socket) when event in [:nodeup, :nodedown] do
    {:noreply, assign(socket, nodes: gather_node_statuses())}
  end

  # Explicit request to reload singleton placement. Node statuses are derived
  # from singleton locations, so both assigns are refreshed together.
  def handle_info(:refresh_singletons, socket) do
    {:noreply, assign_singleton_state(socket)}
  end

  # Membership change in the singleton scope. Refreshes :nodes as well as
  # :singleton_locations; otherwise the :running / :not_running status shown
  # per node would go stale whenever a singleton starts, stops or moves.
  def handle_info({ref, change, _group, _pids}, %{assigns: %{singleton_pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    {:noreply, assign_singleton_state(socket)}
  end

  # Membership change in the LiveView scope.
  def handle_info({ref, change, _group, _pids}, %{assigns: %{pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    {:noreply, assign(socket, liveviews: gather_liveviews())}
  end

  # Membership change in the chat runner scope.
  def handle_info({ref, change, _group, _pids}, %{assigns: %{runner_pg_ref: ref}} = socket)
      when is_reference(ref) and change in [:join, :leave] do
    {:noreply, assign(socket, chat_runners: gather_chat_runners())}
  end

  # Fetches singleton locations once and assigns both :singleton_locations and
  # the :nodes list derived from it, so the two always reflect the same snapshot.
  defp assign_singleton_state(socket) do
    located = gather_singleton_locations()
    assign(socket, singleton_locations: located, nodes: node_statuses(located))
  end

  # Returns [{node, status}] for this node and every connected node, based on a
  # fresh read of the singleton locations.
  defp gather_node_statuses do
    node_statuses(gather_singleton_locations())
  end

  # Pure helper: a node is :running when at least one entry in `located`
  # (an enumerable of {key, location} pairs) has that node as its location.
  defp node_statuses(located) do
    Enum.map([Node.self() | Node.list()], fn n ->
      status =
        if Enum.any?(located, fn {_, loc} -> loc == n end),
          do: :running,
          else: :not_running

      {n, status}
    end)
  end

  defp gather_singleton_locations do
    ElixirAi.ClusterSingletonLauncher.singleton_locations()
  end

  # All ChatRunner entries via :pg membership, keyed by conversation name.
  # Each entry is a {name, node, pid} tuple; groups that are not tagged
  # {:runner, name} are ignored.
  defp gather_chat_runners do
    :pg.which_groups(ElixirAi.RunnerPG)
    |> Enum.flat_map(fn
      {:runner, name} ->
        :pg.get_members(ElixirAi.RunnerPG, {:runner, name})
        |> Enum.map(fn pid -> {name, node(pid), pid} end)

      _ ->
        []
    end)
    |> Enum.sort_by(&elem(&1, 0))
  end

  # :pg is cluster-wide — one local call returns members from all nodes.
  # Processes are automatically removed from their group when they die.
  # Each entry is a {view, node} tuple; groups that are not tagged
  # {:liveview, view} are ignored.
  defp gather_liveviews do
    :pg.which_groups(ElixirAi.LiveViewPG)
    |> Enum.flat_map(fn
      {:liveview, view} ->
        :pg.get_members(ElixirAi.LiveViewPG, {:liveview, view})
        |> Enum.map(fn pid -> {view, node(pid)} end)

      _ ->
        []
    end)
  end

  def render(assigns) do
    ~H"""
Singletons
Chat Runners {length(node_runners)}
LiveViews
No active processes
<% end %>Nodes, singletons, liveviews & runners all refresh on membership changes.