diff --git a/lib/elixir_ai/ai_controllable.ex b/lib/elixir_ai/ai_tools/ai_controllable.ex similarity index 100% rename from lib/elixir_ai/ai_controllable.ex rename to lib/elixir_ai/ai_tools/ai_controllable.ex diff --git a/lib/elixir_ai/ai_controllable/hook.ex b/lib/elixir_ai/ai_tools/ai_controllable_hook.ex similarity index 100% rename from lib/elixir_ai/ai_controllable/hook.ex rename to lib/elixir_ai/ai_tools/ai_controllable_hook.ex diff --git a/lib/elixir_ai/ai_tools.ex b/lib/elixir_ai/ai_tools/ai_tools.ex similarity index 100% rename from lib/elixir_ai/ai_tools.ex rename to lib/elixir_ai/ai_tools/ai_tools.ex diff --git a/lib/elixir_ai/tool_testing.ex b/lib/elixir_ai/ai_tools/tool_testing.ex similarity index 100% rename from lib/elixir_ai/tool_testing.ex rename to lib/elixir_ai/ai_tools/tool_testing.ex diff --git a/lib/elixir_ai/application.ex b/lib/elixir_ai/application.ex index 194cfaa..db54d3a 100644 --- a/lib/elixir_ai/application.ex +++ b/lib/elixir_ai/application.ex @@ -12,6 +12,8 @@ defmodule ElixirAi.Application do [Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]}, {Phoenix.PubSub, name: ElixirAi.PubSub}, {ElixirAi.LiveViewPG, []}, + {ElixirAi.RunnerPG, []}, + {ElixirAi.SingletonPG, []}, {ElixirAi.PageToolsPG, []}, {ElixirAi.AudioProcessingPG, []}, {DynamicSupervisor, name: ElixirAi.AudioWorkerSupervisor, strategy: :one_for_one}, diff --git a/lib/elixir_ai/ai_utils/chat_utils.ex b/lib/elixir_ai/chat_runner/ai_utils/chat_utils.ex similarity index 100% rename from lib/elixir_ai/ai_utils/chat_utils.ex rename to lib/elixir_ai/chat_runner/ai_utils/chat_utils.ex diff --git a/lib/elixir_ai/ai_utils/stream_line_utils.ex b/lib/elixir_ai/chat_runner/ai_utils/stream_line_utils.ex similarity index 100% rename from lib/elixir_ai/ai_utils/stream_line_utils.ex rename to lib/elixir_ai/chat_runner/ai_utils/stream_line_utils.ex diff --git a/lib/elixir_ai/chat_runner/chat_runner.ex b/lib/elixir_ai/chat_runner/chat_runner.ex index e0ce86a..6c7fcb4 100644 --- a/lib/elixir_ai/chat_runner/chat_runner.ex +++ b/lib/elixir_ai/chat_runner/chat_runner.ex @@ -68,6 +68,7 @@ defmodule ElixirAi.ChatRunner do def init(name) do Phoenix.PubSub.subscribe(ElixirAi.PubSub, conversation_message_topic(name)) + :pg.join(ElixirAi.RunnerPG, {:runner, name}, self()) messages = case Conversation.find_id(name) do diff --git a/lib/elixir_ai/chat_runner/runner_pg.ex b/lib/elixir_ai/chat_runner/runner_pg.ex new file mode 100644 index 0000000..dbe3799 --- /dev/null +++ b/lib/elixir_ai/chat_runner/runner_pg.ex @@ -0,0 +1,16 @@ +defmodule ElixirAi.RunnerPG do + @moduledoc """ + Named :pg scope for tracking ChatRunner processes across the cluster. + Each ChatRunner joins {:runner, name} on init; :pg syncs membership + automatically and removes dead processes without any additional cleanup. + """ + + def child_spec(_opts) do + %{ + id: __MODULE__, + start: {:pg, :start_link, [__MODULE__]}, + type: :worker, + restart: :permanent + } + end +end diff --git a/lib/elixir_ai/system_prompts.ex b/lib/elixir_ai/chat_runner/system_prompts.ex similarity index 100% rename from lib/elixir_ai/system_prompts.ex rename to lib/elixir_ai/chat_runner/system_prompts.ex diff --git a/lib/elixir_ai/cluster_singleton.ex b/lib/elixir_ai/cluster_singleton/cluster_singleton.ex similarity index 100% rename from lib/elixir_ai/cluster_singleton.ex rename to lib/elixir_ai/cluster_singleton/cluster_singleton.ex diff --git a/lib/elixir_ai/cluster_singleton/singleton_pg.ex b/lib/elixir_ai/cluster_singleton/singleton_pg.ex new file mode 100644 index 0000000..15e33ee --- /dev/null +++ b/lib/elixir_ai/cluster_singleton/singleton_pg.ex @@ -0,0 +1,16 @@ +defmodule ElixirAi.SingletonPG do + @moduledoc """ + Named :pg scope for tracking cluster singleton processes across the cluster. + Each singleton joins {:singleton, __MODULE__} on init; :pg syncs membership + automatically and removes dead processes without any additional cleanup. + """ + + def child_spec(_opts) do + %{ + id: __MODULE__, + start: {:pg, :start_link, [__MODULE__]}, + type: :worker, + restart: :permanent + } + end +end diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex index 84860f2..3050635 100644 --- a/lib/elixir_ai/conversation_manager.ex +++ b/lib/elixir_ai/conversation_manager.ex @@ -20,6 +20,7 @@ defmodule ElixirAi.ConversationManager do def init(_) do Logger.info("ConversationManager initializing...") + :pg.join(ElixirAi.SingletonPG, {:singleton, __MODULE__}, self()) send(self(), :load_conversations) {:ok, %{conversations: :loading, subscriptions: MapSet.new(), runners: %{}}} end diff --git a/lib/elixir_ai_web/admin/admin_live.ex b/lib/elixir_ai_web/features/admin/admin_live.ex similarity index 57% rename from lib/elixir_ai_web/admin/admin_live.ex rename to lib/elixir_ai_web/features/admin/admin_live.ex index c8df99a..0cc0061 100644 --- a/lib/elixir_ai_web/admin/admin_live.ex +++ b/lib/elixir_ai_web/features/admin/admin_live.ex @@ -1,108 +1,129 @@ defmodule ElixirAiWeb.AdminLive do + import ElixirAi.PubsubTopics use ElixirAiWeb, :live_view require Logger - @refresh_ms 1_000 - def mount(_params, _session, socket) do - if connected?(socket) do - :net_kernel.monitor_nodes(true) - :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self()) - schedule_refresh() - end + socket = + if connected?(socket) do + :net_kernel.monitor_nodes(true) + # Join before monitoring so our own join doesn't trigger a spurious refresh. + :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self()) + {pg_ref, _} = :pg.monitor_scope(ElixirAi.LiveViewPG) + {runner_pg_ref, _} = :pg.monitor_scope(ElixirAi.RunnerPG) + {singleton_pg_ref, _} = :pg.monitor_scope(ElixirAi.SingletonPG) - {:ok, assign(socket, cluster_info: gather_info())} + socket + |> assign(pg_ref: pg_ref) + |> assign(runner_pg_ref: runner_pg_ref) + |> assign(singleton_pg_ref: singleton_pg_ref) + else + assign(socket, pg_ref: nil, runner_pg_ref: nil, singleton_pg_ref: nil) + end + + {:ok, + socket + |> assign(nodes: gather_node_statuses()) + |> assign(singleton_locations: gather_singleton_locations()) + |> assign(chat_runners: gather_chat_runners()) + |> assign(liveviews: gather_liveviews())} end def handle_info({:nodeup, _node}, socket) do - {:noreply, assign(socket, cluster_info: gather_info())} + {:noreply, assign(socket, nodes: gather_node_statuses())} end def handle_info({:nodedown, _node}, socket) do - {:noreply, assign(socket, cluster_info: gather_info())} + {:noreply, assign(socket, nodes: gather_node_statuses())} end - def handle_info(:refresh, socket) do - schedule_refresh() - {:noreply, assign(socket, cluster_info: gather_info())} + def handle_info(:refresh_singletons, socket) do + {:noreply, assign(socket, singleton_locations: gather_singleton_locations())} end - defp schedule_refresh, do: Process.send_after(self(), :refresh, @refresh_ms) + def handle_info({ref, change, _group, _pids}, %{assigns: %{singleton_pg_ref: ref}} = socket) + when is_reference(ref) and change in [:join, :leave] do + {:noreply, assign(socket, singleton_locations: gather_singleton_locations())} + end - defp gather_info do - import ElixirAi.PubsubTopics + def handle_info({ref, change, _group, _pids}, %{assigns: %{pg_ref: ref}} = socket) + when is_reference(ref) and change in [:join, :leave] do + {:noreply, assign(socket, liveviews: gather_liveviews())} + end + def handle_info({ref, change, _group, _pids}, %{assigns: %{runner_pg_ref: ref}} = socket) + when is_reference(ref) and change in [:join, :leave] do + {:noreply, assign(socket, chat_runners: gather_chat_runners())} + end + + defp gather_node_statuses do all_nodes = [Node.self() | Node.list()] - configured = ElixirAi.ClusterSingleton.configured_singletons() - node_statuses = - Enum.map(all_nodes, fn node -> - status = - if node == Node.self() do - try do - ElixirAi.ClusterSingleton.status() - catch - _, _ -> :unreachable - end - else - case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do - {:badrpc, _} -> :unreachable - result -> result - end + Enum.map(all_nodes, fn node -> + status = + if node == Node.self() do + try do + ElixirAi.ClusterSingleton.status() + catch + _, _ -> :unreachable end - - {node, status} - end) - - singleton_locations = - Enum.map(configured, fn module -> - location = - case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do - [{pid, _}] -> node(pid) - _ -> nil + else + case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do + {:badrpc, _} -> :unreachable + result -> result end + end - {module, location} - end) + {node, status} + end) + end - # All ChatRunner entries in the distributed registry, keyed by conversation name. - # Each entry is a {name, node, pid, supervisor_node} tuple. - chat_runners = - Horde.DynamicSupervisor.which_children(ElixirAi.ChatRunnerSupervisor) + defp gather_singleton_locations do + running = + :pg.which_groups(ElixirAi.SingletonPG) |> Enum.flat_map(fn - {_, pid, _, _} when is_pid(pid) -> - case Horde.Registry.select(ElixirAi.ChatRegistry, [ - {{:"$1", pid, :"$2"}, [], [{{:"$1", pid, :"$2"}}]} - ]) do - [{name, ^pid, _}] when is_binary(name) -> [{name, node(pid), pid}] + {:singleton, module} -> + case :pg.get_members(ElixirAi.SingletonPG, {:singleton, module}) do + [pid | _] -> [{module, node(pid)}] _ -> [] end _ -> [] end) - |> Enum.sort_by(&elem(&1, 0)) + |> Map.new() - # :pg is cluster-wide — one local call returns members from all nodes. - # Processes are automatically removed from their group when they die. - liveviews = - :pg.which_groups(ElixirAi.LiveViewPG) - |> Enum.flat_map(fn - {:liveview, view} -> - :pg.get_members(ElixirAi.LiveViewPG, {:liveview, view}) - |> Enum.map(fn pid -> {view, node(pid)} end) + ElixirAi.ClusterSingleton.configured_singletons() + |> Enum.map(fn module -> {module, Map.get(running, module)} end) + end - _ -> - [] - end) + # All ChatRunner entries via :pg membership, keyed by conversation name. + # Each entry is a {name, node, pid} tuple. + defp gather_chat_runners do + :pg.which_groups(ElixirAi.RunnerPG) + |> Enum.flat_map(fn + {:runner, name} -> + :pg.get_members(ElixirAi.RunnerPG, {:runner, name}) + |> Enum.map(fn pid -> {name, node(pid), pid} end) - %{ - nodes: node_statuses, - configured_singletons: configured, - singleton_locations: singleton_locations, - chat_runners: chat_runners, - liveviews: liveviews - } + _ -> + [] + end) + |> Enum.sort_by(&elem(&1, 0)) + end + + # :pg is cluster-wide — one local call returns members from all nodes. + # Processes are automatically removed from their group when they die. + defp gather_liveviews do + :pg.which_groups(ElixirAi.LiveViewPG) + |> Enum.flat_map(fn + {:liveview, view} -> + :pg.get_members(ElixirAi.LiveViewPG, {:liveview, view}) + |> Enum.map(fn pid -> {view, node(pid)} end) + + _ -> + [] + end) end def render(assigns) do @@ -111,13 +132,13 @@ defmodule ElixirAiWeb.AdminLive do

Cluster Admin

- <%= for {node, status} <- @cluster_info.nodes do %> + <%= for {node, status} <- @nodes do %> <% node_singletons = - Enum.filter(@cluster_info.singleton_locations, fn {_, loc} -> loc == node end) %> + Enum.filter(@singleton_locations, fn {_, loc} -> loc == node end) %> <% node_runners = - Enum.filter(@cluster_info.chat_runners, fn {_, rnode, _} -> rnode == node end) %> + Enum.filter(@chat_runners, fn {_, rnode, _} -> rnode == node end) %> <% node_liveviews = - @cluster_info.liveviews + @liveviews |> Enum.filter(fn {_, n} -> n == node end) |> Enum.group_by(fn {view, _} -> view end) %> @@ -126,7 +147,9 @@ defmodule ElixirAiWeb.AdminLive do
{node} <%= if node == Node.self() do %> - self + + self + <% end %>
<.status_badge status={status} /> @@ -191,7 +214,7 @@ defmodule ElixirAiWeb.AdminLive do
<% unlocated = - Enum.filter(@cluster_info.singleton_locations, fn {_, loc} -> is_nil(loc) end) %> + Enum.filter(@singleton_locations, fn {_, loc} -> is_nil(loc) end) %> <%= if unlocated != [] do %>

@@ -207,7 +230,9 @@ defmodule ElixirAiWeb.AdminLive do

<% end %> -

Refreshes every 1s or on node events.

+

+ Nodes, singletons, liveviews & runners all refresh on membership changes. +

""" end diff --git a/lib/elixir_ai_web/chat/chat_live.ex b/lib/elixir_ai_web/features/chat/chat_live.ex similarity index 100% rename from lib/elixir_ai_web/chat/chat_live.ex rename to lib/elixir_ai_web/features/chat/chat_live.ex diff --git a/lib/elixir_ai_web/chat/chat_message.ex b/lib/elixir_ai_web/features/chat/chat_message.ex similarity index 100% rename from lib/elixir_ai_web/chat/chat_message.ex rename to lib/elixir_ai_web/features/chat/chat_message.ex diff --git a/lib/elixir_ai_web/chat/chat_provider_display.ex b/lib/elixir_ai_web/features/chat/chat_provider_display.ex similarity index 100% rename from lib/elixir_ai_web/chat/chat_provider_display.ex rename to lib/elixir_ai_web/features/chat/chat_provider_display.ex diff --git a/lib/elixir_ai_web/chat/json_display.ex b/lib/elixir_ai_web/features/chat/json_display.ex similarity index 100% rename from lib/elixir_ai_web/chat/json_display.ex rename to lib/elixir_ai_web/features/chat/json_display.ex diff --git a/lib/elixir_ai/chat_runner/stream_handler.ex b/lib/elixir_ai_web/features/chat/stream_handler.ex similarity index 100% rename from lib/elixir_ai/chat_runner/stream_handler.ex rename to lib/elixir_ai_web/features/chat/stream_handler.ex diff --git a/lib/elixir_ai_web/home/ai_providers_live.ex b/lib/elixir_ai_web/features/home/ai_providers_live.ex similarity index 100% rename from lib/elixir_ai_web/home/ai_providers_live.ex rename to lib/elixir_ai_web/features/home/ai_providers_live.ex diff --git a/lib/elixir_ai_web/home/home_live.ex b/lib/elixir_ai_web/features/home/home_live.ex similarity index 100% rename from lib/elixir_ai_web/home/home_live.ex rename to lib/elixir_ai_web/features/home/home_live.ex diff --git a/lib/elixir_ai_web/voice/recording.ex b/lib/elixir_ai_web/features/voice/recording.ex similarity index 100% rename from lib/elixir_ai_web/voice/recording.ex rename to lib/elixir_ai_web/features/voice/recording.ex diff --git a/lib/elixir_ai_web/voice/voice_conversation.ex b/lib/elixir_ai_web/features/voice/voice_conversation.ex similarity index 100% rename from lib/elixir_ai_web/voice/voice_conversation.ex rename to lib/elixir_ai_web/features/voice/voice_conversation.ex diff --git a/lib/elixir_ai_web/voice/voice_live.ex b/lib/elixir_ai_web/features/voice/voice_live.ex similarity index 100% rename from lib/elixir_ai_web/voice/voice_live.ex rename to lib/elixir_ai_web/features/voice/voice_live.ex diff --git a/lib/elixir_ai_web/plugs/voice_session_id.ex b/lib/elixir_ai_web/features/voice/voice_session_id_plug.ex similarity index 100% rename from lib/elixir_ai_web/plugs/voice_session_id.ex rename to lib/elixir_ai_web/features/voice/voice_session_id_plug.ex diff --git a/lib/elixir_ai_web/components/form_components.ex b/lib/elixir_ai_web/utils/form_components.ex similarity index 100% rename from lib/elixir_ai_web/components/form_components.ex rename to lib/elixir_ai_web/utils/form_components.ex diff --git a/lib/elixir_ai_web/components/layouts.ex b/lib/elixir_ai_web/utils/layouts.ex similarity index 100% rename from lib/elixir_ai_web/components/layouts.ex rename to lib/elixir_ai_web/utils/layouts.ex diff --git a/lib/elixir_ai_web/components/layouts/app.html.heex b/lib/elixir_ai_web/utils/layouts/app.html.heex similarity index 100% rename from lib/elixir_ai_web/components/layouts/app.html.heex rename to lib/elixir_ai_web/utils/layouts/app.html.heex diff --git a/lib/elixir_ai_web/components/layouts/root.html.heex b/lib/elixir_ai_web/utils/layouts/root.html.heex similarity index 100% rename from lib/elixir_ai_web/components/layouts/root.html.heex rename to lib/elixir_ai_web/utils/layouts/root.html.heex diff --git a/lib/elixir_ai/live_view_pg.ex b/lib/elixir_ai_web/utils/live_view_pg.ex similarity index 100% rename from lib/elixir_ai/live_view_pg.ex rename to lib/elixir_ai_web/utils/live_view_pg.ex diff --git a/lib/elixir_ai/page_tools_pg.ex b/lib/elixir_ai_web/utils/page_tools_pg.ex similarity index 100% rename from lib/elixir_ai/page_tools_pg.ex rename to lib/elixir_ai_web/utils/page_tools_pg.ex diff --git a/lib/elixir_ai_web/components/spinner.ex b/lib/elixir_ai_web/utils/spinner.ex similarity index 100% rename from lib/elixir_ai_web/components/spinner.ex rename to lib/elixir_ai_web/utils/spinner.ex