diff --git a/.gitignore b/.gitignore index 2496090..4c52eec 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,6 @@ npm-debug.log elixir_ls/ .env -*.tmp \ No newline at end of file +*.tmp + +providers.yml \ No newline at end of file diff --git a/config/config.exs b/config/config.exs index 98ba630..eeb77cd 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,17 +1,10 @@ -# This file is responsible for configuring your application -# and its dependencies with the aid of the Config module. -# -# This configuration file is loaded before any dependency and -# is restricted to this project. - -# General application configuration +# General config, overridden by other files in this directory. import Config config :elixir_ai, ecto_repos: [ElixirAi.Repo], generators: [timestamp_type: :utc_datetime] -# Configures the endpoint config :elixir_ai, ElixirAiWeb.Endpoint, url: [host: "localhost"], adapter: Bandit.PhoenixAdapter, @@ -22,7 +15,6 @@ config :elixir_ai, ElixirAiWeb.Endpoint, pubsub_server: ElixirAi.PubSub, live_view: [signing_salt: "4UG1IVt+"] -# Configure esbuild (the version is required) config :esbuild, version: "0.17.11", elixir_ai: [ @@ -32,7 +24,6 @@ config :esbuild, env: %{"NODE_PATH" => Path.expand("../deps", __DIR__)} ] -# Configure tailwind (the version is required) config :tailwind, version: "4.0.9", elixir_ai: [ @@ -43,17 +34,12 @@ config :tailwind, cd: Path.expand("../assets", __DIR__) ] -# Configures Elixir's Logger config :logger, :console, format: "$time $metadata[$level] $message\n", metadata: [:request_id] -# Use Jason for JSON parsing in Phoenix config :phoenix, :json_library, Jason -# Lower the BEAM node-down detection window from the default 60s. -# Nodes send ticks every (net_ticktime / 4)s; a node is declared down -# after 4 missed ticks (net_ticktime total). 5s means detection in ≤5s. if System.get_env("RELEASE_MODE") do config :kernel, net_ticktime: 2 end @@ -67,6 +53,4 @@ config :libcluster, ] ] -# Import environment specific config. 
This must remain at the bottom -# of this file so it overrides the configuration defined above. import_config "#{config_env()}.exs" diff --git a/config/runtime.exs b/config/runtime.exs index c837ed5..428ba83 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -4,9 +4,9 @@ import Dotenvy source!([".env", System.get_env()]) config :elixir_ai, - ai_endpoint: env!("AI_RESPONSES_ENDPOINT", :string!), - ai_token: env!("AI_TOKEN", :string!), - ai_model: env!("AI_MODEL", :string!) + ai_endpoint: System.get_env("AI_RESPONSES_ENDPOINT"), + ai_token: System.get_env("AI_TOKEN"), + ai_model: System.get_env("AI_MODEL") # config/runtime.exs is executed for all environments, including # during releases. It is executed after compilation and before the @@ -72,7 +72,7 @@ if config_env() == :prod do ] end - host = System.get_env("PHX_HOST") || "example.com" + host = System.get_env("PHX_HOST") || raise "environment variable PHX_HOST is missing." port = String.to_integer(System.get_env("PORT") || "4000") config :elixir_ai, :dns_cluster_query, System.get_env("DNS_CLUSTER_QUERY") @@ -88,36 +88,4 @@ if config_env() == :prod do port: port ], secret_key_base: secret_key_base - - # ## SSL Support - # - # To get SSL working, you will need to add the `https` key - # to your endpoint configuration: - # - # config :elixir_ai, ElixirAiWeb.Endpoint, - # https: [ - # ..., - # port: 443, - # cipher_suite: :strong, - # keyfile: System.get_env("SOME_APP_SSL_KEY_PATH"), - # certfile: System.get_env("SOME_APP_SSL_CERT_PATH") - # ] - # - # The `cipher_suite` is set to `:strong` to support only the - # latest and more secure SSL ciphers. This means old browsers - # and clients may not be supported. You can set it to - # `:compatible` for wider support. - # - # `:keyfile` and `:certfile` expect an absolute path to the key - # and cert in disk or a relative path inside priv, for example - # "priv/ssl/server.key". 
For all supported SSL configuration - # options, see https://hexdocs.pm/plug/Plug.SSL.html#configure/1 - # - # We also recommend setting `force_ssl` in your config/prod.exs, - # ensuring no data is ever sent via http, always redirecting to https: - # - # config :elixir_ai, ElixirAiWeb.Endpoint, - # force_ssl: [hsts: true] - # - # Check `Plug.SSL` for all available options in `force_ssl`. end diff --git a/docker-compose.yml b/docker-compose.yml index 002ec69..aa69d4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,6 +31,7 @@ services: RELEASE_NODE: elixir_ai@node1 RELEASE_COOKIE: secret_cluster_cookie SECRET_KEY_BASE: F1nY5uSyD0HfoWejcuuQiaQoMQrjrlFigb3bJ7p4hTXwpTza6sPLpmd+jLS7p0Sh + PROVIDERS_CONFIG_PATH: /app/providers.yml user: root command: | sh -c ' @@ -41,6 +42,7 @@ services: volumes: - .:/app - /app/_build + - ./providers.yml:/app/providers.yml:ro ports: - "4001:4000" depends_on: @@ -68,6 +70,7 @@ services: RELEASE_NODE: elixir_ai@node2 RELEASE_COOKIE: secret_cluster_cookie SECRET_KEY_BASE: F1nY5uSyD0HfoWejcuuQiaQoMQrjrlFigb3bJ7p4hTXwpTza6sPLpmd+jLS7p0Sh + PROVIDERS_CONFIG_PATH: /app/providers.yml user: root command: | sh -c ' @@ -78,6 +81,7 @@ services: volumes: - .:/app - /app/_build + - ./providers.yml:/app/providers.yml:ro ports: - "4002:4000" depends_on: diff --git a/example.providers.yml b/example.providers.yml new file mode 100644 index 0000000..e717661 --- /dev/null +++ b/example.providers.yml @@ -0,0 +1,9 @@ +providers: +- name: provider name + model: gpt-oss-20b + responses_endpoint: http://example.com/api/responses + api_key: your_api_key_here +- name: provider name 2 + model: gpt-oss-20b + responses_endpoint: http://example.com/api/responses + api_key: your_api_key_here \ No newline at end of file diff --git a/kubernetes/statefulset.yml b/kubernetes/statefulset.yml index 3b4c26e..ab2442d 100644 --- a/kubernetes/statefulset.yml +++ b/kubernetes/statefulset.yml @@ -4,7 +4,7 @@ metadata: name: ai-ha-elixir namespace: ai-ha-elixir 
spec: - serviceName: ai-ha-elixir-headless + serviceName: ai-ha-elixir-headless # replica1.ai-ha-elixir-headless.svc.cluster.local replicas: 2 podManagementPolicy: Parallel updateStrategy: diff --git a/lib/elixir_ai/ai_utils/chat_utils.ex b/lib/elixir_ai/ai_utils/chat_utils.ex index 31ed406..30652e2 100644 --- a/lib/elixir_ai/ai_utils/chat_utils.ex +++ b/lib/elixir_ai/ai_utils/chat_utils.ex @@ -27,9 +27,16 @@ defmodule ElixirAi.ChatUtils do } run_function = fn current_message_id, tool_call_id, args -> - Task.start(fn -> - result = function.(args) - send(server, {:tool_response, current_message_id, tool_call_id, result}) + Task.start_link(fn -> + try do + result = function.(args) + send(server, {:tool_response, current_message_id, tool_call_id, result}) + rescue + e -> + reason = Exception.format(:error, e, __STACKTRACE__) + Logger.error("Tool task crashed: #{reason}") + send(server, {:tool_response, current_message_id, tool_call_id, {:error, reason}}) + end end) end @@ -41,7 +48,7 @@ defmodule ElixirAi.ChatUtils do end def request_ai_response(server, messages, tools, provider) do - Task.start(fn -> + Task.start_link(fn -> api_url = provider.completions_url api_key = provider.api_token model = provider.model_name @@ -82,7 +89,8 @@ defmodule ElixirAi.ChatUtils do :ok {:error, reason} -> - IO.warn("AI request failed: #{inspect(reason)} for #{api_url}") + Logger.warning("AI request failed: #{inspect(reason)} for #{api_url}") + send(server, {:ai_request_error, reason}) end end) end diff --git a/lib/elixir_ai/application.ex b/lib/elixir_ai/application.ex index 4981f57..b6ceae3 100644 --- a/lib/elixir_ai/application.ex +++ b/lib/elixir_ai/application.ex @@ -12,6 +12,7 @@ defmodule ElixirAi.Application do {Cluster.Supervisor, [Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]}, {Phoenix.PubSub, name: ElixirAi.PubSub}, + {ElixirAi.LiveViewPG, []}, ElixirAi.ToolTesting, ElixirAiWeb.Endpoint, {Horde.Registry, @@ -55,7 +56,7 @@ defmodule 
ElixirAi.Application do if Application.get_env(:elixir_ai, :env) == :test do Supervisor.child_spec({Task, fn -> :ok end}, id: :skip_default_provider) else - {Task, fn -> ElixirAi.AiProvider.ensure_default_provider() end} + {Task, fn -> ElixirAi.AiProvider.ensure_configured_providers() end} end end diff --git a/lib/elixir_ai/chat_runner.ex b/lib/elixir_ai/chat_runner.ex index 82ed7b5..d133cbb 100644 --- a/lib/elixir_ai/chat_runner.ex +++ b/lib/elixir_ai/chat_runner.ex @@ -49,6 +49,7 @@ defmodule ElixirAi.ChatRunner do "Last message role was #{last_message.role}, requesting AI response for conversation #{name}" ) + broadcast_ui(name, :recovery_restart) ElixirAi.ChatUtils.request_ai_response(self(), messages, tools(self(), name), provider) end diff --git a/lib/elixir_ai/cluster_singleton.ex b/lib/elixir_ai/cluster_singleton.ex index e8c2b45..874413b 100644 --- a/lib/elixir_ai/cluster_singleton.ex +++ b/lib/elixir_ai/cluster_singleton.ex @@ -3,17 +3,33 @@ defmodule ElixirAi.ClusterSingleton do require Logger @sync_delay_ms 200 + @retry_delay_ms 500 @singletons [ElixirAi.ConversationManager] def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) + def status, do: GenServer.call(__MODULE__, :status) + + def configured_singletons, do: @singletons + def init(_opts) do Process.send_after(self(), :start_singletons, @sync_delay_ms) {:ok, :pending} end - def handle_info(:start_singletons, _state) do + def handle_info(:start_singletons, state) do + if Node.list() == [] do + Logger.debug("ClusterSingleton: no peer nodes yet, retrying in #{@retry_delay_ms}ms") + Process.send_after(self(), :start_singletons, @retry_delay_ms) + {:noreply, state} + else + start_singletons() + {:noreply, :started} + end + end + + defp start_singletons do for module <- @singletons do if singleton_exists?(module) do Logger.debug( @@ -37,10 +53,10 @@ defmodule ElixirAi.ClusterSingleton do end end end - - {:noreply, :started} end + def handle_call(:status, _from, state), do: 
{:reply, state, state} + defp singleton_exists?(module) do case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do [{pid, _metadata} | _] when is_pid(pid) -> diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex index 8da6f68..51de04c 100644 --- a/lib/elixir_ai/conversation_manager.ex +++ b/lib/elixir_ai/conversation_manager.ex @@ -21,7 +21,7 @@ defmodule ElixirAi.ConversationManager do def init(_) do Logger.info("ConversationManager initializing...") send(self(), :load_conversations) - {:ok, %{conversations: :loading, subscriptions: MapSet.new()}} + {:ok, %{conversations: :loading, subscriptions: MapSet.new(), runners: %{}}} end def create_conversation(name, ai_provider_id) do @@ -40,6 +40,10 @@ defmodule ElixirAi.ConversationManager do GenServer.call(@name, {:get_messages, name}) end + def list_runners do + GenServer.call(@name, :list_runners) + end + def handle_call(message, from, %{conversations: :loading} = state) do Logger.warning( "Received call #{inspect(message)} from #{inspect(from)} while loading conversations. Retrying after delay." 
@@ -75,7 +79,7 @@ defmodule ElixirAi.ConversationManager do %{conversations: conversations} = state ) do if Map.has_key?(conversations, name) do - reply_with_started(name, state) + reply_with_conversation(name, state) else {:reply, {:error, :not_found}, state} end @@ -84,10 +88,6 @@ defmodule ElixirAi.ConversationManager do def handle_call(:list, _from, %{conversations: conversations} = state) do keys = Map.keys(conversations) - Logger.debug( - "list_conversations returning: #{inspect(keys, limit: :infinity, printable_limit: :infinity, binaries: :as_binaries)}" - ) - {:reply, keys, state} end @@ -95,6 +95,19 @@ defmodule ElixirAi.ConversationManager do {:reply, Map.get(conversations, name, []), state} end + def handle_call(:list_runners, _from, state) do + {:reply, Map.get(state, :runners, %{}), state} + end + + def handle_info({:DOWN, _ref, :process, pid, reason}, %{runners: runners} = state) do + runners = + Enum.reject(runners, fn {_name, info} -> info.pid == pid end) + |> Map.new() + + Logger.info("ConversationManager: runner #{inspect(pid)} went down (#{inspect(reason)})") + {:noreply, %{state | runners: runners}} + end + def handle_info({:db_error, reason}, state) do Logger.error("ConversationManager received db_error: #{inspect(reason)}") {:noreply, state} @@ -138,10 +151,13 @@ defmodule ElixirAi.ConversationManager do end end + # Returns {pid} to callers that only need to know the process started (e.g. create). 
defp reply_with_started(name, state, update_state \\ fn s -> s end) do - case start_and_subscribe(name, state.subscriptions) do - {:ok, pid, new_subscriptions} -> - new_state = update_state.(%{state | subscriptions: new_subscriptions}) + case start_and_subscribe(name, state) do + {:ok, pid, new_subscriptions, new_runners} -> + new_state = + update_state.(%{state | subscriptions: new_subscriptions, runners: new_runners}) + {:reply, {:ok, pid}, new_state} {:error, _reason} = error -> @@ -149,7 +165,21 @@ defmodule ElixirAi.ConversationManager do end end - defp start_and_subscribe(name, subscriptions) do + # Returns the full conversation state using the pid directly, bypassing the + # Horde registry (which may not have synced yet on the calling node). + defp reply_with_conversation(name, state) do + case start_and_subscribe(name, state) do + {:ok, pid, new_subscriptions, new_runners} -> + new_state = %{state | subscriptions: new_subscriptions, runners: new_runners} + conversation = GenServer.call(pid, :get_conversation) + {:reply, {:ok, conversation}, new_state} + + {:error, _reason} = error -> + {:reply, error, state} + end + end + + defp start_and_subscribe(name, state) do result = case Horde.DynamicSupervisor.start_child( ElixirAi.ChatRunnerSupervisor, @@ -163,14 +193,24 @@ defmodule ElixirAi.ConversationManager do case result do {:ok, pid} -> new_subscriptions = - if MapSet.member?(subscriptions, name) do - subscriptions + if MapSet.member?(state.subscriptions, name) do + state.subscriptions else Phoenix.PubSub.subscribe(ElixirAi.PubSub, conversation_message_topic(name)) - MapSet.put(subscriptions, name) + MapSet.put(state.subscriptions, name) end - {:ok, pid, new_subscriptions} + existing_runners = Map.get(state, :runners, %{}) + + new_runners = + if Map.has_key?(existing_runners, name) do + existing_runners + else + Process.monitor(pid) + Map.put(existing_runners, name, %{pid: pid, node: node(pid)}) + end + + {:ok, pid, new_subscriptions, new_runners} error -> 
error diff --git a/lib/elixir_ai/data/ai_provider.ex b/lib/elixir_ai/data/ai_provider.ex index 271db62..3b3fa5c 100644 --- a/lib/elixir_ai/data/ai_provider.ex +++ b/lib/elixir_ai/data/ai_provider.ex @@ -138,28 +138,80 @@ defmodule ElixirAi.AiProvider do end def ensure_default_provider do - sql = "SELECT COUNT(*) FROM ai_providers" - params = %{} + endpoint = Application.get_env(:elixir_ai, :ai_endpoint) + token = Application.get_env(:elixir_ai, :ai_token) + model = Application.get_env(:elixir_ai, :ai_model) - case DbHelpers.run_sql(sql, params, providers_topic()) do - {:error, :db_error} -> - {:error, :db_error} + if endpoint && token && model do + case find_by_name("default") do + {:error, :not_found} -> + attrs = %{ + name: "default", + model_name: model, + api_token: token, + completions_url: endpoint + } - rows -> - case rows do - [%{"count" => 0}] -> - attrs = %{ - name: "default", - model_name: Application.fetch_env!(:elixir_ai, :ai_model), - api_token: Application.fetch_env!(:elixir_ai, :ai_token), - completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint) - } + create(attrs) - create(attrs) + {:ok, _} -> + :ok - _ -> - :ok + {:error, reason} -> + {:error, reason} + end + else + Logger.info("AI env vars not configured, skipping default provider creation") + :ok + end + end + + def ensure_providers_from_file do + case System.get_env("PROVIDERS_CONFIG_PATH") do + nil -> + :ok + + path -> + case YamlElixir.read_from_file(path) do + {:ok, %{"providers" => providers}} when is_list(providers) -> + Enum.each(providers, &ensure_provider_from_yaml/1) + + {:ok, _} -> + Logger.warning("providers.yml: expected a top-level 'providers' list, skipping") + + {:error, reason} -> + Logger.warning("Could not read providers config from #{path}: #{inspect(reason)}") end end end + + def ensure_configured_providers do + ensure_default_provider() + ensure_providers_from_file() + end + + defp ensure_provider_from_yaml(%{ + "name" => name, + "model" => model, + 
"responses_endpoint" => endpoint, + "api_key" => api_key + }) do + case find_by_name(name) do + {:error, :not_found} -> + Logger.info("Creating provider '#{name}' from providers config file") + create(%{name: name, model_name: model, api_token: api_key, completions_url: endpoint}) + + {:ok, _} -> + Logger.debug("Provider '#{name}' already exists, skipping") + + {:error, reason} -> + Logger.warning("Could not check existence of provider '#{name}': #{inspect(reason)}") + end + end + + defp ensure_provider_from_yaml(entry) do + Logger.warning( + "Skipping invalid provider entry in providers config file (must have name, model, responses_endpoint, api_key): #{inspect(entry)}" + ) + end end diff --git a/lib/elixir_ai/live_view_pg.ex b/lib/elixir_ai/live_view_pg.ex new file mode 100644 index 0000000..b9f19ea --- /dev/null +++ b/lib/elixir_ai/live_view_pg.ex @@ -0,0 +1,16 @@ +defmodule ElixirAi.LiveViewPG do + @moduledoc """ + Named :pg scope for tracking LiveView processes across the cluster. + Each LiveView joins {:liveview, ViewModule} on connect; :pg syncs membership + automatically and removes dead processes without any additional cleanup. 
+ """ + + def child_spec(_opts) do + %{ + id: __MODULE__, + start: {:pg, :start_link, [__MODULE__]}, + type: :worker, + restart: :permanent + } + end +end diff --git a/lib/elixir_ai_web/admin/admin_live.ex b/lib/elixir_ai_web/admin/admin_live.ex new file mode 100644 index 0000000..285d614 --- /dev/null +++ b/lib/elixir_ai_web/admin/admin_live.ex @@ -0,0 +1,245 @@ +defmodule ElixirAiWeb.AdminLive do + use ElixirAiWeb, :live_view + require Logger + + @refresh_ms 1_000 + + def mount(_params, _session, socket) do + if connected?(socket) do + :net_kernel.monitor_nodes(true) + :pg.join(ElixirAi.LiveViewPG, {:liveview, __MODULE__}, self()) + schedule_refresh() + end + + {:ok, assign(socket, cluster_info: gather_info())} + end + + def handle_info({:nodeup, _node}, socket) do + {:noreply, assign(socket, cluster_info: gather_info())} + end + + def handle_info({:nodedown, _node}, socket) do + {:noreply, assign(socket, cluster_info: gather_info())} + end + + def handle_info(:refresh, socket) do + schedule_refresh() + {:noreply, assign(socket, cluster_info: gather_info())} + end + + defp schedule_refresh, do: Process.send_after(self(), :refresh, @refresh_ms) + + defp gather_info do + import ElixirAi.PubsubTopics + + all_nodes = [Node.self() | Node.list()] + configured = ElixirAi.ClusterSingleton.configured_singletons() + + node_statuses = + Enum.map(all_nodes, fn node -> + status = + if node == Node.self() do + try do + ElixirAi.ClusterSingleton.status() + catch + _, _ -> :unreachable + end + else + case :rpc.call(node, ElixirAi.ClusterSingleton, :status, [], 3_000) do + {:badrpc, _} -> :unreachable + result -> result + end + end + + {node, status} + end) + + singleton_locations = + Enum.map(configured, fn module -> + location = + case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do + [{pid, _}] -> node(pid) + _ -> nil + end + + {module, location} + end) + + # All ChatRunner entries in the distributed registry, keyed by conversation name. 
+ # Each entry is a {name, node, pid, supervisor_node} tuple. + chat_runners = + Horde.DynamicSupervisor.which_children(ElixirAi.ChatRunnerSupervisor) + |> Enum.flat_map(fn + {_, pid, _, _} when is_pid(pid) -> + case Horde.Registry.select(ElixirAi.ChatRegistry, [ + {{:"$1", pid, :"$2"}, [], [{{:"$1", pid, :"$2"}}]} + ]) do + [{name, ^pid, _}] when is_binary(name) -> [{name, node(pid), pid}] + _ -> [] + end + + _ -> + [] + end) + |> Enum.sort_by(&elem(&1, 0)) + + # :pg is cluster-wide — one local call returns members from all nodes. + # Processes are automatically removed from their group when they die. + liveviews = + :pg.which_groups(ElixirAi.LiveViewPG) + |> Enum.flat_map(fn + {:liveview, view} -> + :pg.get_members(ElixirAi.LiveViewPG, {:liveview, view}) + |> Enum.map(fn pid -> {view, node(pid)} end) + + _ -> + [] + end) + + %{ + nodes: node_statuses, + configured_singletons: configured, + singleton_locations: singleton_locations, + chat_runners: chat_runners, + liveviews: liveviews + } + end + + def render(assigns) do + ~H""" +
+ Singletons +
++ Chat Runners + + {length(node_runners)} + +
++ LiveViews +
+No active processes
+ <% end %> +Refreshes every 1s or on node events.
+