working on redundancy

This commit is contained in:
2026-03-06 16:37:31 -07:00
parent 8059048db2
commit 181c6ca84b
16 changed files with 282 additions and 29 deletions

View File

@@ -7,13 +7,27 @@ defmodule ElixirAi.Application do
children = [
ElixirAiWeb.Telemetry,
ElixirAi.Repo,
{DNSCluster, query: Application.get_env(:elixir_ai, :dns_cluster_query) || :ignore},
{Cluster.Supervisor,
[Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]},
{Phoenix.PubSub, name: ElixirAi.PubSub},
ElixirAi.ToolTesting,
ElixirAiWeb.Endpoint,
{Registry, keys: :unique, name: ElixirAi.ChatRegistry},
{DynamicSupervisor, name: ElixirAi.ChatRunnerSupervisor, strategy: :one_for_one},
ElixirAi.ConversationManager
{Horde.Registry,
[
name: ElixirAi.ChatRegistry,
keys: :unique,
members: :auto,
delta_crdt_options: [sync_interval: 100]
]},
{Horde.DynamicSupervisor,
[
name: ElixirAi.ChatRunnerSupervisor,
strategy: :one_for_one,
members: :auto,
delta_crdt_options: [sync_interval: 100],
process_redistribution: :active
]},
ElixirAi.ClusterSingleton
]
opts = [strategy: :one_for_one, name: ElixirAi.Supervisor]

View File

@@ -4,7 +4,7 @@ defmodule ElixirAi.ChatRunner do
import ElixirAi.ChatUtils
alias ElixirAi.{Conversation, Message}
defp via(name), do: {:via, Registry, {ElixirAi.ChatRegistry, name}}
defp via(name), do: {:via, Horde.Registry, {ElixirAi.ChatRegistry, name}}
defp topic(name), do: "ai_chat:#{name}"
defp message_topic(name), do: "conversation_messages:#{name}"
@@ -32,6 +32,13 @@ defmodule ElixirAi.ChatRunner do
_ -> []
end
last_message = List.last(messages)
if last_message && last_message.role == :user do
Logger.info("Last message role was #{last_message.role}, requesting AI response for conversation #{name}")
request_ai_response(self(), messages, tools(self(), name))
end
{:ok,
%{
name: name,
@@ -282,6 +289,12 @@ defmodule ElixirAi.ChatRunner do
}}
end
def handle_info({:ai_request_error, reason}, state) do
Logger.error("AI request error: #{inspect(reason)}")
broadcast_ui(state.name, {:ai_request_error, reason})
{:noreply, %{state | streaming_response: nil, pending_tool_calls: []}}
end
def handle_call(:get_conversation, _from, state) do
{:reply, state, state}
end

View File

@@ -0,0 +1,31 @@
defmodule ElixirAi.ClusterSingleton do
use GenServer
@sync_delay_ms 200
@singletons [ElixirAi.ConversationManager]
def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
@impl true
def init(_opts) do
Process.send_after(self(), :start_singletons, @sync_delay_ms)
{:ok, :pending}
end
@impl true
def handle_info(:start_singletons, state) do
for module <- @singletons do
case Horde.DynamicSupervisor.start_child(ElixirAi.ChatRunnerSupervisor, module) do
{:ok, _pid} -> :ok
{:error, {:already_started, _pid}} -> :ok
{:error, :already_present} -> :ok
{:error, reason} ->
require Logger
Logger.warning("ClusterSingleton: failed to start #{inspect(module)}: #{inspect(reason)}")
end
end
{:noreply, :started}
end
end

View File

@@ -2,7 +2,19 @@ defmodule ElixirAi.ConversationManager do
use GenServer
alias ElixirAi.{Conversation, Message}
def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
@name {:via, Horde.Registry, {ElixirAi.ChatRegistry, __MODULE__}}
def start_link(_opts) do
GenServer.start_link(__MODULE__, nil, name: @name)
end
def child_spec(opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [opts]},
restart: :transient
}
end
def init(_) do
names = Conversation.all_names()
@@ -11,19 +23,19 @@ defmodule ElixirAi.ConversationManager do
end
def create_conversation(name) do
GenServer.call(__MODULE__, {:create, name})
GenServer.call(@name, {:create, name})
end
def open_conversation(name) do
GenServer.call(__MODULE__, {:open, name})
GenServer.call(@name, {:open, name})
end
def list_conversations do
GenServer.call(__MODULE__, :list)
GenServer.call(@name, :list)
end
def get_messages(name) do
GenServer.call(__MODULE__, {:get_messages, name})
GenServer.call(@name, {:get_messages, name})
end
def handle_call({:create, name}, _from, conversations) do
@@ -64,10 +76,9 @@ defmodule ElixirAi.ConversationManager do
def handle_info({:store_message, name, message}, conversations) do
messages = Map.get(conversations, name, [])
position = length(messages)
case Conversation.find_id(name) do
{:ok, conv_id} -> Message.insert(conv_id, message, position)
{:ok, conv_id} -> Message.insert(conv_id, message)
_ -> :ok
end
@@ -76,7 +87,7 @@ defmodule ElixirAi.ConversationManager do
defp start_and_subscribe(name) do
result =
case DynamicSupervisor.start_child(
case Horde.DynamicSupervisor.start_child(
ElixirAi.ChatRunnerSupervisor,
{ElixirAi.ChatRunner, name: name}
) do

View File

@@ -7,7 +7,7 @@ defmodule ElixirAi.Conversation do
end
def create(name) do
case Repo.insert_all("conversations", [[id: Ecto.UUID.generate(), name: name, inserted_at: now(), updated_at: now()]]) do
case Repo.insert_all("conversations", [[name: name, inserted_at: now(), updated_at: now()]]) do
{1, _} -> :ok
_ -> {:error, :db_error}
end

View File

@@ -6,7 +6,7 @@ defmodule ElixirAi.Message do
Repo.all(
from m in "messages",
where: m.conversation_id == ^conversation_id,
order_by: m.position,
order_by: m.id,
select: %{
role: m.role,
content: m.content,
@@ -18,17 +18,15 @@ defmodule ElixirAi.Message do
|> Enum.map(&decode_message/1)
end
def insert(conversation_id, message, position) do
def insert(conversation_id, message) do
Repo.insert_all("messages", [
[
id: Ecto.UUID.generate(),
conversation_id: conversation_id,
role: to_string(message.role),
content: message[:content],
reasoning_content: message[:reasoning_content],
tool_calls: encode_tool_calls(message[:tool_calls]),
tool_call_id: message[:tool_call_id],
position: position,
inserted_at: DateTime.truncate(DateTime.utc_now(), :second)
]
])
@@ -40,9 +38,22 @@ defmodule ElixirAi.Message do
defp decode_message(row) do
row
|> Map.update!(:role, &String.to_existing_atom/1)
|> Map.update(:tool_calls, nil, fn
nil -> nil
json when is_binary(json) ->
json |> Jason.decode!() |> Enum.map(&atomize_keys/1)
already_decoded -> Enum.map(already_decoded, &atomize_keys/1)
end)
|> drop_nil_fields()
end
defp atomize_keys(map) when is_map(map) do
Map.new(map, fn
{k, v} when is_binary(k) -> {String.to_atom(k), v}
{k, v} -> {k, v}
end)
end
defp drop_nil_fields(map) do
Map.reject(map, fn {_k, v} -> is_nil(v) end)
end