Files
elixirAI/lib/elixir_ai/ai_utils/stream_line_utils.ex
Alex Mickelson 4dc4814b2f
Some checks failed
CI/CD Pipeline / build (push) Failing after 3s
working on db logic
2026-03-12 15:01:33 -06:00

153 lines
3.7 KiB
Elixir

defmodule ElixirAi.AiUtils.StreamLineUtils do
require Logger
def handle_stream_line(_server, "") do
:ok
end
def handle_stream_line(_server, "data: [DONE]") do
# send(server, :ai_stream_done)
:ok
end
def handle_stream_line(server, "data: " <> json) do
case Jason.decode(json) do
{:ok, body} ->
# Logger.debug("Received AI chunk: #{inspect(body)}")
handle_stream_line(server, body)
other ->
Logger.error("Failed to decode AI response chunk: #{inspect(other)}")
:ok
end
end
# first streamed response
def handle_stream_line(server, %{
"choices" => [%{"delta" => %{"content" => nil, "role" => "assistant"}}],
"id" => id
}) do
send(
server,
{:start_new_ai_response, id}
)
end
# last streamed response
def handle_stream_line(
server,
%{
"choices" => [%{"finish_reason" => "stop"}],
"id" => id
} = _msg
) do
# Logger.info("Received end of AI response stream for id #{id} with message: #{inspect(msg)}")
send(
server,
{:ai_text_stream_finish, id}
)
end
# streamed in reasoning
def handle_stream_line(server, %{
"choices" => [
%{
"delta" => %{"reasoning_content" => reasoning_content},
"finish_reason" => nil
}
],
"id" => id
}) do
send(
server,
{:ai_reasoning_chunk, id, reasoning_content}
)
end
# streamed in text
def handle_stream_line(server, %{
"choices" => [
%{
"delta" => %{"content" => reasoning_content},
"finish_reason" => nil
}
],
"id" => id
}) do
send(
server,
{:ai_text_chunk, id, reasoning_content}
)
end
# start and middle tool call
def handle_stream_line(server, %{
"choices" => [
%{
"delta" => %{
"tool_calls" => tool_calls
},
"finish_reason" => nil
}
],
"id" => id
})
when is_list(tool_calls) do
Enum.each(tool_calls, fn
%{
"id" => tool_call_id,
"index" => tool_index,
"type" => "function",
"function" => %{"name" => tool_name, "arguments" => tool_args_start}
} ->
# Logger.info("Received tool call start for tool #{tool_name}")
send(
server,
{:ai_tool_call_start, id, {tool_name, tool_args_start, tool_index, tool_call_id}}
)
%{"index" => tool_index, "function" => %{"arguments" => tool_args_diff}} ->
# Logger.info("Received tool call middle for index #{tool_index}")
send(server, {:ai_tool_call_middle, id, {tool_args_diff, tool_index}})
other ->
Logger.warning("Unmatched tool call item: #{inspect(other)}")
end)
end
# end tool call
def handle_stream_line(
server,
%{
"choices" => [%{"finish_reason" => "tool_calls"}],
"id" => id
}
) do
# Logger.info("Received tool_calls_finished with message: #{inspect(message)}")
send(server, {:ai_tool_call_end, id})
end
def handle_stream_line(_server, %{"error" => error_info}) do
Logger.error("Received error from AI stream: #{inspect(error_info)}")
:ok
end
def handle_stream_line(server, json) when is_binary(json) do
case Jason.decode(json) do
{:ok, body} ->
handle_stream_line(server, body)
_ ->
Logger.warning("Received unmatched stream line: #{inspect(json)}")
:ok
end
end
def handle_stream_line(_server, unmatched_message) do
Logger.warning("Received unmatched stream line: #{inspect(unmatched_message)}")
:ok
end
end