This commit is contained in:
@@ -1,25 +1,31 @@
|
||||
defmodule ElixirAi.AiProvider do
|
||||
import Ecto.Query
|
||||
use ElixirAi.Data
|
||||
alias ElixirAi.Repo
|
||||
alias ElixirAi.Data.AiProviderSchema
|
||||
require Logger
|
||||
|
||||
def all do
|
||||
results =
|
||||
Repo.all(
|
||||
from(p in AiProviderSchema,
|
||||
select: %{
|
||||
id: p.id,
|
||||
name: p.name,
|
||||
model_name: p.model_name
|
||||
}
|
||||
)
|
||||
)
|
||||
|> Enum.map(&convert_id_to_string/1)
|
||||
broadcast_error topic: "ai_providers" do
|
||||
sql = "SELECT id, name, model_name FROM ai_providers"
|
||||
result = Ecto.Adapters.SQL.query!(Repo, sql, [])
|
||||
|
||||
Logger.debug("AiProvider.all() returning: #{inspect(results)}")
|
||||
results =
|
||||
Enum.map(result.rows, fn [id, name, model_name] ->
|
||||
attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string()
|
||||
|
||||
results
|
||||
case Zoi.parse(AiProviderSchema.partial_schema(), attrs) do
|
||||
{:ok, valid} ->
|
||||
struct(AiProviderSchema, valid)
|
||||
|
||||
{:error, errors} ->
|
||||
Logger.error("Invalid provider data from DB: #{inspect(errors)}")
|
||||
raise ArgumentError, "Invalid provider data: #{inspect(errors)}"
|
||||
end
|
||||
end)
|
||||
|
||||
Logger.debug("AiProvider.all() returning: #{inspect(results)}")
|
||||
|
||||
results
|
||||
end
|
||||
end
|
||||
|
||||
# Convert binary UUID to string for frontend
|
||||
@@ -30,69 +36,83 @@ defmodule ElixirAi.AiProvider do
|
||||
defp convert_id_to_string(provider), do: provider
|
||||
|
||||
def create(attrs) do
|
||||
now = DateTime.truncate(DateTime.utc_now(), :second)
|
||||
broadcast_error topic: "ai_providers" do
|
||||
now = DateTime.truncate(DateTime.utc_now(), :second)
|
||||
|
||||
case Repo.insert_all("ai_providers", [
|
||||
[
|
||||
name: attrs.name,
|
||||
model_name: attrs.model_name,
|
||||
api_token: attrs.api_token,
|
||||
completions_url: attrs.completions_url,
|
||||
inserted_at: now,
|
||||
updated_at: now
|
||||
]
|
||||
]) do
|
||||
{1, _} ->
|
||||
Phoenix.PubSub.broadcast(
|
||||
ElixirAi.PubSub,
|
||||
"ai_providers",
|
||||
{:provider_added, attrs}
|
||||
)
|
||||
sql = """
|
||||
INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, $5, $6)
|
||||
"""
|
||||
|
||||
:ok
|
||||
params = [attrs.name, attrs.model_name, attrs.api_token, attrs.completions_url, now, now]
|
||||
|
||||
_ ->
|
||||
{:error, :db_error}
|
||||
Ecto.Adapters.SQL.query!(Repo, sql, params)
|
||||
|
||||
Phoenix.PubSub.broadcast(
|
||||
ElixirAi.PubSub,
|
||||
"ai_providers",
|
||||
{:provider_added, attrs}
|
||||
)
|
||||
|
||||
:ok
|
||||
end
|
||||
rescue
|
||||
e in Ecto.ConstraintError ->
|
||||
if e.constraint == "ai_providers_name_key",
|
||||
do: {:error, :already_exists},
|
||||
else: {:error, :db_error}
|
||||
end
|
||||
|
||||
def find_by_name(name) do
|
||||
case Repo.one(
|
||||
from(p in "ai_providers",
|
||||
where: p.name == ^name,
|
||||
select: %{
|
||||
id: p.id,
|
||||
name: p.name,
|
||||
model_name: p.model_name,
|
||||
api_token: p.api_token,
|
||||
completions_url: p.completions_url
|
||||
}
|
||||
)
|
||||
) do
|
||||
nil -> {:error, :not_found}
|
||||
provider -> {:ok, convert_id_to_string(provider)}
|
||||
broadcast_error topic: "ai_providers" do
|
||||
sql = """
|
||||
SELECT id, name, model_name, api_token, completions_url
|
||||
FROM ai_providers
|
||||
WHERE name = $1
|
||||
LIMIT 1
|
||||
"""
|
||||
|
||||
case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do
|
||||
%{rows: []} ->
|
||||
{:error, :not_found}
|
||||
|
||||
%{rows: [[id, name, model_name, api_token, completions_url] | _]} ->
|
||||
attrs =
|
||||
%{
|
||||
id: id,
|
||||
name: name,
|
||||
model_name: model_name,
|
||||
api_token: api_token,
|
||||
completions_url: completions_url
|
||||
}
|
||||
|> convert_id_to_string()
|
||||
|
||||
case Zoi.parse(AiProviderSchema.schema(), attrs) do
|
||||
{:ok, valid} ->
|
||||
{:ok, struct(AiProviderSchema, valid)}
|
||||
|
||||
{:error, errors} ->
|
||||
Logger.error("Invalid provider data from DB: #{inspect(errors)}")
|
||||
{:error, :invalid_data}
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_default_provider do
|
||||
case Repo.aggregate(from(p in "ai_providers"), :count) do
|
||||
0 ->
|
||||
attrs = %{
|
||||
name: "default",
|
||||
model_name: Application.fetch_env!(:elixir_ai, :ai_model),
|
||||
api_token: Application.fetch_env!(:elixir_ai, :ai_token),
|
||||
completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint)
|
||||
}
|
||||
broadcast_error topic: "ai_providers" do
|
||||
sql = "SELECT COUNT(*) FROM ai_providers"
|
||||
result = Ecto.Adapters.SQL.query!(Repo, sql, [])
|
||||
|
||||
create(attrs)
|
||||
case result.rows do
|
||||
[[0]] ->
|
||||
attrs = %{
|
||||
name: "default",
|
||||
model_name: Application.fetch_env!(:elixir_ai, :ai_model),
|
||||
api_token: Application.fetch_env!(:elixir_ai, :ai_token),
|
||||
completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint)
|
||||
}
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
create(attrs)
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,103 +1,99 @@
|
||||
defmodule ElixirAi.Conversation do
|
||||
import Ecto.Query
|
||||
use ElixirAi.Data
|
||||
alias ElixirAi.Repo
|
||||
alias ElixirAi.Data.ConversationSchema
|
||||
alias ElixirAi.Data.AiProviderSchema
|
||||
require Logger
|
||||
|
||||
defmodule Provider do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
defstruct [:name, :model_name, :api_token, :completions_url]
|
||||
|
||||
@primary_key false
|
||||
embedded_schema do
|
||||
field(:name, :string)
|
||||
field(:model_name, :string)
|
||||
field(:api_token, :string)
|
||||
field(:completions_url, :string)
|
||||
end
|
||||
|
||||
def changeset(provider, attrs) do
|
||||
provider
|
||||
|> cast(attrs, [:name, :model_name, :api_token, :completions_url])
|
||||
|> validate_required([:name, :model_name, :api_token, :completions_url])
|
||||
def schema do
|
||||
Zoi.object(%{
|
||||
name: Zoi.string(),
|
||||
model_name: Zoi.string(),
|
||||
api_token: Zoi.string(),
|
||||
completions_url: Zoi.string()
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
defmodule ConversationInfo do
|
||||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
defstruct [:name, :provider]
|
||||
|
||||
@primary_key false
|
||||
embedded_schema do
|
||||
field(:name, :string)
|
||||
embeds_one(:provider, Provider)
|
||||
end
|
||||
|
||||
def changeset(conversation, attrs) do
|
||||
conversation
|
||||
|> cast(attrs, [:name])
|
||||
|> validate_required([:name])
|
||||
|> cast_embed(:provider, with: &Provider.changeset/2, required: true)
|
||||
def schema do
|
||||
Zoi.object(%{
|
||||
name: Zoi.string(),
|
||||
provider:
|
||||
Zoi.object(%{
|
||||
name: Zoi.string(),
|
||||
model_name: Zoi.string(),
|
||||
api_token: Zoi.string(),
|
||||
completions_url: Zoi.string()
|
||||
})
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
def all_names do
|
||||
results =
|
||||
Repo.all(
|
||||
from(c in ConversationSchema,
|
||||
left_join: p in AiProviderSchema,
|
||||
on: c.ai_provider_id == p.id,
|
||||
select: %{
|
||||
name: c.name,
|
||||
provider: %{
|
||||
name: p.name,
|
||||
model_name: p.model_name,
|
||||
api_token: p.api_token,
|
||||
completions_url: p.completions_url
|
||||
}
|
||||
broadcast_error topic: "conversations" do
|
||||
sql = """
|
||||
SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url
|
||||
FROM conversations c
|
||||
LEFT JOIN ai_providers p ON c.ai_provider_id = p.id
|
||||
"""
|
||||
|
||||
result = Ecto.Adapters.SQL.query!(Repo, sql, [])
|
||||
|
||||
Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] ->
|
||||
attrs = %{
|
||||
name: name,
|
||||
provider: %{
|
||||
name: provider_name,
|
||||
model_name: model_name,
|
||||
api_token: api_token,
|
||||
completions_url: completions_url
|
||||
}
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
Enum.map(results, fn attrs ->
|
||||
changeset = ConversationInfo.changeset(%ConversationInfo{}, attrs)
|
||||
case Zoi.parse(ConversationInfo.schema(), attrs) do
|
||||
{:ok, valid} ->
|
||||
struct(ConversationInfo, Map.put(valid, :provider, struct(Provider, valid.provider)))
|
||||
|
||||
if changeset.valid? do
|
||||
Ecto.Changeset.apply_changes(changeset)
|
||||
else
|
||||
Logger.error("Invalid conversation data: #{inspect(changeset.errors)}")
|
||||
raise ArgumentError, "Invalid conversation data: #{inspect(changeset.errors)}"
|
||||
end
|
||||
end)
|
||||
{:error, errors} ->
|
||||
Logger.error("Invalid conversation data: #{inspect(errors)}")
|
||||
raise ArgumentError, "Invalid conversation data: #{inspect(errors)}"
|
||||
end
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
def create(name, ai_provider_id) when is_binary(ai_provider_id) do
|
||||
# Convert string UUID from frontend to binary UUID for database
|
||||
case Ecto.UUID.dump(ai_provider_id) do
|
||||
{:ok, binary_id} ->
|
||||
Repo.insert_all("conversations", [
|
||||
[name: name, ai_provider_id: binary_id, inserted_at: now(), updated_at: now()]
|
||||
])
|
||||
|> case do
|
||||
{1, _} -> :ok
|
||||
_ -> {:error, :db_error}
|
||||
end
|
||||
broadcast_error topic: "conversations" do
|
||||
case Ecto.UUID.dump(ai_provider_id) do
|
||||
{:ok, binary_id} ->
|
||||
sql = """
|
||||
INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
"""
|
||||
|
||||
:error ->
|
||||
{:error, :invalid_uuid}
|
||||
timestamp = now()
|
||||
params = [name, binary_id, timestamp, timestamp]
|
||||
|
||||
Ecto.Adapters.SQL.query!(Repo, sql, params)
|
||||
:ok
|
||||
|
||||
:error ->
|
||||
{:error, :invalid_uuid}
|
||||
end
|
||||
end
|
||||
rescue
|
||||
e in Ecto.ConstraintError ->
|
||||
if e.constraint == "conversations_name_index",
|
||||
do: {:error, :already_exists},
|
||||
else: {:error, :db_error}
|
||||
end
|
||||
|
||||
def find_id(name) do
|
||||
case Repo.one(from(c in ConversationSchema, where: c.name == ^name, select: c.id)) do
|
||||
nil -> {:error, :not_found}
|
||||
id -> {:ok, id}
|
||||
broadcast_error topic: "conversations" do
|
||||
sql = "SELECT id FROM conversations WHERE name = $1 LIMIT 1"
|
||||
|
||||
case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do
|
||||
%{rows: []} -> {:error, :not_found}
|
||||
%{rows: [[id] | _]} -> {:ok, id}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
32
lib/elixir_ai/data/data.ex
Normal file
32
lib/elixir_ai/data/data.ex
Normal file
@@ -0,0 +1,32 @@
|
||||
defmodule ElixirAi.Data do
|
||||
defmacro __using__(_opts) do
|
||||
quote do
|
||||
import ElixirAi.Data
|
||||
require Logger
|
||||
end
|
||||
end
|
||||
|
||||
defmacro broadcast_error(opts, do: block) do
|
||||
topic = Keyword.get(opts, :topic)
|
||||
build_with_db(block, topic)
|
||||
end
|
||||
|
||||
defp build_with_db(block, topic) do
|
||||
quote do
|
||||
try do
|
||||
unquote(block)
|
||||
rescue
|
||||
exception ->
|
||||
Logger.error("Database error: #{Exception.message(exception)}")
|
||||
|
||||
Phoenix.PubSub.broadcast(
|
||||
ElixirAi.PubSub,
|
||||
unquote(topic),
|
||||
{:db_error, Exception.message(exception)}
|
||||
)
|
||||
|
||||
{:error, :db_error}
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
26
lib/elixir_ai/data/db_helpers.ex
Normal file
26
lib/elixir_ai/data/db_helpers.ex
Normal file
@@ -0,0 +1,26 @@
|
||||
defmodule ElixirAi.Data.DbHelpers do
|
||||
@get_named_param ~r/\$\((\w+)\)/
|
||||
|
||||
def named_params_to_positional_params(query, params) do
|
||||
param_occurrences = Regex.scan(@get_named_param, query)
|
||||
|
||||
{param_to_index, ordered_values} =
|
||||
param_occurrences
|
||||
|> Enum.reduce({%{}, []}, fn [_full_match, param_name], {index_map, values} ->
|
||||
if Map.has_key?(index_map, param_name) do
|
||||
{index_map, values}
|
||||
else
|
||||
next_index = map_size(index_map) + 1
|
||||
param_value = Map.fetch!(params, param_name)
|
||||
{Map.put(index_map, param_name, next_index), values ++ [param_value]}
|
||||
end
|
||||
end)
|
||||
|
||||
positional_sql =
|
||||
Regex.replace(@get_named_param, query, fn _full_match, param_name ->
|
||||
"$#{param_to_index[param_name]}"
|
||||
end)
|
||||
|
||||
{positional_sql, ordered_values}
|
||||
end
|
||||
end
|
||||
@@ -1,50 +1,95 @@
|
||||
defmodule ElixirAi.Message do
|
||||
import Ecto.Query
|
||||
use ElixirAi.Data
|
||||
alias ElixirAi.Repo
|
||||
alias ElixirAi.Data.MessageSchema
|
||||
|
||||
def load_for_conversation(conversation_id) do
|
||||
Repo.all(
|
||||
from m in MessageSchema,
|
||||
where: m.conversation_id == ^conversation_id,
|
||||
order_by: m.id,
|
||||
select: %{
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
reasoning_content: m.reasoning_content,
|
||||
tool_calls: m.tool_calls,
|
||||
tool_call_id: m.tool_call_id
|
||||
}
|
||||
)
|
||||
|> Enum.map(&decode_message/1)
|
||||
def load_for_conversation(conversation_id, topic: topic) do
|
||||
broadcast_error topic: topic do
|
||||
with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do
|
||||
sql = """
|
||||
SELECT role, content, reasoning_content, tool_calls, tool_call_id
|
||||
FROM messages
|
||||
WHERE conversation_id = $1
|
||||
ORDER BY id
|
||||
"""
|
||||
|
||||
result = Ecto.Adapters.SQL.query!(Repo, sql, [db_conversation_id])
|
||||
|
||||
Enum.map(result.rows, fn row ->
|
||||
raw = %{
|
||||
role: Enum.at(row, 0),
|
||||
content: Enum.at(row, 1),
|
||||
reasoning_content: Enum.at(row, 2),
|
||||
tool_calls: Enum.at(row, 3),
|
||||
tool_call_id: Enum.at(row, 4)
|
||||
}
|
||||
|
||||
case Zoi.parse(MessageSchema.schema(), raw) do
|
||||
{:ok, _valid} ->
|
||||
struct(MessageSchema, decode_message(raw))
|
||||
|
||||
{:error, errors} ->
|
||||
Logger.error("Invalid message data from DB: #{inspect(errors)}")
|
||||
raise ArgumentError, "Invalid message data: #{inspect(errors)}"
|
||||
end
|
||||
end)
|
||||
else
|
||||
:error -> []
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def insert(conversation_id, message) do
|
||||
Repo.insert_all("messages", [
|
||||
[
|
||||
conversation_id: conversation_id,
|
||||
role: to_string(message.role),
|
||||
content: message[:content],
|
||||
reasoning_content: message[:reasoning_content],
|
||||
tool_calls: encode_tool_calls(message[:tool_calls]),
|
||||
tool_call_id: message[:tool_call_id],
|
||||
inserted_at: DateTime.truncate(DateTime.utc_now(), :second)
|
||||
]
|
||||
])
|
||||
def insert(conversation_id, message, topic: topic) do
|
||||
broadcast_error topic: topic do
|
||||
with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do
|
||||
sql = """
|
||||
INSERT INTO messages (
|
||||
conversation_id, role, content, reasoning_content,
|
||||
tool_calls, tool_call_id, inserted_at
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7)
|
||||
"""
|
||||
|
||||
params = [
|
||||
db_conversation_id,
|
||||
to_string(message.role),
|
||||
message[:content],
|
||||
message[:reasoning_content],
|
||||
encode_tool_calls(message[:tool_calls]),
|
||||
message[:tool_call_id],
|
||||
DateTime.truncate(DateTime.utc_now(), :second)
|
||||
]
|
||||
|
||||
Ecto.Adapters.SQL.query!(Repo, sql, params)
|
||||
Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}")
|
||||
{:ok, 1}
|
||||
else
|
||||
:error ->
|
||||
Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}")
|
||||
{:error, :invalid_conversation_id}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp encode_tool_calls(nil), do: nil
|
||||
defp encode_tool_calls(calls), do: Jason.encode!(calls)
|
||||
|
||||
defp dump_uuid(id) when is_binary(id) and byte_size(id) == 16, do: {:ok, id}
|
||||
defp dump_uuid(id) when is_binary(id), do: Ecto.UUID.dump(id)
|
||||
defp dump_uuid(_), do: :error
|
||||
|
||||
defp decode_message(row) do
|
||||
row
|
||||
|> Map.update!(:role, &String.to_existing_atom/1)
|
||||
|> Map.update(:tool_calls, nil, fn
|
||||
nil -> nil
|
||||
json when is_binary(json) ->
|
||||
json |> Jason.decode!() |> Enum.map(&atomize_keys/1)
|
||||
already_decoded -> Enum.map(already_decoded, &atomize_keys/1)
|
||||
end)
|
||||
nil ->
|
||||
nil
|
||||
|
||||
json when is_binary(json) ->
|
||||
json |> Jason.decode!() |> Enum.map(&atomize_keys/1)
|
||||
|
||||
already_decoded ->
|
||||
Enum.map(already_decoded, &atomize_keys/1)
|
||||
end)
|
||||
|> drop_nil_fields()
|
||||
end
|
||||
|
||||
|
||||
@@ -1,15 +1,21 @@
|
||||
defmodule ElixirAi.Data.AiProviderSchema do
|
||||
use Ecto.Schema
|
||||
defstruct [:id, :name, :model_name, :api_token, :completions_url, :inserted_at, :updated_at]
|
||||
|
||||
@primary_key {:id, :binary_id, autogenerate: true}
|
||||
@foreign_key_type :binary_id
|
||||
def schema do
|
||||
Zoi.object(%{
|
||||
id: Zoi.string(),
|
||||
name: Zoi.string(),
|
||||
model_name: Zoi.string(),
|
||||
api_token: Zoi.string(),
|
||||
completions_url: Zoi.string()
|
||||
})
|
||||
end
|
||||
|
||||
schema "ai_providers" do
|
||||
field(:name, :string)
|
||||
field(:model_name, :string)
|
||||
field(:api_token, :string)
|
||||
field(:completions_url, :string)
|
||||
|
||||
timestamps(type: :utc_datetime)
|
||||
def partial_schema do
|
||||
Zoi.object(%{
|
||||
id: Zoi.string(),
|
||||
name: Zoi.string(),
|
||||
model_name: Zoi.string()
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
defmodule ElixirAi.Data.ConversationSchema do
|
||||
use Ecto.Schema
|
||||
defstruct [:id, :name, :ai_provider_id, :inserted_at, :updated_at]
|
||||
|
||||
@primary_key {:id, :binary_id, autogenerate: true}
|
||||
@foreign_key_type :binary_id
|
||||
|
||||
schema "conversations" do
|
||||
field(:name, :string)
|
||||
belongs_to(:ai_provider, ElixirAi.Data.AiProviderSchema, type: :binary_id)
|
||||
|
||||
timestamps(type: :utc_datetime)
|
||||
def schema do
|
||||
Zoi.object(%{
|
||||
id: Zoi.string(),
|
||||
name: Zoi.string(),
|
||||
ai_provider_id: Zoi.string()
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
defmodule ElixirAi.Data.MessageSchema do
|
||||
use Ecto.Schema
|
||||
defstruct [
|
||||
:id,
|
||||
:conversation_id,
|
||||
:role,
|
||||
:content,
|
||||
:reasoning_content,
|
||||
:tool_calls,
|
||||
:tool_call_id,
|
||||
:inserted_at
|
||||
]
|
||||
|
||||
@primary_key {:id, :id, autogenerate: true}
|
||||
|
||||
schema "messages" do
|
||||
belongs_to(:conversation, ElixirAi.Data.ConversationSchema, type: :binary_id)
|
||||
field(:role, :string)
|
||||
field(:content, :string)
|
||||
field(:reasoning_content, :string)
|
||||
field(:tool_calls, :map)
|
||||
field(:tool_call_id, :string)
|
||||
|
||||
timestamps(inserted_at: :inserted_at, updated_at: false, type: :utc_datetime)
|
||||
def schema do
|
||||
Zoi.object(%{
|
||||
role: Zoi.string()
|
||||
})
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user