diff --git a/README.md b/README.md index 97caa4a..599b7b4 100644 --- a/README.md +++ b/README.md @@ -26,3 +26,6 @@ elixir clustering examples and architecture: + + +zoi validation library: \ No newline at end of file diff --git a/lib/elixir_ai/data/ai_provider.ex b/lib/elixir_ai/data/ai_provider.ex index e847b7b..f7356a4 100644 --- a/lib/elixir_ai/data/ai_provider.ex +++ b/lib/elixir_ai/data/ai_provider.ex @@ -1,30 +1,55 @@ defmodule ElixirAi.AiProvider do - use ElixirAi.Data - alias ElixirAi.Repo - alias ElixirAi.Data.AiProviderSchema + alias ElixirAi.Data.DbHelpers + require Logger + + defmodule AiProviderSchema do + defstruct [:id, :name, :model_name, :api_token, :completions_url] + + def schema do + Zoi.object(%{ + id: Zoi.optional(Zoi.string()), + name: Zoi.string(), + model_name: Zoi.string(), + api_token: Zoi.string(), + completions_url: Zoi.string() + }) + end + + def partial_schema do + Zoi.object(%{ + id: Zoi.optional(Zoi.string()), + name: Zoi.string(), + model_name: Zoi.string() + }) + end + end def all do - broadcast_error topic: "ai_providers" do - sql = "SELECT id, name, model_name FROM ai_providers" - result = Ecto.Adapters.SQL.query!(Repo, sql, []) + sql = "SELECT id, name, model_name FROM ai_providers" + params = %{} - results = - Enum.map(result.rows, fn [id, name, model_name] -> - attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string() + case DbHelpers.run_sql(sql, params, "ai_providers") do + {:error, :db_error} -> + [] - case Zoi.parse(AiProviderSchema.partial_schema(), attrs) do - {:ok, valid} -> - struct(AiProviderSchema, valid) + result -> + results = + Enum.map(result.rows, fn [id, name, model_name] -> + attrs = %{id: id, name: name, model_name: model_name} |> convert_id_to_string() - {:error, errors} -> - Logger.error("Invalid provider data from DB: #{inspect(errors)}") - raise ArgumentError, "Invalid provider data: #{inspect(errors)}" - end - end) + case Zoi.parse(AiProviderSchema.partial_schema(), attrs) do + {:ok, valid} -> + struct(AiProviderSchema, valid) - Logger.debug("AiProvider.all() returning: #{inspect(results)}") + {:error, errors} -> + Logger.error("Invalid provider data from DB: #{inspect(errors)}") + raise ArgumentError, "Invalid provider data: #{inspect(errors)}" + end + end) - results + Logger.debug("AiProvider.all() returning: #{inspect(results)}") + + results end end @@ -36,83 +61,99 @@ defmodule ElixirAi.AiProvider do defp convert_id_to_string(provider), do: provider def create(attrs) do - broadcast_error topic: "ai_providers" do - now = DateTime.truncate(DateTime.utc_now(), :second) + now = DateTime.truncate(DateTime.utc_now(), :second) - sql = """ - INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6) - """ + sql = """ + INSERT INTO ai_providers (name, model_name, api_token, completions_url, inserted_at, updated_at) + VALUES ($(name), $(model_name), $(api_token), $(completions_url), $(inserted_at), $(updated_at)) + """ - params = [attrs.name, attrs.model_name, attrs.api_token, attrs.completions_url, now, now] + params = %{ + "name" => attrs.name, + "model_name" => attrs.model_name, + "api_token" => attrs.api_token, + "completions_url" => attrs.completions_url, + "inserted_at" => now, + "updated_at" => now + } - Ecto.Adapters.SQL.query!(Repo, sql, params) + case DbHelpers.run_sql(sql, params, "ai_providers") do + {:error, :db_error} -> + {:error, :db_error} - Phoenix.PubSub.broadcast( - ElixirAi.PubSub, - "ai_providers", - {:provider_added, attrs} - ) + _result -> + Phoenix.PubSub.broadcast( + ElixirAi.PubSub, + "ai_providers", + {:provider_added, attrs} + ) - :ok + :ok end end def find_by_name(name) do - broadcast_error topic: "ai_providers" do - sql = """ - SELECT id, name, model_name, api_token, completions_url - FROM ai_providers - WHERE name = $1 - LIMIT 1 - """ + sql = """ + SELECT id, name, model_name, api_token, completions_url + FROM ai_providers + WHERE name = $(name) + LIMIT 1 + """ - case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do - %{rows: []} -> - {:error, :not_found} + params = %{"name" => name} - %{rows: [[id, name, model_name, api_token, completions_url] | _]} -> - attrs = - %{ - id: id, - name: name, - model_name: model_name, - api_token: api_token, - completions_url: completions_url - } - |> convert_id_to_string() + case DbHelpers.run_sql(sql, params, "ai_providers") do + {:error, :db_error} -> + {:error, :db_error} - case Zoi.parse(AiProviderSchema.schema(), attrs) do - {:ok, valid} -> - {:ok, struct(AiProviderSchema, valid)} + %{rows: []} -> + {:error, :not_found} - {:error, errors} -> - Logger.error("Invalid provider data from DB: #{inspect(errors)}") - {:error, :invalid_data} - end - end + %{rows: [[id, name, model_name, api_token, completions_url] | _]} -> + attrs = + %{ + id: id, + name: name, + model_name: model_name, + api_token: api_token, + completions_url: completions_url + } + |> convert_id_to_string() + + case Zoi.parse(AiProviderSchema.schema(), attrs) do + {:ok, valid} -> + {:ok, struct(AiProviderSchema, valid)} + + {:error, errors} -> + Logger.error("Invalid provider data from DB: #{inspect(errors)}") + {:error, :invalid_data} + end end end def ensure_default_provider do - broadcast_error topic: "ai_providers" do - sql = "SELECT COUNT(*) FROM ai_providers" - result = Ecto.Adapters.SQL.query!(Repo, sql, []) + sql = "SELECT COUNT(*) FROM ai_providers" + params = %{} - case result.rows do - [[0]] -> - attrs = %{ - name: "default", - model_name: Application.fetch_env!(:elixir_ai, :ai_model), - api_token: Application.fetch_env!(:elixir_ai, :ai_token), - completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint) - } + case DbHelpers.run_sql(sql, params, "ai_providers") do + {:error, :db_error} -> + {:error, :db_error} - create(attrs) + result -> + case result.rows do + [[0]] -> + attrs = %{ + name: "default", + model_name: Application.fetch_env!(:elixir_ai, :ai_model), + api_token: Application.fetch_env!(:elixir_ai, :ai_token), + completions_url: Application.fetch_env!(:elixir_ai, :ai_endpoint) + } - _ -> - :ok - end + create(attrs) + + _ -> + :ok + end end end end diff --git a/lib/elixir_ai/data/conversation.ex b/lib/elixir_ai/data/conversation.ex index 0692025..0ab155f 100644 --- a/lib/elixir_ai/data/conversation.ex +++ b/lib/elixir_ai/data/conversation.ex @@ -1,6 +1,6 @@ defmodule ElixirAi.Conversation do - use ElixirAi.Data - alias ElixirAi.Repo + alias ElixirAi.Data.DbHelpers + require Logger defmodule Provider do defstruct [:name, :model_name, :api_token, :completions_url] @@ -33,67 +33,88 @@ defmodule ElixirAi.Conversation do end def all_names do - broadcast_error topic: "conversations" do - sql = """ - SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url - FROM conversations c - LEFT JOIN ai_providers p ON c.ai_provider_id = p.id - """ + sql = """ + SELECT c.name, p.name, p.model_name, p.api_token, p.completions_url + FROM conversations c + LEFT JOIN ai_providers p ON c.ai_provider_id = p.id + """ - result = Ecto.Adapters.SQL.query!(Repo, sql, []) + params = %{} - Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] -> - attrs = %{ - name: name, - provider: %{ - name: provider_name, - model_name: model_name, - api_token: api_token, - completions_url: completions_url + case DbHelpers.run_sql(sql, params, "conversations") do + {:error, :db_error} -> + [] + + result -> + Enum.map(result.rows, fn [name, provider_name, model_name, api_token, completions_url] -> + attrs = %{ + name: name, + provider: %{ + name: provider_name, + model_name: model_name, + api_token: api_token, + completions_url: completions_url + } } - } - case Zoi.parse(ConversationInfo.schema(), attrs) do - {:ok, valid} -> - struct(ConversationInfo, Map.put(valid, :provider, struct(Provider, valid.provider))) + case Zoi.parse(ConversationInfo.schema(), attrs) do + {:ok, valid} -> + struct( + ConversationInfo, + Map.put(valid, :provider, struct(Provider, valid.provider)) + ) - {:error, errors} -> - Logger.error("Invalid conversation data: #{inspect(errors)}") - raise ArgumentError, "Invalid conversation data: #{inspect(errors)}" - end - end) + {:error, errors} -> + Logger.error("Invalid conversation data: #{inspect(errors)}") + raise ArgumentError, "Invalid conversation data: #{inspect(errors)}" + end + end) end end def create(name, ai_provider_id) when is_binary(ai_provider_id) do - broadcast_error topic: "conversations" do - case Ecto.UUID.dump(ai_provider_id) do - {:ok, binary_id} -> - sql = """ - INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at) - VALUES ($1, $2, $3, $4) - """ + case Ecto.UUID.dump(ai_provider_id) do + {:ok, binary_id} -> + sql = """ + INSERT INTO conversations (name, ai_provider_id, inserted_at, updated_at) + VALUES ($(name), $(ai_provider_id), $(inserted_at), $(updated_at)) + """ - timestamp = now() - params = [name, binary_id, timestamp, timestamp] + timestamp = now() - Ecto.Adapters.SQL.query!(Repo, sql, params) - :ok + params = %{ + "name" => name, + "ai_provider_id" => binary_id, + "inserted_at" => timestamp, + "updated_at" => timestamp + } - :error -> - {:error, :invalid_uuid} - end + case DbHelpers.run_sql(sql, params, "conversations") do + {:error, :db_error} -> + {:error, :db_error} + + _result -> + :ok + end + + :error -> + {:error, :invalid_uuid} end end def find_id(name) do - broadcast_error topic: "conversations" do - sql = "SELECT id FROM conversations WHERE name = $1 LIMIT 1" + sql = "SELECT id FROM conversations WHERE name = $(name) LIMIT 1" + params = %{"name" => name} - case Ecto.Adapters.SQL.query!(Repo, sql, [name]) do - %{rows: []} -> {:error, :not_found} - %{rows: [[id] | _]} -> {:ok, id} - end + case DbHelpers.run_sql(sql, params, "conversations") do + {:error, :db_error} -> + {:error, :db_error} + + %{rows: []} -> + {:error, :not_found} + + %{rows: [[id] | _]} -> + {:ok, id} end end diff --git a/lib/elixir_ai/data/data.ex b/lib/elixir_ai/data/data.ex deleted file mode 100644 index 0c8831d..0000000 --- a/lib/elixir_ai/data/data.ex +++ /dev/null @@ -1,32 +0,0 @@ -defmodule ElixirAi.Data do - defmacro __using__(_opts) do - quote do - import ElixirAi.Data - require Logger - end - end - - defmacro broadcast_error(opts, do: block) do - topic = Keyword.get(opts, :topic) - build_with_db(block, topic) - end - - defp build_with_db(block, topic) do - quote do - try do - unquote(block) - rescue - exception -> - Logger.error("Database error: #{Exception.message(exception)}") - - Phoenix.PubSub.broadcast( - ElixirAi.PubSub, - unquote(topic), - {:db_error, Exception.message(exception)} - ) - - {:error, :db_error} - end - end - end -end diff --git a/lib/elixir_ai/data/db_helpers.ex b/lib/elixir_ai/data/db_helpers.ex index 9664597..2f82b80 100644 --- a/lib/elixir_ai/data/db_helpers.ex +++ b/lib/elixir_ai/data/db_helpers.ex @@ -1,6 +1,26 @@ defmodule ElixirAi.Data.DbHelpers do + require Logger @get_named_param ~r/\$\((\w+)\)/ + def run_sql(sql, params, topic) do + {sql, params} = named_params_to_positional_params(sql, params) + + try do + Ecto.Adapters.SQL.query!(ElixirAi.Repo, sql, params) + rescue + exception -> + Logger.error("Database error: #{Exception.message(exception)}") + + Phoenix.PubSub.broadcast( + ElixirAi.PubSub, + topic, + {:db_error, Exception.message(exception)} + ) + + {:error, :db_error} + end + end + def named_params_to_positional_params(query, params) do param_occurrences = Regex.scan(@get_named_param, query) diff --git a/lib/elixir_ai/data/message.ex b/lib/elixir_ai/data/message.ex index 9bc9aff..9966c8f 100644 --- a/lib/elixir_ai/data/message.ex +++ b/lib/elixir_ai/data/message.ex @@ -1,20 +1,37 @@ defmodule ElixirAi.Message do - use ElixirAi.Data - alias ElixirAi.Repo - alias ElixirAi.Data.MessageSchema + alias ElixirAi.Data.DbHelpers + require Logger - def load_for_conversation(conversation_id, topic: topic) do - broadcast_error topic: topic do - with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do - sql = """ - SELECT role, content, reasoning_content, tool_calls, tool_call_id - FROM messages - WHERE conversation_id = $1 - ORDER BY id - """ + defmodule MessageSchema do + defstruct [:role, :content, :reasoning_content, :tool_calls, :tool_call_id] - result = Ecto.Adapters.SQL.query!(Repo, sql, [db_conversation_id]) + def schema do + Zoi.object(%{ + role: Zoi.enum([:user, :assistant, :tool]), + content: Zoi.optional(Zoi.string()), + reasoning_content: Zoi.optional(Zoi.string()), + tool_calls: Zoi.optional(Zoi.any()), + tool_call_id: Zoi.optional(Zoi.string()) + }) + end + end + def load_for_conversation(conversation_id, topic: topic) + when is_binary(conversation_id) and byte_size(conversation_id) == 16 do + sql = """ + SELECT role, content, reasoning_content, tool_calls, tool_call_id + FROM messages + WHERE conversation_id = $(conversation_id) + ORDER BY id + """ + + params = %{"conversation_id" => conversation_id} + + case DbHelpers.run_sql(sql, params, topic) do + {:error, :db_error} -> + [] + + result -> Enum.map(result.rows, fn row -> raw = %{ role: Enum.at(row, 0), @@ -24,49 +41,67 @@ defmodule ElixirAi.Message do tool_call_id: Enum.at(row, 4) } - case Zoi.parse(MessageSchema.schema(), raw) do + decoded = decode_message(raw) + + case Zoi.parse(MessageSchema.schema(), decoded) do {:ok, _valid} -> - struct(MessageSchema, decode_message(raw)) + struct(MessageSchema, decoded) {:error, errors} -> Logger.error("Invalid message data from DB: #{inspect(errors)}") raise ArgumentError, "Invalid message data: #{inspect(errors)}" end end) - else - :error -> [] - end + end + end + + def load_for_conversation(conversation_id, topic: topic) do + case dump_uuid(conversation_id) do + {:ok, db_conversation_id} -> + load_for_conversation(db_conversation_id, topic: topic) + + :error -> + [] + end + end + + def insert(conversation_id, message, topic: topic) + when is_binary(conversation_id) and byte_size(conversation_id) == 16 do + sql = """ + INSERT INTO messages ( + conversation_id, role, content, reasoning_content, + tool_calls, tool_call_id, inserted_at + ) VALUES ($(conversation_id), $(role), $(content), $(reasoning_content), $(tool_calls), $(tool_call_id), $(inserted_at)) + """ + + params = %{ + "conversation_id" => conversation_id, + "role" => to_string(message.role), + "content" => message[:content], + "reasoning_content" => message[:reasoning_content], + "tool_calls" => encode_tool_calls(message[:tool_calls]), + "tool_call_id" => message[:tool_call_id], + "inserted_at" => DateTime.truncate(DateTime.utc_now(), :second) + } + + case DbHelpers.run_sql(sql, params, topic) do + {:error, :db_error} -> + {:error, :db_error} + + _result -> + # Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}") + {:ok, 1} end end def insert(conversation_id, message, topic: topic) do - broadcast_error topic: topic do - with {:ok, db_conversation_id} <- dump_uuid(conversation_id) do - sql = """ - INSERT INTO messages ( - conversation_id, role, content, reasoning_content, - tool_calls, tool_call_id, inserted_at - ) VALUES ($1, $2, $3, $4, $5, $6, $7) - """ + case dump_uuid(conversation_id) do + {:ok, db_conversation_id} -> + insert(db_conversation_id, message, topic: topic) - params = [ - db_conversation_id, - to_string(message.role), - message[:content], - message[:reasoning_content], - encode_tool_calls(message[:tool_calls]), - message[:tool_call_id], - DateTime.truncate(DateTime.utc_now(), :second) - ] - - Ecto.Adapters.SQL.query!(Repo, sql, params) - Logger.debug("Inserted message for conversation_id=#{Ecto.UUID.cast!(conversation_id)}") - {:ok, 1} - else - :error -> - Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}") - {:error, :invalid_conversation_id} - end + :error -> + Logger.error("Invalid conversation_id for message insert: #{inspect(conversation_id)}") + {:error, :invalid_conversation_id} end end diff --git a/lib/elixir_ai/data/schemas/ai_provider_schema.ex b/lib/elixir_ai/data/schemas/ai_provider_schema.ex deleted file mode 100644 index 80474f6..0000000 --- a/lib/elixir_ai/data/schemas/ai_provider_schema.ex +++ /dev/null @@ -1,21 +0,0 @@ -defmodule ElixirAi.Data.AiProviderSchema do - defstruct [:id, :name, :model_name, :api_token, :completions_url, :inserted_at, :updated_at] - - def schema do - Zoi.object(%{ - id: Zoi.string(), - name: Zoi.string(), - model_name: Zoi.string(), - api_token: Zoi.string(), - completions_url: Zoi.string() - }) - end - - def partial_schema do - Zoi.object(%{ - id: Zoi.string(), - name: Zoi.string(), - model_name: Zoi.string() - }) - end -end diff --git a/lib/elixir_ai/data/schemas/conversation_schema.ex b/lib/elixir_ai/data/schemas/conversation_schema.ex deleted file mode 100644 index bef6a7e..0000000 --- a/lib/elixir_ai/data/schemas/conversation_schema.ex +++ /dev/null @@ -1,11 +0,0 @@ -defmodule ElixirAi.Data.ConversationSchema do - defstruct [:id, :name, :ai_provider_id, :inserted_at, :updated_at] - - def schema do - Zoi.object(%{ - id: Zoi.string(), - name: Zoi.string(), - ai_provider_id: Zoi.string() - }) - end -end diff --git a/lib/elixir_ai/data/schemas/message_schema.ex b/lib/elixir_ai/data/schemas/message_schema.ex deleted file mode 100644 index 4dbc973..0000000 --- a/lib/elixir_ai/data/schemas/message_schema.ex +++ /dev/null @@ -1,18 +0,0 @@ -defmodule ElixirAi.Data.MessageSchema do - defstruct [ - :id, - :conversation_id, - :role, - :content, - :reasoning_content, - :tool_calls, - :tool_call_id, - :inserted_at - ] - - def schema do - Zoi.object(%{ - role: Zoi.string() - }) - end -end