database updates
Some checks failed
CI/CD Pipeline / build (push) Failing after 3s

This commit is contained in:
2026-03-12 15:19:57 -06:00
parent 4dc4814b2f
commit b5504dbdca
9 changed files with 286 additions and 248 deletions

View File

@@ -26,3 +26,6 @@ elixir clustering examples and architecture: <https://oneuptime.com/blog/post/20
process groups for distributing workers across cluster: <https://memo.d.foundation/topics/elixir/pg-in-elixir> process groups for distributing workers across cluster: <https://memo.d.foundation/topics/elixir/pg-in-elixir>
zoi validation library: <https://elixirforum.com/t/zoi-schema-validation-library-inspired-by-zod/72108/17>

View File

@@ -1,13 +1,38 @@
defmodule ElixirAi.AiProvider do defmodule ElixirAi.AiProvider do
use ElixirAi.Data alias ElixirAi.Data.DbHelpers
alias ElixirAi.Repo require Logger
alias ElixirAi.Data.AiProviderSchema
defmodule AiProviderSchema do
defstruct [:id, :name, :model_name, :api_token, :completions_url]
def schema do
Zoi.object(%{
id: Zoi.optional(Zoi.string()),
name: Zoi.string(),
model_name: Zoi.string(),
api_token: Zoi.string(),
completions_url: Zoi.string()
})
end
def partial_schema do
Zoi.object(%{
id: Zoi.optional(Zoi.string()),
name: Zoi.string(),
model_name: Zoi.string()
})
end
end
def all do def all do
broadcast_error topic: "ai_providers" do
sql = "SELECT id, name, model_name FROM ai_providers" sql = "SELECT id, name, model_name FROM ai_providers"
result = Ecto.Adapters.SQL.query!(Repo, sql, []) params = %{}
case DbHelpers.run_sql(sql, params, "ai_providers") do
{:error, :db_error} ->
[]
result ->
results = results =
Enum.map(result.rows, fn [id, name, model_name] -> Enum.map(result.rows, fn [id, name, model_name] ->
attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string() attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string()
@@ -36,18 +61,27 @@ defmodule ElixirAi.AiProvider do
defp convert_id_to_string(provider), do: provider defp convert_id_to_string(provider), do: provider
def create(attrs) do def create(attrs) do
broadcast_error topic: "ai_providers" do
now = DateTime.truncate(DateTime.utc_now(), :second) now = DateTime.truncate(DateTime.utc_now(), :second)
sql = """ sql = """
INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at) INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6) VALUES ($(name), $(model_name), $(api_token), $(completions_url), $(inserted_at), $(updated_at))
""" """
params = [attrs.name, attrs.model_name, attrs.api_token, attrs.completions_url, now, now] params = %{
"name" => attrs.name,
"model_name" => attrs.model_name,
"api_token" => attrs.api_token,
"completions_url" => attrs.completions_url,
"inserted_at" => now,
"updated_at" => now
}
Ecto.Adapters.SQL.query!(Repo, sql, params) case DbHelpers.run_sql(sql, params, "ai_providers") do
{:error, :db_error} ->
{:error, :db_error}
_result ->
Phoenix.PubSub.broadcast( Phoenix.PubSub.broadcast(
ElixirAi.PubSub, ElixirAi.PubSub,
"ai_providers", "ai_providers",
@@ -59,15 +93,19 @@ defmodule ElixirAi.AiProvider do
end end
def find_by_name(name) do def find_by_name(name) do
broadcast_error topic: "ai_providers" do
sql = """ sql = """
SELECT id, name, model_name, api_token, completions_url SELECT id, name, model_name, api_token, completions_url
FROM ai_providers FROM ai_providers
WHERE name = $1 WHERE name = $(name)
LIMIT 1 LIMIT 1
""" """
case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do params = %{"name" => name}
case DbHelpers.run_sql(sql, params, "ai_providers") do
{:error, :db_error} ->
{:error, :db_error}
%{rows: []} -> %{rows: []} ->
{:error, :not_found} {:error, :not_found}
@@ -92,13 +130,16 @@ defmodule ElixirAi.AiProvider do
end end
end end
end end
end
def ensure_default_provider do def ensure_default_provider do
broadcast_error topic: "ai_providers" do
sql = "SELECT COUNT(*) FROM ai_providers" sql = "SELECT COUNT(*) FROM ai_providers"
result = Ecto.Adapters.SQL.query!(Repo, sql, []) params = %{}
case DbHelpers.run_sql(sql, params, "ai_providers") do
{:error, :db_error} ->
{:error, :db_error}
result ->
case result.rows do case result.rows do
[[0]] -> [[0]] ->
attrs = %{ attrs = %{

View File

@@ -1,6 +1,6 @@
defmodule ElixirAi.Conversation do defmodule ElixirAi.Conversation do
use ElixirAi.Data alias ElixirAi.Data.DbHelpers
alias ElixirAi.Repo require Logger
defmodule Provider do defmodule Provider do
defstruct [:name, :model_name, :api_token, :completions_url] defstruct [:name, :model_name, :api_token, :completions_url]
@@ -33,15 +33,19 @@ defmodule ElixirAi.Conversation do
end end
def all_names do def all_names do
broadcast_error topic: "conversations" do
sql = """ sql = """
SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url
FROM conversations c FROM conversations c
LEFT JOIN ai_providers p ON c.ai_provider_id = p.id LEFT JOIN ai_providers p ON c.ai_provider_id = p.id
""" """
result = Ecto.Adapters.SQL.query!(Repo, sql, []) params = %{}
case DbHelpers.run_sql(sql, params, "conversations") do
{:error, :db_error} ->
[]
result ->
Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] -> Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] ->
attrs = %{ attrs = %{
name: name, name: name,
@@ -55,7 +59,10 @@ defmodule ElixirAi.Conversation do
case Zoi.parse(ConversationInfo.schema(), attrs) do case Zoi.parse(ConversationInfo.schema(), attrs) do
{:ok, valid} -> {:ok, valid} ->
struct(ConversationInfo, Map.put(valid, :provider, struct(Provider, valid.provider))) struct(
ConversationInfo,
Map.put(valid, :provider, struct(Provider, valid.provider))
)
{:error, errors} -> {:error, errors} ->
Logger.error("Invalid conversation data: #{inspect(errors)}") Logger.error("Invalid conversation data: #{inspect(errors)}")
@@ -66,34 +73,48 @@ defmodule ElixirAi.Conversation do
end end
def create(name, ai_provider_id) when is_binary(ai_provider_id) do def create(name, ai_provider_id) when is_binary(ai_provider_id) do
broadcast_error topic: "conversations" do
case Ecto.UUID.dump(ai_provider_id) do case Ecto.UUID.dump(ai_provider_id) do
{:ok, binary_id} -> {:ok, binary_id} ->
sql = """ sql = """
INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at) INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at)
VALUES ($1, $2, $3, $4) VALUES ($(name), $(ai_provider_id), $(inserted_at), $(updated_at))
""" """
timestamp = now() timestamp = now()
params = [name, binary_id, timestamp, timestamp]
Ecto.Adapters.SQL.query!(Repo, sql, params) params = %{
"name" => name,
"ai_provider_id" => binary_id,
"inserted_at" => timestamp,
"updated_at" => timestamp
}
case DbHelpers.run_sql(sql, params, "conversations") do
{:error, :db_error} ->
{:error, :db_error}
_result ->
:ok :ok
end
:error -> :error ->
{:error, :invalid_uuid} {:error, :invalid_uuid}
end end
end end
end
def find_id(name) do def find_id(name) do
broadcast_error topic: "conversations" do sql = "SELECT id FROM conversations WHERE name = $(name) LIMIT 1"
sql = "SELECT id FROM conversations WHERE name = $1 LIMIT 1" params = %{"name" => name}
case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do case DbHelpers.run_sql(sql, params, "conversations") do
%{rows: []} -> {:error, :not_found} {:error, :db_error} ->
%{rows: [[id] | _]} -> {:ok, id} {:error, :db_error}
end
%{rows: []} ->
{:error, :not_found}
%{rows: [[id] | _]} ->
{:ok, id}
end end
end end

View File

@@ -1,32 +0,0 @@
defmodule ElixirAi.Data do
defmacro __using__(_opts) do
quote do
import ElixirAi.Data
require Logger
end
end
defmacro broadcast_error(opts, do: block) do
topic = Keyword.get(opts, :topic)
build_with_db(block, topic)
end
defp build_with_db(block, topic) do
quote do
try do
unquote(block)
rescue
exception ->
Logger.error("Database error: #{Exception.message(exception)}")
Phoenix.PubSub.broadcast(
ElixirAi.PubSub,
unquote(topic),
{:db_error, Exception.message(exception)}
)
{:error, :db_error}
end
end
end
end

View File

@@ -1,6 +1,26 @@
defmodule ElixirAi.Data.DbHelpers do defmodule ElixirAi.Data.DbHelpers do
require Logger
@get_named_param ~r/\$\((\w+)\)/ @get_named_param ~r/\$\((\w+)\)/
def run_sql(sql, params, topic) do
{sql, params} = named_params_to_positional_params(sql, params)
try do
Ecto.Adapters.SQL.query!(ElixirAi.Repo, sql, params)
rescue
exception ->
Logger.error("Database error: #{Exception.message(exception)}")
Phoenix.PubSub.broadcast(
ElixirAi.PubSub,
topic,
{:db_error, Exception.message(exception)}
)
{:error, :db_error}
end
end
def named_params_to_positional_params(query, params) do def named_params_to_positional_params(query, params) do
param_occurrences = Regex.scan(@get_named_param, query) param_occurrences = Regex.scan(@get_named_param, query)

View File

@@ -1,20 +1,37 @@
defmodule ElixirAi.Message do defmodule ElixirAi.Message do
use ElixirAi.Data alias ElixirAi.Data.DbHelpers
alias ElixirAi.Repo require Logger
alias ElixirAi.Data.MessageSchema
def load_for_conversation(conversation_id, topic: topic) do defmodule MessageSchema do
broadcast_error topic: topic do defstruct [:role, :content, :reasoning_content, :tool_calls, :tool_call_id]
with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do
def schema do
Zoi.object(%{
role: Zoi.enum([:user, :assistant, :tool]),
content: Zoi.optional(Zoi.string()),
reasoning_content: Zoi.optional(Zoi.string()),
tool_calls: Zoi.optional(Zoi.any()),
tool_call_id: Zoi.optional(Zoi.string())
})
end
end
def load_for_conversation(conversation_id, topic: topic)
when is_binary(conversation_id) and byte_size(conversation_id) == 16 do
sql = """ sql = """
SELECT role, content, reasoning_content, tool_calls, tool_call_id SELECT role, content, reasoning_content, tool_calls, tool_call_id
FROM messages FROM messages
WHERE conversation_id = $1 WHERE conversation_id = $(conversation_id)
ORDER BY id ORDER BY id
""" """
result = Ecto.Adapters.SQL.query!(Repo, sql, [db_conversation_id]) params = %{"conversation_id" => conversation_id}
case DbHelpers.run_sql(sql, params, topic) do
{:error, :db_error} ->
[]
result ->
Enum.map(result.rows, fn row -> Enum.map(result.rows, fn row ->
raw = %{ raw = %{
role: Enum.at(row, 0), role: Enum.at(row, 0),
@@ -24,51 +41,69 @@ defmodule ElixirAi.Message do
tool_call_id: Enum.at(row, 4) tool_call_id: Enum.at(row, 4)
} }
case Zoi.parse(MessageSchema.schema(), raw) do decoded = decode_message(raw)
case Zoi.parse(MessageSchema.schema(), decoded) do
{:ok, _valid} -> {:ok, _valid} ->
struct(MessageSchema, decode_message(raw)) struct(MessageSchema, decoded)
{:error, errors} -> {:error, errors} ->
Logger.error("Invalid message data from DB: #{inspect(errors)}") Logger.error("Invalid message data from DB: #{inspect(errors)}")
raise ArgumentError, "Invalid message data: #{inspect(errors)}" raise ArgumentError, "Invalid message data: #{inspect(errors)}"
end end
end) end)
else
:error -> []
end
end end
end end
def insert(conversation_id, message, topic: topic) do def load_for_conversation(conversation_id, topic: topic) do
broadcast_error topic: topic do case dump_uuid(conversation_id) do
with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do {:ok, db_conversation_id} ->
load_for_conversation(db_conversation_id, topic: topic)
:error ->
[]
end
end
def insert(conversation_id, message, topic: topic)
when is_binary(conversation_id) and byte_size(conversation_id) == 16 do
sql = """ sql = """
INSERT INTO messages ( INSERT INTO messages (
conversation_id, role, content, reasoning_content, conversation_id, role, content, reasoning_content,
tool_calls, tool_call_id, inserted_at tool_calls, tool_call_id, inserted_at
) VALUES ($1, $2, $3, $4, $5, $6, $7) ) VALUES ($(conversation_id), $(role), $(content), $(reasoning_content), $(tool_calls), $(tool_call_id), $(inserted_at))
""" """
params = [ params = %{
db_conversation_id, "conversation_id" => conversation_id,
to_string(message.role), "role" => to_string(message.role),
message[:content], "content" => message[:content],
message[:reasoning_content], "reasoning_content" => message[:reasoning_content],
encode_tool_calls(message[:tool_calls]), "tool_calls" => encode_tool_calls(message[:tool_calls]),
message[:tool_call_id], "tool_call_id" => message[:tool_call_id],
DateTime.truncate(DateTime.utc_now(), :second) "inserted_at" => DateTime.truncate(DateTime.utc_now(), :second)
] }
Ecto.Adapters.SQL.query!(Repo, sql, params) case DbHelpers.run_sql(sql, params, topic) do
Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}") {:error, :db_error} ->
{:error, :db_error}
_result ->
# Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}")
{:ok, 1} {:ok, 1}
else end
end
def insert(conversation_id, message, topic: topic) do
case dump_uuid(conversation_id) do
{:ok, db_conversation_id} ->
insert(db_conversation_id, message, topic: topic)
:error -> :error ->
Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}") Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}")
{:error, :invalid_conversation_id} {:error, :invalid_conversation_id}
end end
end end
end
defp encode_tool_calls(nil), do: nil defp encode_tool_calls(nil), do: nil
defp encode_tool_calls(calls), do: Jason.encode!(calls) defp encode_tool_calls(calls), do: Jason.encode!(calls)

View File

@@ -1,21 +0,0 @@
defmodule ElixirAi.Data.AiProviderSchema do
defstruct [:id, :name, :model_name, :api_token, :completions_url, :inserted_at, :updated_at]
def schema do
Zoi.object(%{
id: Zoi.string(),
name: Zoi.string(),
model_name: Zoi.string(),
api_token: Zoi.string(),
completions_url: Zoi.string()
})
end
def partial_schema do
Zoi.object(%{
id: Zoi.string(),
name: Zoi.string(),
model_name: Zoi.string()
})
end
end

View File

@@ -1,11 +0,0 @@
defmodule ElixirAi.Data.ConversationSchema do
defstruct [:id, :name, :ai_provider_id, :inserted_at, :updated_at]
def schema do
Zoi.object(%{
id: Zoi.string(),
name: Zoi.string(),
ai_provider_id: Zoi.string()
})
end
end

View File

@@ -1,18 +0,0 @@
defmodule ElixirAi.Data.MessageSchema do
defstruct [
:id,
:conversation_id,
:role,
:content,
:reasoning_content,
:tool_calls,
:tool_call_id,
:inserted_at
]
def schema do
Zoi.object(%{
role: Zoi.string()
})
end
end