This commit is contained in:
@@ -25,10 +25,15 @@ defmodule ElixirAi.ChatRunner do
|
||||
end
|
||||
|
||||
def init(name) do
|
||||
Phoenix.PubSub.subscribe(ElixirAi.PubSub, conversation_message_topic(name))
|
||||
|
||||
messages =
|
||||
case Conversation.find_id(name) do
|
||||
{:ok, conv_id} -> Message.load_for_conversation(conv_id, topic: conversation_message_topic(name))
|
||||
_ -> []
|
||||
{:ok, conv_id} ->
|
||||
Message.load_for_conversation(conv_id, topic: conversation_message_topic(name))
|
||||
|
||||
_ ->
|
||||
[]
|
||||
end
|
||||
|
||||
last_message = List.last(messages)
|
||||
@@ -106,6 +111,16 @@ defmodule ElixirAi.ChatRunner do
|
||||
{:noreply, new_state}
|
||||
end
|
||||
|
||||
@ai_stream_events [
|
||||
:ai_text_chunk,
|
||||
:ai_reasoning_chunk,
|
||||
:ai_text_stream_finish,
|
||||
:ai_tool_call_start,
|
||||
:ai_tool_call_middle,
|
||||
:ai_tool_call_end,
|
||||
:tool_response
|
||||
]
|
||||
|
||||
def handle_info({:start_new_ai_response, id}, state) do
|
||||
starting_response = %{id: id, reasoning_content: "", content: "", tool_calls: []}
|
||||
broadcast_ui(state.name, {:start_ai_response_stream, starting_response})
|
||||
@@ -117,9 +132,10 @@ defmodule ElixirAi.ChatRunner do
|
||||
msg,
|
||||
%{streaming_response: %{id: current_id}} = state
|
||||
)
|
||||
when is_tuple(msg) and tuple_size(msg) in [2, 3] and elem(msg, 1) != current_id do
|
||||
when is_tuple(msg) and tuple_size(msg) in [2, 3] and
|
||||
elem(msg, 0) in @ai_stream_events and elem(msg, 1) != current_id do
|
||||
Logger.warning(
|
||||
"Received #{elem(msg, 0)} for id #{elem(msg, 1)} but current streaming response is for id #{current_id}"
|
||||
"Received #{elem(msg, 0)} for id #{inspect(elem(msg, 1))} but current streaming response is for id #{inspect(current_id)}"
|
||||
)
|
||||
|
||||
{:noreply, state}
|
||||
@@ -220,8 +236,6 @@ defmodule ElixirAi.ChatRunner do
|
||||
end
|
||||
|
||||
def handle_info({:ai_tool_call_end, id}, state) do
|
||||
# Logger.info("ending tool call with tools: #{inspect(state.streaming_response.tool_calls)}")
|
||||
|
||||
tool_request_message = %{
|
||||
role: :assistant,
|
||||
content: state.streaming_response.content,
|
||||
@@ -294,6 +308,15 @@ defmodule ElixirAi.ChatRunner do
|
||||
}}
|
||||
end
|
||||
|
||||
def handle_info({:db_error, reason}, state) do
|
||||
broadcast_ui(state.name, {:db_error, reason})
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:store_message, _name, _message}, state) do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:ai_request_error, reason}, state) do
|
||||
Logger.error("AI request error: #{inspect(reason)}")
|
||||
broadcast_ui(state.name, {:ai_request_error, reason})
|
||||
@@ -308,7 +331,8 @@ defmodule ElixirAi.ChatRunner do
|
||||
{:reply, state.streaming_response, state}
|
||||
end
|
||||
|
||||
defp broadcast_ui(name, msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, chat_topic(name), msg)
|
||||
defp broadcast_ui(name, msg),
|
||||
do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, chat_topic(name), msg)
|
||||
|
||||
defp store_message(name, messages) when is_list(messages) do
|
||||
Enum.each(messages, &store_message(name, &1))
|
||||
|
||||
Reference in New Issue
Block a user