From 62f16b2bdecf0076aaf1356b83959ed70078fa7b Mon Sep 17 00:00:00 2001 From: Alex Mickelson Date: Wed, 25 Mar 2026 15:13:43 -0600 Subject: [PATCH] udpates --- lib/elixir_ai/ai_tools/ai_tools.ex | 21 ++- lib/elixir_ai/application.ex | 73 ++++----- .../cluster_singleton/cluster_singleton.ex | 69 -------- .../cluster_singleton_launcher.ex | 78 ++++++++++ lib/elixir_ai/conversation_manager.ex | 147 ++++++++++++++++-- .../features/admin/admin_live.ex | 36 +---- .../features/voice/voice_live.ex | 73 +++++---- 7 files changed, 318 insertions(+), 179 deletions(-) delete mode 100644 lib/elixir_ai/cluster_singleton/cluster_singleton.ex create mode 100644 lib/elixir_ai/cluster_singleton/cluster_singleton_launcher.ex diff --git a/lib/elixir_ai/ai_tools/ai_tools.ex b/lib/elixir_ai/ai_tools/ai_tools.ex index 93c7571..03c357d 100644 --- a/lib/elixir_ai/ai_tools/ai_tools.ex +++ b/lib/elixir_ai/ai_tools/ai_tools.ex @@ -18,7 +18,7 @@ defmodule ElixirAi.AiTools do import ElixirAi.ChatUtils, only: [ai_tool: 1] - @server_tool_names ["store_thing", "read_thing"] + @server_tool_names ["store_thing", "read_thing", "list_conversations"] @liveview_tool_names ["set_background_color", "navigate_to"] @all_tool_names @server_tool_names ++ @liveview_tool_names @@ -29,7 +29,7 @@ defmodule ElixirAi.AiTools do def all_tool_names, do: @all_tool_names def build_server_tools(server, allowed_names) do - [store_thing(server), read_thing(server)] + [store_thing(server), read_thing(server), list_conversations(server)] |> Enum.filter(&(&1.name in allowed_names)) end @@ -67,6 +67,23 @@ defmodule ElixirAi.AiTools do ) end + def list_conversations(server) do + ai_tool( + name: "list_conversations", + description: """ + Returns a list of all conversation names in the application. + Always call this tool before navigating to a conversation page (e.g. /chat/:name) + to ensure the conversation exists and to obtain the exact name to use in the path. + """, + function: fn _args -> + names = ElixirAi.ConversationManager.list_conversations() + {:ok, names} + end, + parameters: %{"type" => "object", "properties" => %{}}, + server: server + ) + end + # --------------------------------------------------------------------------- # LiveView tools # --------------------------------------------------------------------------- diff --git a/lib/elixir_ai/application.ex b/lib/elixir_ai/application.ex index db54d3a..392f4cb 100644 --- a/lib/elixir_ai/application.ex +++ b/lib/elixir_ai/application.ex @@ -3,39 +3,40 @@ defmodule ElixirAi.Application do use Application def start(_type, _args) do - children = [ - ElixirAiWeb.Telemetry, - # Conditionally start Repo (skip in test environment) - repo_child_spec(), - default_provider_task(), - {Cluster.Supervisor, - [Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]}, - {Phoenix.PubSub, name: ElixirAi.PubSub}, - {ElixirAi.LiveViewPG, []}, - {ElixirAi.RunnerPG, []}, - {ElixirAi.SingletonPG, []}, - {ElixirAi.PageToolsPG, []}, - {ElixirAi.AudioProcessingPG, []}, - {DynamicSupervisor, name: ElixirAi.AudioWorkerSupervisor, strategy: :one_for_one}, - ElixirAi.ToolTesting, - ElixirAiWeb.Endpoint, - {Horde.Registry, - [ - name: ElixirAi.ChatRegistry, - keys: :unique, - members: :auto, - delta_crdt_options: [sync_interval: 100] - ]}, - {Horde.DynamicSupervisor, - [ - name: ElixirAi.ChatRunnerSupervisor, - strategy: :one_for_one, - members: :auto, - delta_crdt_options: [sync_interval: 100], - process_redistribution: :active - ]}, - cluster_singleton_child_spec() - ] + children = + [ + ElixirAiWeb.Telemetry, + # Conditionally start Repo (skip in test environment) + repo_child_spec(), + default_provider_task(), + {Cluster.Supervisor, + [Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]}, + {Phoenix.PubSub, name: ElixirAi.PubSub}, + {ElixirAi.LiveViewPG, []}, + {ElixirAi.RunnerPG, []}, + {ElixirAi.SingletonPG, []}, + {ElixirAi.PageToolsPG, []}, + {ElixirAi.AudioProcessingPG, []}, + {DynamicSupervisor, name: ElixirAi.AudioWorkerSupervisor, strategy: :one_for_one}, + ElixirAi.ToolTesting, + ElixirAiWeb.Endpoint, + {Horde.Registry, + [ + name: ElixirAi.ChatRegistry, + keys: :unique, + members: :auto, + delta_crdt_options: [sync_interval: 100] + ]}, + {Horde.DynamicSupervisor, + [ + name: ElixirAi.ChatRunnerSupervisor, + strategy: :one_for_one, + members: :auto, + delta_crdt_options: [sync_interval: 100], + process_redistribution: :active + ]}, + cluster_singleton_child_spec(ElixirAi.ConversationManager) + ] opts = [strategy: :one_for_one, name: ElixirAi.Supervisor] Supervisor.start_link(children, opts) @@ -63,11 +64,11 @@ defmodule ElixirAi.Application do end end - defp cluster_singleton_child_spec do + defp cluster_singleton_child_spec(module) do if Application.get_env(:elixir_ai, :env) == :test do - Supervisor.child_spec({Task, fn -> :ok end}, id: :skip_cluster_singleton) + Supervisor.child_spec({Task, fn -> :ok end}, id: {:skip_cluster_singleton, module}) else - ElixirAi.ClusterSingleton + {ElixirAi.ClusterSingletonLauncher, module: module} end end end diff --git a/lib/elixir_ai/cluster_singleton/cluster_singleton.ex b/lib/elixir_ai/cluster_singleton/cluster_singleton.ex deleted file mode 100644 index 874413b..0000000 --- a/lib/elixir_ai/cluster_singleton/cluster_singleton.ex +++ /dev/null @@ -1,69 +0,0 @@ -defmodule ElixirAi.ClusterSingleton do - use GenServer - require Logger - - @sync_delay_ms 200 - @retry_delay_ms 500 - - @singletons [ElixirAi.ConversationManager] - - def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) - - def status, do: GenServer.call(__MODULE__, :status) - - def configured_singletons, do: @singletons - - def init(_opts) do - Process.send_after(self(), :start_singletons, @sync_delay_ms) - {:ok, :pending} - end - - def handle_info(:start_singletons, state) do - if Node.list() == [] do - Logger.debug("ClusterSingleton: no peer nodes yet, retrying in #{@retry_delay_ms}ms") - Process.send_after(self(), :start_singletons, @retry_delay_ms) - {:noreply, state} - else - start_singletons() - {:noreply, :started} - end - end - - defp start_singletons do - for module <- @singletons do - if singleton_exists?(module) do - Logger.debug( - "ClusterSingleton: singleton already exists, skipping start for #{inspect(module)}" - ) - else - case Horde.DynamicSupervisor.start_child(ElixirAi.ChatRunnerSupervisor, module) do - {:ok, _pid} -> - :ok - - {:error, {:already_started, _pid}} -> - :ok - - {:error, :already_present} -> - :ok - - {:error, reason} -> - Logger.warning( - "ClusterSingleton: failed to start #{inspect(module)}: #{inspect(reason)}" - ) - end - end - end - end - - def handle_call(:status, _from, state), do: {:reply, state, state} - - defp singleton_exists?(module) do - case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do - [{pid, _metadata} | _] when is_pid(pid) -> - true - - _ -> - false - end - end -end diff --git a/lib/elixir_ai/cluster_singleton/cluster_singleton_launcher.ex b/lib/elixir_ai/cluster_singleton/cluster_singleton_launcher.ex new file mode 100644 index 0000000..5d807fe --- /dev/null +++ b/lib/elixir_ai/cluster_singleton/cluster_singleton_launcher.ex @@ -0,0 +1,78 @@ +defmodule ElixirAi.ClusterSingletonLauncher do + require Logger + + @retry_delay_ms 500 + + def start_link(opts) do + Task.start_link(fn -> run(opts) end) + end + + def child_spec(opts) do + %{ + id: {__MODULE__, Keyword.fetch!(opts, :module)}, + start: {__MODULE__, :start_link, [opts]}, + restart: :transient + } + end + + # Returns [{module, node_or_nil}] for all configured singletons. + # node_or_nil is nil when the singleton is not currently running anywhere. + def singleton_locations do + [ElixirAi.ConversationManager] + |> Enum.map(fn module -> + node = + case :pg.get_members(ElixirAi.SingletonPG, {:singleton, module}) do + [pid | _] -> node(pid) + _ -> nil + end + + {module, node} + end) + end + + defp run(opts) do + module = Keyword.fetch!(opts, :module) + + if Node.list() == [] do + Logger.debug( + "ClusterSingletonLauncher: no peer nodes yet, retrying in #{@retry_delay_ms}ms" + ) + + Process.sleep(@retry_delay_ms) + run(opts) + else + launch(module) + end + end + + defp launch(module) do + if singleton_exists?(module) do + Logger.debug( + "ClusterSingletonLauncher: singleton already exists, skipping start for #{inspect(module)}" + ) + else + case Horde.DynamicSupervisor.start_child(ElixirAi.ChatRunnerSupervisor, module) do + {:ok, _pid} -> + :ok + + {:error, {:already_started, _pid}} -> + :ok + + {:error, :already_present} -> + :ok + + {:error, reason} -> + Logger.warning( + "ClusterSingletonLauncher: failed to start #{inspect(module)}: #{inspect(reason)}" + ) + end + end + end + + defp singleton_exists?(module) do + case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do + [{pid, _metadata} | _] when is_pid(pid) -> true + _ -> false + end + end +end diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex index 3050635..c6668c6 100644 --- a/lib/elixir_ai/conversation_manager.ex +++ b/lib/elixir_ai/conversation_manager.ex @@ -21,6 +21,8 @@ defmodule ElixirAi.ConversationManager do def init(_) do Logger.info("ConversationManager initializing...") :pg.join(ElixirAi.SingletonPG, {:singleton, __MODULE__}, self()) + # Mitigation 4: receive :nodedown when a cluster peer disappears (sleep/wake, crash) + :net_kernel.monitor_nodes(true) send(self(), :load_conversations) {:ok, %{conversations: :loading, subscriptions: MapSet.new(), runners: %{}}} end @@ -102,14 +104,57 @@ defmodule ElixirAi.ConversationManager do end def handle_info({:DOWN, _ref, :process, pid, reason}, %{runners: runners} = state) do - runners = - Enum.reject(runners, fn {_name, info} -> info.pid == pid end) + # Find the name before removing so we can check for a Horde redistribution replacement + {name, _} = Enum.find(runners, {nil, nil}, fn {_n, info} -> info.pid == pid end) + + new_runners = + runners + |> Enum.reject(fn {_n, info} -> info.pid == pid end) |> Map.new() Logger.info("ConversationManager: runner #{inspect(pid)} went down (#{inspect(reason)})") - {:noreply, %{state | runners: runners}} + + # Mitigation 2: Horde may have already restarted the runner on another node; re-monitor it + new_runners = + if name do + case :pg.get_members(ElixirAi.RunnerPG, {:runner, name}) do + [new_pid | _] when new_pid != pid -> + Logger.info( + "ConversationManager: re-monitoring redistributed runner for #{name} at #{inspect(new_pid)}" + ) + + Process.monitor(new_pid) + Map.put(new_runners, name, %{pid: new_pid, node: node(new_pid)}) + + _ -> + new_runners + end + else + new_runners + end + + {:noreply, %{state | runners: new_runners}} end + # Mitigation 4: node went down — evict all cached runners on that node immediately, + # before the individual :DOWN messages for each pid arrive. + def handle_info({:nodedown, down_node}, %{runners: runners} = state) do + stale = Enum.filter(runners, fn {_name, info} -> info.node == down_node end) + + if stale != [] do + names = Enum.map(stale, &elem(&1, 0)) + + Logger.info( + "ConversationManager: node #{down_node} down, clearing stale runners: #{inspect(names)}" + ) + end + + new_runners = Map.reject(runners, fn {_name, info} -> info.node == down_node end) + {:noreply, %{state | runners: new_runners}} + end + + def handle_info({:nodeup, _node}, state), do: {:noreply, state} + def handle_info({:error, {:db_error, reason}}, state) do Logger.error("ConversationManager received db_error: #{inspect(reason)}") {:noreply, state} @@ -142,7 +187,33 @@ defmodule ElixirAi.ConversationManager do conversations = Map.new(conversation_list, fn %{name: name} -> {name, []} end) Logger.info("Conversation map keys: #{inspect(Map.keys(conversations))}") - {:noreply, %{state | conversations: conversations}} + + # Mitigation 3: after a ConversationManager restart, re-establish monitors for any + # ChatRunners that are still alive in Horde — they carry on running but we lost + # all monitor refs when this process restarted. + runners = + :pg.which_groups(ElixirAi.RunnerPG) + |> Enum.flat_map(fn + {:runner, name} -> + case :pg.get_members(ElixirAi.RunnerPG, {:runner, name}) do + [pid | _] -> + Process.monitor(pid) + [{name, %{pid: pid, node: node(pid)}}] + + _ -> + [] + end + + _ -> + [] + end) + |> Map.new() + + Logger.info( + "ConversationManager: re-established monitors for #{map_size(runners)} live runners" + ) + + {:noreply, %{state | conversations: conversations, runners: runners}} end def handle_info({:retry_call, message, from}, state) do @@ -195,9 +266,32 @@ defmodule ElixirAi.ConversationManager do ElixirAi.ChatRunnerSupervisor, {ElixirAi.ChatRunner, name: name} ) do - {:ok, pid} -> {:ok, pid} - {:error, {:already_started, pid}} -> {:ok, pid} - error -> error + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + # Mitigation 6: the returned pid may be on a node that just went down but whose + # :DOWN message hasn't been processed yet; verify the node is still reachable. + if node_alive?(node(pid)) do + {:ok, pid} + else + # Node is gone; Horde will redistribute — wait briefly for the new registration. + case registry_lookup_with_retry(name) do + nil -> {:error, :runner_unavailable} + new_pid -> {:ok, new_pid} + end + end + + # Mitigation 1: :already_present means Horde knows the child spec but the process + # is mid-redistribution and not yet registered. Retry the registry until it appears. + {:error, :already_present} -> + case registry_lookup_with_retry(name) do + nil -> {:error, :runner_unavailable} + pid -> {:ok, pid} + end + + error -> + error end case result do @@ -210,14 +304,21 @@ defmodule ElixirAi.ConversationManager do MapSet.put(state.subscriptions, name) end - existing_runners = Map.get(state, :runners, %{}) - new_runners = - if Map.has_key?(existing_runners, name) do - existing_runners - else - Process.monitor(pid) - Map.put(existing_runners, name, %{pid: pid, node: node(pid)}) + case Map.get(state.runners, name) do + nil -> + Process.monitor(pid) + Map.put(state.runners, name, %{pid: pid, node: node(pid)}) + + %{pid: ^pid} -> + # Same pid — nothing to update + state.runners + + %{pid: old_pid} -> + # Pid changed (redistribution raced ahead of :DOWN) — swap the monitor + Process.demonitor(old_pid, [:flush]) + Process.monitor(pid) + Map.put(state.runners, name, %{pid: pid, node: node(pid)}) end {:ok, pid, new_subscriptions, new_runners} @@ -226,4 +327,22 @@ defmodule ElixirAi.ConversationManager do error end end + + # Mitigation 5: Horde registry syncs via delta-CRDT with up to ~100ms lag after a + # process moves nodes. Retry with exponential backoff before concluding it doesn't exist. + defp registry_lookup_with_retry(name, retries \\ 3, delay_ms \\ 50) + defp registry_lookup_with_retry(_name, 0, _delay_ms), do: nil + + defp registry_lookup_with_retry(name, retries, delay_ms) do + case Horde.Registry.lookup(ElixirAi.ChatRegistry, name) do + [{pid, _} | _] when is_pid(pid) -> + pid + + _ -> + Process.sleep(delay_ms) + registry_lookup_with_retry(name, retries - 1, delay_ms * 2) + end + end + + defp node_alive?(n), do: n == Node.self() or n in Node.list() end diff --git a/lib/elixir_ai_web/features/admin/admin_live.ex b/lib/elixir_ai_web/features/admin/admin_live.ex index 0cc0061..548193a 100644 --- a/lib/elixir_ai_web/features/admin/admin_live.ex +++ b/lib/elixir_ai_web/features/admin/admin_live.ex @@ -57,44 +57,18 @@ defmodule ElixirAiWeb.AdminLive do end defp gather_node_statuses do - all_nodes = [Node.self() | Node.list()] + located = ElixirAi.ClusterSingletonLauncher.singleton_locations() - Enum.map(all_nodes, fn node -> + Enum.map([Node.self() | Node.list()], fn n -> status = - if node == Node.self() do - try do - ElixirAi.ClusterSingleton.status() - catch - _, _ -> :unreachable - end - else - case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do - {:badrpc, _} -> :unreachable - result -> result - end - end + if Enum.any?(located, fn {_, loc} -> loc == n end), do: :running, else: :not_running - {node, status} + {n, status} end) end defp gather_singleton_locations do - running = - :pg.which_groups(ElixirAi.SingletonPG) - |> Enum.flat_map(fn - {:singleton, module} -> - case :pg.get_members(ElixirAi.SingletonPG, {:singleton, module}) do - [pid | _] -> [{module, node(pid)}] - _ -> [] - end - - _ -> - [] - end) - |> Map.new() - - ElixirAi.ClusterSingleton.configured_singletons() - |> Enum.map(fn module -> {module, Map.get(running, module)} end) + ElixirAi.ClusterSingletonLauncher.singleton_locations() end # All ChatRunner entries via :pg membership, keyed by conversation name. diff --git a/lib/elixir_ai_web/features/voice/voice_live.ex b/lib/elixir_ai_web/features/voice/voice_live.ex index 471aa3f..436f5f7 100644 --- a/lib/elixir_ai_web/features/voice/voice_live.ex +++ b/lib/elixir_ai_web/features/voice/voice_live.ex @@ -278,43 +278,60 @@ defmodule ElixirAiWeb.VoiceLive do # --- Private helpers --- defp start_voice_conversation(socket, transcription) do - name = "voice-#{System.system_time(:second)}" + existing_name = socket.assigns.conversation_name - case AiProvider.find_by_name("default") do - {:ok, provider} -> - case ConversationManager.create_conversation(name, provider.id, "voice") do - {:ok, _pid} -> - case ConversationManager.open_conversation(name) do - {:ok, conv} -> - connect_and_send(socket, name, conv, transcription) + if existing_name do + # Reuse the existing conversation — just re-open to get a fresh runner pid + case ConversationManager.open_conversation(existing_name) do + {:ok, conv} -> + connect_and_send(socket, existing_name, conv, transcription) - {:error, reason} -> - assign(socket, - state: :transcribed, - ai_error: "Failed to open voice conversation: #{inspect(reason)}" - ) - end + {:error, reason} -> + assign(socket, + state: :transcribed, + ai_error: "Failed to reopen voice conversation: #{inspect(reason)}" + ) + end + else + name = "voice-#{System.system_time(:second)}" - {:error, reason} -> - assign(socket, - state: :transcribed, - ai_error: "Failed to create voice conversation: #{inspect(reason)}" - ) - end + case AiProvider.find_by_name("default") do + {:ok, provider} -> + case ConversationManager.create_conversation(name, provider.id, "voice") do + {:ok, _pid} -> + case ConversationManager.open_conversation(name) do + {:ok, conv} -> + connect_and_send(socket, name, conv, transcription) - {:error, reason} -> - assign(socket, - state: :transcribed, - ai_error: "No default AI provider found: #{inspect(reason)}" - ) + {:error, reason} -> + assign(socket, + state: :transcribed, + ai_error: "Failed to open voice conversation: #{inspect(reason)}" + ) + end + + {:error, reason} -> + assign(socket, + state: :transcribed, + ai_error: "Failed to create voice conversation: #{inspect(reason)}" + ) + end + + {:error, reason} -> + assign(socket, + state: :transcribed, + ai_error: "No default AI provider found: #{inspect(reason)}" + ) + end end end defp connect_and_send(socket, name, conversation, transcription) do runner_pid = Map.get(conversation, :runner_pid) + already_connected = socket.assigns.conversation_name == name try do - if connected?(socket) do + if connected?(socket) and not already_connected do Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name)) if runner_pid, @@ -325,7 +342,9 @@ defmodule ElixirAiWeb.VoiceLive do page_tools = discover_and_build_page_tools(socket, runner_pid) if page_tools != [] do - ChatRunner.register_page_tools(name, page_tools) + # Use the direct pid rather than the registry name to avoid + # Horde delta-CRDT sync lag on freshly-created processes. + GenServer.call(runner_pid, {:session, {:register_page_tools, page_tools}}) end end