Stream a single AI conversation end-to-end

This commit is contained in:
2026-03-05 16:11:11 -07:00
parent 76c164d931
commit ea2b6d4cbf
14 changed files with 517 additions and 17 deletions

View File

@@ -8,6 +8,7 @@ defmodule ElixirAi.Application do
ElixirAiWeb.Telemetry,
{DNSCluster, query: Application.get_env(:elixir_ai, :dns_cluster_query) || :ignore},
{Phoenix.PubSub, name: ElixirAi.PubSub},
ElixirAi.ChatRunner,
ElixirAiWeb.Endpoint
]

View File

@@ -0,0 +1,108 @@
defmodule ElixirAi.ChatRunner do
  @moduledoc """
  GenServer that holds the state of a single AI chat conversation.

  State shape:

    * `:messages` — list of `%{role: atom, content: binary}` maps, oldest first
    * `:streaming_response` — `nil` when idle, or
      `%{id: _, reasoning_content: _, content: _}` while an assistant reply
      is being streamed in
    * `:turn` — `:user` or `:assistant`

  Progress is broadcast over `Phoenix.PubSub` on the `"ai_chat"` topic so
  subscribers (e.g. LiveViews) can render the stream incrementally. Stream
  messages themselves arrive via `handle_info/2`, sent by the task started
  in `ElixirAi.ChatUtils.request_ai_response/2`.
  """
  require Logger
  use GenServer
  import ElixirAi.ChatUtils

  @topic "ai_chat"

  ## Client API

  @doc "Appends a user message and kicks off a streamed AI response."
  def new_user_message(text_content) do
    GenServer.cast(__MODULE__, {:user_message, text_content})
  end

  @doc "Returns the whole server state (messages, in-flight stream, turn)."
  def get_conversation do
    GenServer.call(__MODULE__, :get_conversation)
  end

  def start_link(_opts) do
    GenServer.start_link(
      __MODULE__,
      %{
        messages: [],
        streaming_response: nil,
        turn: :user
      },
      name: __MODULE__
    )
  end

  ## Server callbacks

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_cast({:user_message, text_content}, state) do
    new_message = %{role: :user, content: text_content}
    broadcast({:user_chat_message, new_message})
    new_state = %{state | messages: state.messages ++ [new_message], turn: :assistant}
    # Fire-and-forget: the task streams chunks back to us as handle_info messages.
    request_ai_response(self(), new_state.messages)
    {:noreply, new_state}
  end

  @impl true
  def handle_info({:start_new_ai_response, id}, state) do
    starting_response = %{id: id, reasoning_content: "", content: ""}
    broadcast({:start_ai_response_stream, starting_response})
    {:noreply, %{state | streaming_response: starting_response}}
  end

  # Drop stream messages that carry a different (stale) response id.
  # Clause order matters: this must come after :start_new_ai_response and
  # before the chunk handlers below.
  def handle_info(
        msg,
        %{streaming_response: %{id: current_id}} = state
      )
      when is_tuple(msg) and tuple_size(msg) in [2, 3] and elem(msg, 1) != current_id do
    Logger.warning(
      "Received #{elem(msg, 0)} for id #{elem(msg, 1)} but current streaming response is for id #{current_id}"
    )

    {:noreply, state}
  end

  def handle_info(
        {:ai_reasoning_chunk, _id, reasoning_content},
        %{streaming_response: %{} = resp} = state
      ) do
    broadcast({:reasoning_chunk_content, reasoning_content})

    {:noreply,
     %{state | streaming_response: %{resp | reasoning_content: resp.reasoning_content <> reasoning_content}}}
  end

  def handle_info(
        {:ai_text_chunk, _id, text_content},
        %{streaming_response: %{} = resp} = state
      ) do
    broadcast({:text_chunk_content, text_content})
    {:noreply, %{state | streaming_response: %{resp | content: resp.content <> text_content}}}
  end

  def handle_info({:ai_stream_finish, _id}, %{streaming_response: %{} = resp} = state) do
    broadcast(:end_ai_response)

    final_message = %{
      role: :assistant,
      content: resp.content,
      reasoning_content: resp.reasoning_content
    }

    {:noreply,
     %{
       state
       | streaming_response: nil,
         messages: state.messages ++ [final_message],
         turn: :user
     }}
  end

  # Safety net: previously a chunk arriving while streaming_response was nil
  # (e.g. a late chunk after :ai_stream_finish) crashed the server when the
  # handlers above dereferenced state.streaming_response. Log and drop instead.
  def handle_info(msg, state) do
    Logger.warning("Ignoring unexpected message with no stream in progress: #{inspect(msg)}")
    {:noreply, state}
  end

  @impl true
  def handle_call(:get_conversation, _from, state) do
    {:reply, state, state}
  end

  defp broadcast(msg), do: Phoenix.PubSub.broadcast(ElixirAi.PubSub, @topic, msg)
end

120
lib/elixir_ai/chat_utils.ex Normal file
View File

@@ -0,0 +1,120 @@
defmodule ElixirAi.ChatUtils do
  @moduledoc """
  Helpers for talking to an OpenAI-compatible streaming chat-completions API.

  `request_ai_response/2` performs the HTTP request in a `Task` and translates
  each server-sent-event line into a message sent to `server` (normally the
  `ElixirAi.ChatRunner` process):

    * `{:start_new_ai_response, id}` — first chunk of a new assistant reply
    * `{:ai_reasoning_chunk, id, reasoning_content}`
    * `{:ai_text_chunk, id, text_content}`
    * `{:ai_stream_finish, id}` — chunk with `finish_reason: "stop"`
  """
  require Logger

  @doc """
  Fires off an async streaming completion request for `messages`.

  Endpoint, token and model are read from the `:elixir_ai` application env at
  runtime (`:ai_endpoint`, `:ai_token`, `:ai_model`). Stream progress is
  delivered to `server` via `send/2`; failures are logged, not raised.
  """
  def request_ai_response(server, messages) do
    Task.start(fn ->
      api_url = Application.fetch_env!(:elixir_ai, :ai_endpoint)
      api_key = Application.fetch_env!(:elixir_ai, :ai_token)
      model = Application.fetch_env!(:elixir_ai, :ai_model)

      body = %{
        model: model,
        stream: true,
        messages: Enum.map(messages, &api_message/1)
      }

      headers = [{"authorization", "Bearer #{api_key}"}]

      case Req.post(api_url,
             json: body,
             headers: headers,
             # NOTE(review): assumes every :data frame contains only whole SSE
             # lines; a JSON payload split across TCP frames would fail to
             # decode and be logged/skipped — TODO buffer partial trailing lines.
             into: fn {:data, data}, acc ->
               data
               |> String.split("\n")
               |> Enum.each(&handle_stream_line(server, &1))

               {:cont, acc}
             end
           ) do
        {:ok, _} ->
          :ok

        {:error, reason} ->
          # Logger.error instead of IO.warn, consistent with the rest of the module.
          Logger.error("AI request failed: #{inspect(reason)}")
      end
    end)
  end

  # Blank keep-alive / separator lines between SSE events: nothing to do.
  def handle_stream_line(_server, "") do
    :ok
  end

  # Terminal sentinel; the finish_reason chunk already signalled the server.
  # `server` was unused here (its only use is commented out), which raised a
  # compiler warning — underscore-prefixed to silence it.
  def handle_stream_line(_server, "data: [DONE]") do
    # send(server, :ai_stream_done)
    :ok
  end

  # SSE data line: decode the JSON payload and re-dispatch on its shape.
  def handle_stream_line(server, "data: " <> json) do
    case Jason.decode(json) do
      {:ok, body} ->
        # Logger.debug("Received AI chunk: #{inspect(body)}")
        handle_stream_line(server, body)

      other ->
        Logger.error("Failed to decode AI response chunk: #{inspect(other)}")
        :ok
    end
  end

  # First streamed chunk of a reply: delta carries the assistant role and no content.
  def handle_stream_line(server, %{
        "choices" => [%{"delta" => %{"content" => nil, "role" => "assistant"}}],
        "id" => id
      }) do
    send(
      server,
      {:start_new_ai_response, id}
    )
  end

  # Last streamed chunk: the model reports why it stopped.
  def handle_stream_line(server, %{
        "choices" => [%{"finish_reason" => "stop"}],
        "id" => id
      }) do
    send(
      server,
      {:ai_stream_finish, id}
    )
  end

  # Intermediate chunk carrying reasoning ("thinking") text.
  def handle_stream_line(server, %{
        "choices" => [
          %{
            "delta" => %{"reasoning_content" => reasoning_content},
            "finish_reason" => nil
          }
        ],
        "id" => id
      }) do
    send(
      server,
      {:ai_reasoning_chunk, id, reasoning_content}
    )
  end

  # Intermediate chunk carrying visible answer text.
  # (Local renamed from the misleading `reasoning_content` — this clause
  # matches the "content" delta, not reasoning.)
  def handle_stream_line(server, %{
        "choices" => [
          %{
            "delta" => %{"content" => text_content},
            "finish_reason" => nil
          }
        ],
        "id" => id
      }) do
    send(
      server,
      {:ai_text_chunk, id, text_content}
    )
  end

  # Anything else (usage frames, unexpected shapes): log and move on.
  def handle_stream_line(_server, unmatched_message) do
    Logger.warning("Received unmatched stream line: #{inspect(unmatched_message)}")
    :ok
  end

  # Converts an internal message map to the wire format the API expects.
  def api_message(%{role: role, content: content}) do
    %{role: Atom.to_string(role), content: content}
  end
end