centralizing pubsub topics
Some checks failed
CI/CD Pipeline / build (push) Failing after 4s

This commit is contained in:
2026-03-13 15:29:20 -06:00
parent 0fd243d259
commit 927c19dd17
10 changed files with 75 additions and 17 deletions

View File

@@ -3,10 +3,9 @@ defmodule ElixirAi.ChatRunner do
use GenServer use GenServer
import ElixirAi.ChatUtils, only: [ai_tool: 1] import ElixirAi.ChatUtils, only: [ai_tool: 1]
alias ElixirAi.{Conversation, Message} alias ElixirAi.{Conversation, Message}
import ElixirAi.PubsubTopics
defp via(name), do: {:via, Horde.Registry, {ElixirAi.ChatRegistry, name}} defp via(name), do: {:via, Horde.Registry, {ElixirAi.ChatRegistry, name}}
defp topic(name), do: "ai_chat:#{name}"
def message_topic(name), do: "conversation_messages:#{name}"
def new_user_message(name, text_content) do def new_user_message(name, text_content) do
GenServer.cast(via(name), {:user_message, text_content}) GenServer.cast(via(name), {:user_message, text_content})
@@ -28,7 +27,7 @@ defmodule ElixirAi.ChatRunner do
def init(name) do def init(name) do
messages = messages =
case Conversation.find_id(name) do case Conversation.find_id(name) do
{:ok, conv_id} -> Message.load_for_conversation(conv_id, topic: message_topic(name)) {:ok, conv_id} -> Message.load_for_conversation(conv_id, topic: conversation_message_topic(name))
_ -> [] _ -> []
end end
@@ -78,7 +77,7 @@ defmodule ElixirAi.ChatRunner do
function: fn %{"color" => color} -> function: fn %{"color" => color} ->
Phoenix.PubSub.broadcast( Phoenix.PubSub.broadcast(
ElixirAi.PubSub, ElixirAi.PubSub,
"ai_chat:#{name}", chat_topic(name),
{:set_background_color, color} {:set_background_color, color}
) )
end, end,
@@ -309,7 +308,7 @@ defmodule ElixirAi.ChatRunner do
{:reply, state.streaming_response, state} {:reply, state.streaming_response, state}
end end
defp broadcast_ui(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, topic(name), msg) defp broadcast_ui(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, chat_topic(name), msg)
defp store_message(name, messages) when is_list(messages) do defp store_message(name, messages) when is_list(messages) do
Enum.each(messages, &store_message(name, &1)) Enum.each(messages, &store_message(name, &1))
@@ -319,7 +318,7 @@ defmodule ElixirAi.ChatRunner do
defp store_message(name, message) do defp store_message(name, message) do
Phoenix.PubSub.broadcast( Phoenix.PubSub.broadcast(
ElixirAi.PubSub, ElixirAi.PubSub,
message_topic(name), conversation_message_topic(name),
{:store_message, name, message} {:store_message, name, message}
) )

View File

@@ -1,6 +1,7 @@
defmodule ElixirAi.ConversationManager do defmodule ElixirAi.ConversationManager do
use GenServer use GenServer
alias ElixirAi.{Conversation, Message} alias ElixirAi.{Conversation, Message}
import ElixirAi.PubsubTopics, only: [conversation_message_topic: 1]
require Logger require Logger
@name {:via, Horde.Registry, {ElixirAi.ChatRegistry, __MODULE__}} @name {:via, Horde.Registry, {ElixirAi.ChatRegistry, __MODULE__}}
@@ -117,7 +118,7 @@ defmodule ElixirAi.ConversationManager do
def handle_info({:store_message, name, message}, %{conversations: conversations} = state) do def handle_info({:store_message, name, message}, %{conversations: conversations} = state) do
case Conversation.find_id(name) do case Conversation.find_id(name) do
{:ok, conv_id} -> {:ok, conv_id} ->
Message.insert(conv_id, message, topic: ElixirAi.ChatRunner.message_topic(name)) Message.insert(conv_id, message, topic: conversation_message_topic(name))
_ -> _ ->
:ok :ok
@@ -164,7 +165,7 @@ defmodule ElixirAi.ConversationManager do
if MapSet.member?(subscriptions, name) do if MapSet.member?(subscriptions, name) do
subscriptions subscriptions
else else
Phoenix.PubSub.subscribe(ElixirAi.PubSub, ElixirAi.ChatRunner.message_topic(name)) Phoenix.PubSub.subscribe(ElixirAi.PubSub, conversation_message_topic(name))
MapSet.put(subscriptions, name) MapSet.put(subscriptions, name)
end end

View File

@@ -1,6 +1,7 @@
defmodule ElixirAi.AiProvider do defmodule ElixirAi.AiProvider do
alias ElixirAi.Data.DbHelpers alias ElixirAi.Data.DbHelpers
require Logger require Logger
import ElixirAi.PubsubTopics
defmodule AiProviderSchema do defmodule AiProviderSchema do
defstruct [:id, :name, :model_name, :api_token, :completions_url] defstruct [:id, :name, :model_name, :api_token, :completions_url]
@@ -77,14 +78,14 @@ defmodule ElixirAi.AiProvider do
"updated_at" => now "updated_at" => now
} }
case DbHelpers.run_sql(sql, params, "ai_providers") do case DbHelpers.run_sql(sql, params, providers_topic()) do
{:error, :db_error} -> {:error, :db_error} ->
{:error, :db_error} {:error, :db_error}
_result -> _result ->
Phoenix.PubSub.broadcast( Phoenix.PubSub.broadcast(
ElixirAi.PubSub, ElixirAi.PubSub,
"ai_providers", providers_topic(),
{:provider_added, attrs} {:provider_added, attrs}
) )
@@ -102,7 +103,7 @@ defmodule ElixirAi.AiProvider do
params = %{"name" => name} params = %{"name" => name}
case DbHelpers.run_sql(sql, params, "ai_providers", AiProviderSchema.schema()) do case DbHelpers.run_sql(sql, params, providers_topic(), AiProviderSchema.schema()) do
{:error, _} -> {:error, :db_error} {:error, _} -> {:error, :db_error}
[] -> {:error, :not_found} [] -> {:error, :not_found}
[row | _] -> {:ok, row |> convert_uuid_to_string() |> then(&struct(AiProviderSchema, &1))} [row | _] -> {:ok, row |> convert_uuid_to_string() |> then(&struct(AiProviderSchema, &1))}
@@ -113,7 +114,7 @@ defmodule ElixirAi.AiProvider do
sql = "SELECT COUNT(*) FROM ai_providers" sql = "SELECT COUNT(*) FROM ai_providers"
params = %{} params = %{}
case DbHelpers.run_sql(sql, params, "ai_providers") do case DbHelpers.run_sql(sql, params, providers_topic()) do
{:error, :db_error} -> {:error, :db_error} ->
{:error, :db_error} {:error, :db_error}

View File

@@ -0,0 +1,6 @@
defmodule ElixirAi.PubsubTopics do
def conversation_message_topic(name), do: "conversation_messages:#{name}"
def chat_topic(name), do: "ai_chat:#{name}"
def providers_topic, do: "providers"
end

View File

@@ -2,10 +2,11 @@ defmodule ElixirAiWeb.AiProvidersLive do
use ElixirAiWeb, :live_component use ElixirAiWeb, :live_component
import ElixirAiWeb.FormComponents import ElixirAiWeb.FormComponents
alias ElixirAi.AiProvider alias ElixirAi.AiProvider
import ElixirAi.PubsubTopics
def update(assigns, socket) do def update(assigns, socket) do
if connected?(socket) do if connected?(socket) do
Phoenix.PubSub.subscribe(ElixirAi.PubSub, "ai_providers") Phoenix.PubSub.subscribe(ElixirAi.PubSub, providers_topic())
end end
{:ok, {:ok,

View File

@@ -5,6 +5,7 @@ defmodule ElixirAiWeb.ChatLive do
import ElixirAiWeb.ChatMessage import ElixirAiWeb.ChatMessage
alias ElixirAi.ChatRunner alias ElixirAi.ChatRunner
alias ElixirAi.ConversationManager alias ElixirAi.ConversationManager
import ElixirAi.PubsubTopics
def valid_background_colors do def valid_background_colors do
[ [
@@ -21,8 +22,10 @@ defmodule ElixirAiWeb.ChatLive do
def mount(%{"name" => name}, _session, socket) do def mount(%{"name" => name}, _session, socket) do
case ConversationManager.open_conversation(name) do case ConversationManager.open_conversation(name) do
{:ok, _pid} -> {:ok, _pid} ->
if connected?(socket), if connected?(socket) do
do: Phoenix.PubSub.subscribe(ElixirAi.PubSub, "ai_chat:#{name}") Phoenix.PubSub.subscribe(ElixirAi.PubSub, chat_topic(name))
Phoenix.PubSub.subscribe(ElixirAi.PubSub, conversation_message_topic(name))
end
conversation = ChatRunner.get_conversation(name) conversation = ChatRunner.get_conversation(name)
@@ -32,7 +35,8 @@ defmodule ElixirAiWeb.ChatLive do
|> assign(user_input: "") |> assign(user_input: "")
|> assign(messages: conversation.messages) |> assign(messages: conversation.messages)
|> assign(streaming_response: conversation.streaming_response) |> assign(streaming_response: conversation.streaming_response)
|> assign(background_color: "bg-cyan-950/30")} |> assign(background_color: "bg-cyan-950/30")
|> assign(db_error: nil)}
{:error, :not_found} -> {:error, :not_found} ->
{:ok, push_navigate(socket, to: "/")} {:ok, push_navigate(socket, to: "/")}
@@ -48,6 +52,11 @@ defmodule ElixirAiWeb.ChatLive do
</.link> </.link>
{@conversation_name} {@conversation_name}
</div> </div>
<%= if @db_error do %>
<div class="mx-4 mt-2 px-3 py-2 rounded text-sm text-red-400 bg-red-950/40" role="alert">
Database error: {@db_error}
</div>
<% end %>
<div <div
id="chat-messages" id="chat-messages"
phx-hook="ScrollBottom" phx-hook="ScrollBottom"
@@ -191,6 +200,10 @@ defmodule ElixirAiWeb.ChatLive do
|> assign(streaming_response: nil)} |> assign(streaming_response: nil)}
end end
def handle_info({:db_error, reason}, socket) do
{:noreply, assign(socket, db_error: reason)}
end
def handle_info({:set_background_color, color}, socket) do def handle_info({:set_background_color, color}, socket) do
Logger.info("setting background color to #{color}") Logger.info("setting background color to #{color}")
{:noreply, assign(socket, background_color: color)} {:noreply, assign(socket, background_color: color)}

View File

@@ -3,10 +3,11 @@ defmodule ElixirAiWeb.HomeLive do
import ElixirAiWeb.FormComponents import ElixirAiWeb.FormComponents
alias ElixirAi.{ConversationManager, AiProvider} alias ElixirAi.{ConversationManager, AiProvider}
require Logger require Logger
import ElixirAi.PubsubTopics
def mount(_params, _session, socket) do def mount(_params, _session, socket) do
if connected?(socket) do if connected?(socket) do
Phoenix.PubSub.subscribe(ElixirAi.PubSub, "ai_providers") Phoenix.PubSub.subscribe(ElixirAi.PubSub, providers_topic())
send(self(), :load_data) send(self(), :load_data)
end end

View File

@@ -0,0 +1,26 @@
defmodule ElixirAiWeb.ChatLiveTest do
use ElixirAiWeb.ConnCase, async: false
import ElixirAi.PubsubTopics, only: [conversation_message_topic: 1]
setup do
stub(ElixirAi.ConversationManager, :open_conversation, fn _name -> {:ok, self()} end)
stub(ElixirAi.ChatRunner, :get_conversation, fn _name ->
%{messages: [], streaming_response: nil}
end)
:ok
end
test "displays a db error when a db_error message is broadcast", %{conn: conn} do
{:ok, view, _html} = live(conn, ~p"/chat/test_conv")
Phoenix.PubSub.broadcast(
ElixirAi.PubSub,
conversation_message_topic("test_conv"),
{:db_error, "unique constraint violated"}
)
assert render(view) =~ "unique constraint violated"
end
end

View File

@@ -16,6 +16,7 @@ defmodule ElixirAiWeb.ConnCase do
""" """
use ExUnit.CaseTemplate use ExUnit.CaseTemplate
use Mimic
using do using do
quote do quote do
@@ -27,11 +28,18 @@ defmodule ElixirAiWeb.ConnCase do
# Import conveniences for testing with connections # Import conveniences for testing with connections
import Plug.Conn import Plug.Conn
import Phoenix.ConnTest import Phoenix.ConnTest
import Phoenix.LiveViewTest
import ElixirAiWeb.ConnCase import ElixirAiWeb.ConnCase
use Mimic
end end
end end
setup :set_mimic_global
setup _tags do setup _tags do
stub(ElixirAi.Data.DbHelpers, :run_sql, fn _sql, _params, _topic -> [] end)
stub(ElixirAi.Data.DbHelpers, :run_sql, fn _sql, _params, _topic, _schema -> [] end)
stub(ElixirAi.ChatUtils, :request_ai_response, fn _server, _messages, _tools -> :ok end)
{:ok, conn: Phoenix.ConnTest.build_conn()} {:ok, conn: Phoenix.ConnTest.build_conn()}
end end
end end

View File

@@ -1,3 +1,5 @@
ExUnit.start() ExUnit.start()
Mimic.copy(ElixirAi.Data.DbHelpers) Mimic.copy(ElixirAi.Data.DbHelpers)
Mimic.copy(ElixirAi.ChatUtils) Mimic.copy(ElixirAi.ChatUtils)
Mimic.copy(ElixirAi.ConversationManager)
Mimic.copy(ElixirAi.ChatRunner)