diff --git a/config/test.exs b/config/test.exs index ab910f4..a7c266c 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,5 +1,8 @@ import Config +# Mark this as test environment +config :elixir_ai, :env, :test + # We don't run a server during test. If one is required, # you can enable the server option below. config :elixir_ai, ElixirAiWeb.Endpoint, @@ -16,3 +19,7 @@ config :phoenix, :plug_init_mode, :runtime # Enable helpful, but potentially expensive runtime checks config :phoenix_live_view, enable_expensive_runtime_checks: true + +# Configure the database for testing +# We use pool_size: 0 to prevent database connections during tests +config :elixir_ai, ElixirAi.Repo, pool_size: 0 diff --git a/docker-compose.yml b/docker-compose.yml index e48d0ae..b3c482e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,8 +6,6 @@ services: POSTGRES_PASSWORD: elixir_ai POSTGRES_DB: elixir_ai_dev command: postgres -c hba_file=/etc/postgresql/pg_hba.conf - ports: - - 5432:5432 volumes: - ./schema.sql:/docker-entrypoint-initdb.d/schema.sql - ./postgres/pg_hba.conf:/etc/postgresql/pg_hba.conf diff --git a/lib/elixir_ai/ai_utils/stream_line_utils.ex b/lib/elixir_ai/ai_utils/stream_line_utils.ex index bf79468..1e87830 100644 --- a/lib/elixir_ai/ai_utils/stream_line_utils.ex +++ b/lib/elixir_ai/ai_utils/stream_line_utils.ex @@ -123,7 +123,7 @@ defmodule ElixirAi.AiUtils.StreamLineUtils do %{ "choices" => [%{"finish_reason" => "tool_calls"}], "id" => id - } = message + } ) do # Logger.info("Received tool_calls_finished with message: #{inspect(message)}") send(server, {:ai_tool_call_end, id}) diff --git a/lib/elixir_ai/application.ex b/lib/elixir_ai/application.ex index 3d59f87..94ac8ad 100644 --- a/lib/elixir_ai/application.ex +++ b/lib/elixir_ai/application.ex @@ -6,8 +6,9 @@ defmodule ElixirAi.Application do def start(_type, _args) do children = [ ElixirAiWeb.Telemetry, - ElixirAi.Repo, - {Task, fn -> ElixirAi.AiProvider.ensure_default_provider() end}, + # Conditionally start Repo (skip in test environment) + repo_child_spec(), + default_provider_task(), {Cluster.Supervisor, [Application.get_env(:libcluster, :topologies, []), [name: ElixirAi.ClusterSupervisor]]}, {Phoenix.PubSub, name: ElixirAi.PubSub}, @@ -40,4 +41,21 @@ defmodule ElixirAi.Application do ElixirAiWeb.Endpoint.config_change(changed, removed) :ok end + + # Skip Repo and related tasks in test environment + defp repo_child_spec do + if Application.get_env(:elixir_ai, :env) == :test do + Supervisor.child_spec({Task, fn -> :ok end}, id: :skip_repo) + else + ElixirAi.Repo + end + end + + defp default_provider_task do + if Application.get_env(:elixir_ai, :env) == :test do + Supervisor.child_spec({Task, fn -> :ok end}, id: :skip_default_provider) + else + {Task, fn -> ElixirAi.AiProvider.ensure_default_provider() end} + end + end end diff --git a/lib/elixir_ai/chat_runner.ex b/lib/elixir_ai/chat_runner.ex index 843a8b3..b99ae2a 100644 --- a/lib/elixir_ai/chat_runner.ex +++ b/lib/elixir_ai/chat_runner.ex @@ -6,7 +6,7 @@ defmodule ElixirAi.ChatRunner do defp via(name), do: {:via, Horde.Registry, {ElixirAi.ChatRegistry, name}} defp topic(name), do: "ai_chat:#{name}" - defp message_topic(name), do: "conversation_messages:#{name}" + def message_topic(name), do: "conversation_messages:#{name}" def new_user_message(name, text_content) do GenServer.cast(via(name), {:user_message, text_content}) @@ -28,14 +28,17 @@ defmodule ElixirAi.ChatRunner do def init(name) do messages = case Conversation.find_id(name) do - {:ok, conv_id} -> Message.load_for_conversation(conv_id) + {:ok, conv_id} -> Message.load_for_conversation(conv_id, topic: message_topic(name)) _ -> [] end last_message = List.last(messages) if last_message && last_message.role == :user do - Logger.info("Last message role was #{last_message.role}, requesting AI response for conversation #{name}") + Logger.info( + "Last message role was #{last_message.role}, requesting AI response for conversation #{name}" + ) + request_ai_response(self(), messages, tools(self(), name)) end @@ -48,7 +51,7 @@ defmodule ElixirAi.ChatRunner do tools: tools(self(), name), ai_provider_url: Application.get_env(:elixir_ai, :ai_provider_url), ai_model: Application.get_env(:elixir_ai, :ai_model), - ai_token: Application.get_env(:elixir_ai, :ai_token), + ai_token: Application.get_env(:elixir_ai, :ai_token) }} end diff --git a/lib/elixir_ai/cluster_singleton.ex b/lib/elixir_ai/cluster_singleton.ex index adf51a2..e8c2b45 100644 --- a/lib/elixir_ai/cluster_singleton.ex +++ b/lib/elixir_ai/cluster_singleton.ex @@ -1,5 +1,6 @@ defmodule ElixirAi.ClusterSingleton do use GenServer + require Logger @sync_delay_ms 200 @@ -7,25 +8,46 @@ defmodule ElixirAi.ClusterSingleton do def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) - @impl true def init(_opts) do Process.send_after(self(), :start_singletons, @sync_delay_ms) {:ok, :pending} end - @impl true - def handle_info(:start_singletons, state) do + def handle_info(:start_singletons, _state) do for module <- @singletons do - case Horde.DynamicSupervisor.start_child(ElixirAi.ChatRunnerSupervisor, module) do - {:ok, _pid} -> :ok - {:error, {:already_started, _pid}} -> :ok - {:error, :already_present} -> :ok - {:error, reason} -> - require Logger - Logger.warning("ClusterSingleton: failed to start #{inspect(module)}: #{inspect(reason)}") + if singleton_exists?(module) do + Logger.debug( + "ClusterSingleton: singleton already exists, skipping start for #{inspect(module)}" + ) + else + case Horde.DynamicSupervisor.start_child(ElixirAi.ChatRunnerSupervisor, module) do + {:ok, _pid} -> + :ok + + {:error, {:already_started, _pid}} -> + :ok + + {:error, :already_present} -> + :ok + + {:error, reason} -> + Logger.warning( + "ClusterSingleton: failed to start #{inspect(module)}: #{inspect(reason)}" + ) + end end end {:noreply, :started} end + + defp singleton_exists?(module) do + case Horde.Registry.lookup(ElixirAi.ChatRegistry, module) do + [{pid, _metadata} | _] when is_pid(pid) -> + true + + _ -> + false + end + end end diff --git a/lib/elixir_ai/conversation_manager.ex b/lib/elixir_ai/conversation_manager.ex index 4b39182..2126283 100644 --- a/lib/elixir_ai/conversation_manager.ex +++ b/lib/elixir_ai/conversation_manager.ex @@ -19,19 +19,8 @@ defmodule ElixirAi.ConversationManager do def init(_) do Logger.info("ConversationManager initializing...") - conversation_list = Conversation.all_names() - Logger.info("Loaded #{length(conversation_list)} conversations from DB") - - # Log each conversation and check for UTF-8 issues - Enum.each(conversation_list, fn conv -> - Logger.info( - "Conversation: #{inspect(conv, limit: :infinity, printable_limit: :infinity, binaries: :as_binaries)}" - ) - end) - - conversations = Map.new(conversation_list, fn %{name: name} -> {name, []} end) - Logger.info("Conversation map keys: #{inspect(Map.keys(conversations))}") - {:ok, conversations} + send(self(), :load_conversations) + {:ok, :loading_conversations} end def create_conversation(name, ai_provider_id) do @@ -50,6 +39,15 @@ defmodule ElixirAi.ConversationManager do GenServer.call(@name, {:get_messages, name}) end + def handle_call(message, from, :loading_conversations) do + Logger.warning( + "Received call #{inspect(message)} from #{inspect(from)} while loading conversations. Retrying after delay." + ) + + Process.send_after(self(), {:retry_call, message, from}, 100) + {:noreply, :loading_conversations} + end + def handle_call({:create, name, ai_provider_id}, _from, conversations) do if Map.has_key?(conversations, name) do {:reply, {:error, :already_exists}, conversations} @@ -94,13 +92,37 @@ defmodule ElixirAi.ConversationManager do def handle_info({:store_message, name, message}, conversations) do case Conversation.find_id(name) do - {:ok, conv_id} -> Message.insert(conv_id, message) - _ -> :ok + {:ok, conv_id} -> + Message.insert(conv_id, message, topic: ElixirAi.ChatRunner.message_topic(name)) + + _ -> + :ok end {:noreply, Map.update(conversations, name, [message], &(&1 ++ [message]))} end + def handle_info(:load_conversations, _conversations) do + conversation_list = Conversation.all_names() + Logger.info("Loaded #{length(conversation_list)} conversations from DB") + + conversations = Map.new(conversation_list, fn %{name: name} -> {name, []} end) + Logger.info("Conversation map keys: #{inspect(Map.keys(conversations))}") + # {:ok, conversations} + {:noreply, conversations} + end + + def handle_info({:retry_call, message, from}, state) do + case handle_call(message, from, state) do + {:reply, reply, new_state} -> + GenServer.reply(from, reply) + {:noreply, new_state} + + {:noreply, new_state} -> + {:noreply, new_state} + end + end + defp start_and_subscribe(name) do result = case Horde.DynamicSupervisor.start_child( @@ -114,7 +136,7 @@ defmodule ElixirAi.ConversationManager do case result do {:ok, _pid} -> - Phoenix.PubSub.subscribe(ElixirAi.PubSub, "conversation_messages:#{name}") + Phoenix.PubSub.subscribe(ElixirAi.PubSub, ElixirAi.ChatRunner.message_topic(name)) result _ -> diff --git a/lib/elixir_ai/data/ai_provider.ex b/lib/elixir_ai/data/ai_provider.ex index 14a5d37..e847b7b 100644 --- a/lib/elixir_ai/data/ai_provider.ex +++ b/lib/elixir_ai/data/ai_provider.ex @@ -1,25 +1,31 @@ defmodule ElixirAi.AiProvider do - import Ecto.Query + use ElixirAi.Data alias ElixirAi.Repo alias ElixirAi.Data.AiProviderSchema - require Logger def all do - results = - Repo.all( - from(p in AiProviderSchema, - select: %{ - id: p.id, - name: p.name, - model_name: p.model_name - } - ) - ) - |> Enum.map(&convert_id_to_string/1) + broadcast_error topic: "ai_providers" do + sql = "SELECT id, name, model_name FROM ai_providers" + result = Ecto.Adapters.SQL.query!(Repo, sql, []) - Logger.debug("AiProvider.all() returning: #{inspect(results)}") + results = + Enum.map(result.rows, fn [id, name, model_name] -> + attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string() - results + case Zoi.parse(AiProviderSchema.partial_schema(), attrs) do + {:ok, valid} -> + struct(AiProviderSchema, valid) + + {:error, errors} -> + Logger.error("Invalid provider data from DB: #{inspect(errors)}") + raise ArgumentError, "Invalid provider data: #{inspect(errors)}" + end + end) + + Logger.debug("AiProvider.all() returning: #{inspect(results)}") + + results + end end # Convert binary UUID to string for frontend @@ -30,69 +36,83 @@ defmodule ElixirAi.AiProvider do defp convert_id_to_string(provider), do: provider def create(attrs) do - now = DateTime.truncate(DateTime.utc_now(), :second) + broadcast_error topic: "ai_providers" do + now = DateTime.truncate(DateTime.utc_now(), :second) - case Repo.insert_all("ai_providers", [ - [ - name: attrs.name, - model_name: attrs.model_name, - api_token: attrs.api_token, - completions_url: attrs.completions_url, - inserted_at: now, - updated_at: now - ] - ]) do - {1, _} -> - Phoenix.PubSub.broadcast( - ElixirAi.PubSub, - "ai_providers", - {:provider_added, attrs} - ) + sql = """ + INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6) + """ - :ok + params = [attrs.name, attrs.model_name, attrs.api_token, attrs.completions_url, now, now] - _ -> - {:error, :db_error} + Ecto.Adapters.SQL.query!(Repo, sql, params) + + Phoenix.PubSub.broadcast( + ElixirAi.PubSub, + "ai_providers", + {:provider_added, attrs} + ) + + :ok end - rescue - e in Ecto.ConstraintError -> - if e.constraint == "ai_providers_name_key", - do: {:error, :already_exists}, - else: {:error, :db_error} end def find_by_name(name) do - case Repo.one( - from(p in "ai_providers", - where: p.name == ^name, - select: %{ - id: p.id, - name: p.name, - model_name: p.model_name, - api_token: p.api_token, - completions_url: p.completions_url - } - ) - ) do - nil -> {:error, :not_found} - provider -> {:ok, convert_id_to_string(provider)} + broadcast_error topic: "ai_providers" do + sql = """ + SELECT id, name, model_name, api_token, completions_url + FROM ai_providers + WHERE name = $1 + LIMIT 1 + """ + + case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do + %{rows: []} -> + {:error, :not_found} + + %{rows: [[id, name, model_name, api_token, completions_url] | _]} -> + attrs = + %{ + id: id, + name: name, + model_name: model_name, + api_token: api_token, + completions_url: completions_url + } + |> convert_id_to_string() + + case Zoi.parse(AiProviderSchema.schema(), attrs) do + {:ok, valid} -> + {:ok, struct(AiProviderSchema, valid)} + + {:error, errors} -> + Logger.error("Invalid provider data from DB: #{inspect(errors)}") + {:error, :invalid_data} + end + end end end def ensure_default_provider do - case Repo.aggregate(from(p in "ai_providers"), :count) do - 0 -> - attrs = %{ - name: "default", - model_name: Application.fetch_env!(:elixir_ai, :ai_model), - api_token: Application.fetch_env!(:elixir_ai, :ai_token), - completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint) - } + broadcast_error topic: "ai_providers" do + sql = "SELECT COUNT(*) FROM ai_providers" + result = Ecto.Adapters.SQL.query!(Repo, sql, []) - create(attrs) + case result.rows do + [[0]] -> + attrs = %{ + name: "default", + model_name: Application.fetch_env!(:elixir_ai, :ai_model), + api_token: Application.fetch_env!(:elixir_ai, :ai_token), + completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint) + } - _ -> - :ok + create(attrs) + + _ -> + :ok + end end end end diff --git a/lib/elixir_ai/data/conversation.ex b/lib/elixir_ai/data/conversation.ex index d8c4baa..0692025 100644 --- a/lib/elixir_ai/data/conversation.ex +++ b/lib/elixir_ai/data/conversation.ex @@ -1,103 +1,99 @@ defmodule ElixirAi.Conversation do - import Ecto.Query + use ElixirAi.Data alias ElixirAi.Repo - alias ElixirAi.Data.ConversationSchema - alias ElixirAi.Data.AiProviderSchema - require Logger defmodule Provider do - use Ecto.Schema - import Ecto.Changeset + defstruct [:name, :model_name, :api_token, :completions_url] - @primary_key false - embedded_schema do - field(:name, :string) - field(:model_name, :string) - field(:api_token, :string) - field(:completions_url, :string) - end - - def changeset(provider, attrs) do - provider - |> cast(attrs, [:name, :model_name, :api_token, :completions_url]) - |> validate_required([:name, :model_name, :api_token, :completions_url]) + def schema do + Zoi.object(%{ + name: Zoi.string(), + model_name: Zoi.string(), + api_token: Zoi.string(), + completions_url: Zoi.string() + }) end end defmodule ConversationInfo do - use Ecto.Schema - import Ecto.Changeset + defstruct [:name, :provider] - @primary_key false - embedded_schema do - field(:name, :string) - embeds_one(:provider, Provider) - end - - def changeset(conversation, attrs) do - conversation - |> cast(attrs, [:name]) - |> validate_required([:name]) - |> cast_embed(:provider, with: &Provider.changeset/2, required: true) + def schema do + Zoi.object(%{ + name: Zoi.string(), + provider: + Zoi.object(%{ + name: Zoi.string(), + model_name: Zoi.string(), + api_token: Zoi.string(), + completions_url: Zoi.string() + }) + }) end end def all_names do - results = - Repo.all( - from(c in ConversationSchema, - left_join: p in AiProviderSchema, - on: c.ai_provider_id == p.id, - select: %{ - name: c.name, - provider: %{ - name: p.name, - model_name: p.model_name, - api_token: p.api_token, - completions_url: p.completions_url - } + broadcast_error topic: "conversations" do + sql = """ + SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url + FROM conversations c + LEFT JOIN ai_providers p ON c.ai_provider_id = p.id + """ + + result = Ecto.Adapters.SQL.query!(Repo, sql, []) + + Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] -> + attrs = %{ + name: name, + provider: %{ + name: provider_name, + model_name: model_name, + api_token: api_token, + completions_url: completions_url } - ) - ) + } - Enum.map(results, fn attrs -> - changeset = ConversationInfo.changeset(%ConversationInfo{}, attrs) + case Zoi.parse(ConversationInfo.schema(), attrs) do + {:ok, valid} -> + struct(ConversationInfo, Map.put(valid, :provider, struct(Provider, valid.provider))) - if changeset.valid? do - Ecto.Changeset.apply_changes(changeset) - else - Logger.error("Invalid conversation data: #{inspect(changeset.errors)}") - raise ArgumentError, "Invalid conversation data: #{inspect(changeset.errors)}" - end - end) + {:error, errors} -> + Logger.error("Invalid conversation data: #{inspect(errors)}") + raise ArgumentError, "Invalid conversation data: #{inspect(errors)}" + end + end) + end end def create(name, ai_provider_id) when is_binary(ai_provider_id) do - # Convert string UUID from frontend to binary UUID for database - case Ecto.UUID.dump(ai_provider_id) do - {:ok, binary_id} -> - Repo.insert_all("conversations", [ - [name: name, ai_provider_id: binary_id, inserted_at: now(), updated_at: now()] - ]) - |> case do - {1, _} -> :ok - _ -> {:error, :db_error} - end + broadcast_error topic: "conversations" do + case Ecto.UUID.dump(ai_provider_id) do + {:ok, binary_id} -> + sql = """ + INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at) + VALUES ($1, $2, $3, $4) + """ - :error -> - {:error, :invalid_uuid} + timestamp = now() + params = [name, binary_id, timestamp, timestamp] + + Ecto.Adapters.SQL.query!(Repo, sql, params) + :ok + + :error -> + {:error, :invalid_uuid} + end end - rescue - e in Ecto.ConstraintError -> - if e.constraint == "conversations_name_index", - do: {:error, :already_exists}, - else: {:error, :db_error} end def find_id(name) do - case Repo.one(from(c in ConversationSchema, where: c.name == ^name, select: c.id)) do - nil -> {:error, :not_found} - id -> {:ok, id} + broadcast_error topic: "conversations" do + sql = "SELECT id FROM conversations WHERE name = $1 LIMIT 1" + + case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do + %{rows: []} -> {:error, :not_found} + %{rows: [[id] | _]} -> {:ok, id} + end end end diff --git a/lib/elixir_ai/data/data.ex b/lib/elixir_ai/data/data.ex new file mode 100644 index 0000000..0c8831d --- /dev/null +++ b/lib/elixir_ai/data/data.ex @@ -0,0 +1,32 @@ +defmodule ElixirAi.Data do + defmacro __using__(_opts) do + quote do + import ElixirAi.Data + require Logger + end + end + + defmacro broadcast_error(opts, do: block) do + topic = Keyword.get(opts, :topic) + build_with_db(block, topic) + end + + defp build_with_db(block, topic) do + quote do + try do + unquote(block) + rescue + exception -> + Logger.error("Database error: #{Exception.message(exception)}") + + Phoenix.PubSub.broadcast( + ElixirAi.PubSub, + unquote(topic), + {:db_error, Exception.message(exception)} + ) + + {:error, :db_error} + end + end + end +end diff --git a/lib/elixir_ai/data/db_helpers.ex b/lib/elixir_ai/data/db_helpers.ex new file mode 100644 index 0000000..9664597 --- /dev/null +++ b/lib/elixir_ai/data/db_helpers.ex @@ -0,0 +1,26 @@ +defmodule ElixirAi.Data.DbHelpers do + @get_named_param ~r/\$\((\w+)\)/ + + def named_params_to_positional_params(query, params) do + param_occurrences = Regex.scan(@get_named_param, query) + + {param_to_index, ordered_values} = + param_occurrences + |> Enum.reduce({%{}, []}, fn [_full_match, param_name], {index_map, values} -> + if Map.has_key?(index_map, param_name) do + {index_map, values} + else + next_index = map_size(index_map) + 1 + param_value = Map.fetch!(params, param_name) + {Map.put(index_map, param_name, next_index), values ++ [param_value]} + end + end) + + positional_sql = + Regex.replace(@get_named_param, query, fn _full_match, param_name -> + "$#{param_to_index[param_name]}" + end) + + {positional_sql, ordered_values} + end +end diff --git a/lib/elixir_ai/data/message.ex b/lib/elixir_ai/data/message.ex index 385dcbd..9bc9aff 100644 --- a/lib/elixir_ai/data/message.ex +++ b/lib/elixir_ai/data/message.ex @@ -1,50 +1,95 @@ defmodule ElixirAi.Message do - import Ecto.Query + use ElixirAi.Data alias ElixirAi.Repo alias ElixirAi.Data.MessageSchema - def load_for_conversation(conversation_id) do - Repo.all( - from m in MessageSchema, - where: m.conversation_id == ^conversation_id, - order_by: m.id, - select: %{ - role: m.role, - content: m.content, - reasoning_content: m.reasoning_content, - tool_calls: m.tool_calls, - tool_call_id: m.tool_call_id - } - ) - |> Enum.map(&decode_message/1) + def load_for_conversation(conversation_id, topic: topic) do + broadcast_error topic: topic do + with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do + sql = """ + SELECT role, content, reasoning_content, tool_calls, tool_call_id + FROM messages + WHERE conversation_id = $1 + ORDER BY id + """ + + result = Ecto.Adapters.SQL.query!(Repo, sql, [db_conversation_id]) + + Enum.map(result.rows, fn row -> + raw = %{ + role: Enum.at(row, 0), + content: Enum.at(row, 1), + reasoning_content: Enum.at(row, 2), + tool_calls: Enum.at(row, 3), + tool_call_id: Enum.at(row, 4) + } + + case Zoi.parse(MessageSchema.schema(), raw) do + {:ok, _valid} -> + struct(MessageSchema, decode_message(raw)) + + {:error, errors} -> + Logger.error("Invalid message data from DB: #{inspect(errors)}") + raise ArgumentError, "Invalid message data: #{inspect(errors)}" + end + end) + else + :error -> [] + end + end end - def insert(conversation_id, message) do - Repo.insert_all("messages", [ - [ - conversation_id: conversation_id, - role: to_string(message.role), - content: message[:content], - reasoning_content: message[:reasoning_content], - tool_calls: encode_tool_calls(message[:tool_calls]), - tool_call_id: message[:tool_call_id], - inserted_at: DateTime.truncate(DateTime.utc_now(), :second) - ] - ]) + def insert(conversation_id, message, topic: topic) do + broadcast_error topic: topic do + with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do + sql = """ + INSERT INTO messages ( + conversation_id, role, content, reasoning_content, + tool_calls, tool_call_id, inserted_at + ) VALUES ($1, $2, $3, $4, $5, $6, $7) + """ + + params = [ + db_conversation_id, + to_string(message.role), + message[:content], + message[:reasoning_content], + encode_tool_calls(message[:tool_calls]), + message[:tool_call_id], + DateTime.truncate(DateTime.utc_now(), :second) + ] + + Ecto.Adapters.SQL.query!(Repo, sql, params) + Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}") + {:ok, 1} + else + :error -> + Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}") + {:error, :invalid_conversation_id} + end + end end defp encode_tool_calls(nil), do: nil defp encode_tool_calls(calls), do: Jason.encode!(calls) + defp dump_uuid(id) when is_binary(id) and byte_size(id) == 16, do: {:ok, id} + defp dump_uuid(id) when is_binary(id), do: Ecto.UUID.dump(id) + defp dump_uuid(_), do: :error + defp decode_message(row) do row |> Map.update!(:role, &String.to_existing_atom/1) |> Map.update(:tool_calls, nil, fn - nil -> nil - json when is_binary(json) -> - json |> Jason.decode!() |> Enum.map(&atomize_keys/1) - already_decoded -> Enum.map(already_decoded, &atomize_keys/1) - end) + nil -> + nil + + json when is_binary(json) -> + json |> Jason.decode!() |> Enum.map(&atomize_keys/1) + + already_decoded -> + Enum.map(already_decoded, &atomize_keys/1) + end) |> drop_nil_fields() end diff --git a/lib/elixir_ai/data/schemas/ai_provider_schema.ex b/lib/elixir_ai/data/schemas/ai_provider_schema.ex index cfc1a83..80474f6 100644 --- a/lib/elixir_ai/data/schemas/ai_provider_schema.ex +++ b/lib/elixir_ai/data/schemas/ai_provider_schema.ex @@ -1,15 +1,21 @@ defmodule ElixirAi.Data.AiProviderSchema do - use Ecto.Schema + defstruct [:id, :name, :model_name, :api_token, :completions_url, :inserted_at, :updated_at] - @primary_key {:id, :binary_id, autogenerate: true} - @foreign_key_type :binary_id + def schema do + Zoi.object(%{ + id: Zoi.string(), + name: Zoi.string(), + model_name: Zoi.string(), + api_token: Zoi.string(), + completions_url: Zoi.string() + }) + end - schema "ai_providers" do - field(:name, :string) - field(:model_name, :string) - field(:api_token, :string) - field(:completions_url, :string) - - timestamps(type: :utc_datetime) + def partial_schema do + Zoi.object(%{ + id: Zoi.string(), + name: Zoi.string(), + model_name: Zoi.string() + }) end end diff --git a/lib/elixir_ai/data/schemas/conversation_schema.ex b/lib/elixir_ai/data/schemas/conversation_schema.ex index 3d43743..bef6a7e 100644 --- a/lib/elixir_ai/data/schemas/conversation_schema.ex +++ b/lib/elixir_ai/data/schemas/conversation_schema.ex @@ -1,13 +1,11 @@ defmodule ElixirAi.Data.ConversationSchema do - use Ecto.Schema + defstruct [:id, :name, :ai_provider_id, :inserted_at, :updated_at] - @primary_key {:id, :binary_id, autogenerate: true} - @foreign_key_type :binary_id - - schema "conversations" do - field(:name, :string) - belongs_to(:ai_provider, ElixirAi.Data.AiProviderSchema, type: :binary_id) - - timestamps(type: :utc_datetime) + def schema do + Zoi.object(%{ + id: Zoi.string(), + name: Zoi.string(), + ai_provider_id: Zoi.string() + }) end end diff --git a/lib/elixir_ai/data/schemas/message_schema.ex b/lib/elixir_ai/data/schemas/message_schema.ex index f6dded3..4dbc973 100644 --- a/lib/elixir_ai/data/schemas/message_schema.ex +++ b/lib/elixir_ai/data/schemas/message_schema.ex @@ -1,16 +1,18 @@ defmodule ElixirAi.Data.MessageSchema do - use Ecto.Schema + defstruct [ + :id, + :conversation_id, + :role, + :content, + :reasoning_content, + :tool_calls, + :tool_call_id, + :inserted_at + ] - @primary_key {:id, :id, autogenerate: true} - - schema "messages" do - belongs_to(:conversation, ElixirAi.Data.ConversationSchema, type: :binary_id) - field(:role, :string) - field(:content, :string) - field(:reasoning_content, :string) - field(:tool_calls, :map) - field(:tool_call_id, :string) - - timestamps(inserted_at: :inserted_at, updated_at: false, type: :utc_datetime) + def schema do + Zoi.object(%{ + role: Zoi.string() + }) end end diff --git a/lib/elixir_ai_web/live/ai_providers_live.ex b/lib/elixir_ai_web/live/ai_providers_live.ex index 346d379..615f64f 100644 --- a/lib/elixir_ai_web/live/ai_providers_live.ex +++ b/lib/elixir_ai_web/live/ai_providers_live.ex @@ -134,8 +134,6 @@ defmodule ElixirAiWeb.AiProvidersLive do ) |> assign(error: nil)} - {:error, :already_exists} -> - {:noreply, assign(socket, error: "A provider with that name already exists")} _ -> {:noreply, assign(socket, error: "Failed to create provider")} diff --git a/lib/elixir_ai_web/live/home_live.ex b/lib/elixir_ai_web/live/home_live.ex index b117d95..ba75a72 100644 --- a/lib/elixir_ai_web/live/home_live.ex +++ b/lib/elixir_ai_web/live/home_live.ex @@ -7,24 +7,13 @@ defmodule ElixirAiWeb.HomeLive do def mount(_params, _session, socket) do if connected?(socket) do Phoenix.PubSub.subscribe(ElixirAi.PubSub, "ai_providers") + send(self(), :load_data) end - conversations = ConversationManager.list_conversations() - - Logger.debug( - "Conversations: #{inspect(conversations, limit: :infinity, printable_limit: :infinity)}" - ) - - ai_providers = AiProvider.all() - - Logger.debug( - "AI Providers: #{inspect(ai_providers, limit: :infinity, printable_limit: :infinity)}" - ) - {:ok, socket - |> assign(conversations: conversations) - |> assign(ai_providers: ai_providers) + |> assign(conversations: []) + |> assign(ai_providers: []) |> assign(new_name: "") |> assign(error: nil)} end @@ -109,6 +98,25 @@ defmodule ElixirAiWeb.HomeLive do end end + def handle_info(:load_data, socket) do + conversations = ConversationManager.list_conversations() + + Logger.debug( + "Conversations: #{inspect(conversations, limit: :infinity, printable_limit: :infinity)}" + ) + + ai_providers = AiProvider.all() + + Logger.debug( + "AI Providers: #{inspect(ai_providers, limit: :infinity, printable_limit: :infinity)}" + ) + + {:noreply, + socket + |> assign(conversations: conversations) + |> assign(ai_providers: ai_providers)} + end + def handle_info({:provider_added, _attrs}, socket) do {:noreply, assign(socket, ai_providers: AiProvider.all())} end diff --git a/mix.exs b/mix.exs index a7be998..67fb510 100644 --- a/mix.exs +++ b/mix.exs @@ -59,7 +59,8 @@ defmodule ElixirAi.MixProject do {:ecto_sql, "~> 3.11"}, {:postgrex, ">= 0.0.0"}, {:horde, "~> 0.9"}, - {:credo, "~> 1.7", only: [:dev, :test], runtime: false} + {:credo, "~> 1.7", only: [:dev, :test], runtime: false}, + {:zoi, "~> 0.17"} ] end diff --git a/mix.lock b/mix.lock index bdbc9fa..edd8cf2 100644 --- a/mix.lock +++ b/mix.lock @@ -66,4 +66,5 @@ "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.1", "a48703a25c170eedadca83b11e88985af08d35f37c6f664d6dcfb106a97782fc", [:rebar3], [], "hexpm", "b3a917854ce3ae233619744ad1e0102e05673136776fb2fa76234f3e03b23642"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.9", "43dc3ba6d89ef5dec5b1d0a39698436a1e856d000d84bf31a3149862b01a287f", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "5534d5c9adad3c18a0f58a9371220d75a803bf0b9a3d87e6fe072faaeed76a08"}, + "zoi": {:hex, :zoi, "0.17.1", "406aa87bb4181f41dee64336b75434367b7d3e88db813b0e6db0ae2d0f81f743", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "3a11bf3bc9189f988ac74e81b5d7ca0c689b2a20eed220746a7043aa528e2aab"}, } diff --git a/test/db_helper_test.exs b/test/db_helper_test.exs new file mode 100644 index 0000000..8394d40 --- /dev/null +++ b/test/db_helper_test.exs @@ -0,0 +1,113 @@ +defmodule SQLTest do + use ExUnit.Case + alias ElixirAi.Data.DbHelpers + + test "converts simple named parameters" do + query = "SELECT * FROM users WHERE id = $(id) AND email = $(email)" + + params = %{ + "id" => 10, + "email" => "test@example.com" + } + + assert DbHelpers.named_params_to_positional_params(query, params) == + { + "SELECT * FROM users WHERE id = $1 AND email = $2", + [10, "test@example.com"] + } + end + + test "reuses positional parameter for repeated named parameter" do + query = """ + SELECT * FROM users + WHERE id = $(id) + OR owner_id = $(id) + """ + + params = %{"id" => 42} + + assert DbHelpers.named_params_to_positional_params(query, params) == + { + """ + SELECT * FROM users + WHERE id = $1 + OR owner_id = $1 + """, + [42] + } + end + + test "assigns parameters in order of first appearance" do + query = "SELECT * FROM items WHERE category = $(category) AND owner = $(owner)" + + params = %{ + "owner" => 5, + "category" => "books" + } + + assert DbHelpers.named_params_to_positional_params(query, params) == + { + "SELECT * FROM items WHERE category = $1 AND owner = $2", + ["books", 5] + } + end + + test "handles multiple distinct parameters" do + query = "INSERT INTO posts(title, body, author_id) VALUES($(title), $(body), $(author))" + + params = %{ + "title" => "Hello", + "body" => "World", + "author" => 7 + } + + assert DbHelpers.named_params_to_positional_params(query, params) == + { + "INSERT INTO posts(title, body, author_id) VALUES($1, $2, $3)", + ["Hello", "World", 7] + } + end + + test "works when same parameter appears many times" do + query = """ + SELECT * + FROM logs + WHERE user_id = $(user) + OR editor_id = $(user) + OR reviewer_id = $(user) + """ + + params = %{"user" => 99} + + assert DbHelpers.named_params_to_positional_params(query, params) == + { + """ + SELECT * + FROM logs + WHERE user_id = $1 + OR editor_id = $1 + OR reviewer_id = $1 + """, + [99] + } + end + + test "raises if parameter is missing" do + query = "SELECT * FROM users WHERE id = $(id)" + + params = %{} + + assert_raise KeyError, fn -> + DbHelpers.named_params_to_positional_params(query, params) + end + end + + test "query without named parameters returns unchanged query and empty params" do + query = "SELECT * FROM users" + + params = %{} + + assert DbHelpers.named_params_to_positional_params(query, params) == + {"SELECT * FROM users", []} + end +end diff --git a/test/elixir_ai_web/controllers/page_controller_test.exs b/test/elixir_ai_web/controllers/page_controller_test.exs index abcb18b..33ad87e 100644 --- a/test/elixir_ai_web/controllers/page_controller_test.exs +++ b/test/elixir_ai_web/controllers/page_controller_test.exs @@ -1,8 +1,9 @@ defmodule ElixirAiWeb.PageControllerTest do use ElixirAiWeb.ConnCase + # homepage need db to test test "GET /", %{conn: conn} do conn = get(conn, ~p"/") - assert html_response(conn, 200) =~ "Peace of mind from prototype to production" + assert html_response(conn, 200) =~ "Conversations" end end