This commit is contained in:
47
lib/elixir_ai/audio/audio_processing.ex
Normal file
47
lib/elixir_ai/audio/audio_processing.ex
Normal file
@@ -0,0 +1,47 @@
|
||||
defmodule ElixirAi.AudioProcessing do
  @moduledoc """
  Public API for the demand-driven audio transcription pool.

  Dispatch strategy:

    1. Pick a random idle worker from the `:available` pg group.
    2. If none are idle and the pool is below `@max_workers`, spawn a fresh worker
       under `ElixirAi.AudioWorkerSupervisor` and route the job directly to it.
    3. If already at `@max_workers`, or the spawn in step 2 fails, cast the job to a
       random existing worker. What a worker does with a job that arrives while it
       is busy is decided by `ElixirAi.AudioWorker`, not by this module.
    4. If no worker exists and none could be started, the failure is reported to the
       caller through the normal result message instead of crashing `submit/3`.

  Scale-up is fully automatic (on demand). This module never stops workers;
  scale-down is left to each worker's idle-timeout logic.
  """

  require Logger

  @pg_scope ElixirAi.AudioProcessingPG
  @max_workers 10
  @all_group :all
  @available_group :available

  @doc """
  Submit audio for transcription. The result is delivered asynchronously to
  `caller_pid` as:

      {:transcription_result, {:ok, text} | {:error, reason}}

  Always returns `:ok`. When no worker is available and a new one cannot be
  started, `caller_pid` receives
  `{:transcription_result, {:error, {:worker_start_failed, reason}}}`.
  """
  @spec submit(binary(), String.t(), pid()) :: :ok
  def submit(audio_binary, mime_type, caller_pid) do
    job = {:transcribe, caller_pid, audio_binary, mime_type}

    case :pg.get_members(@pg_scope, @available_group) do
      [] -> dispatch_without_idle_worker(job, caller_pid)
      available -> GenServer.cast(Enum.random(available), job)
    end

    :ok
  end

  # No idle worker: grow the pool if there is headroom, otherwise overflow.
  defp dispatch_without_idle_worker(job, caller_pid) do
    all = :pg.get_members(@pg_scope, @all_group)

    if length(all) < @max_workers do
      case DynamicSupervisor.start_child(ElixirAi.AudioWorkerSupervisor, ElixirAi.AudioWorker) do
        {:ok, pid} ->
          GenServer.cast(pid, job)

        {:ok, pid, _info} ->
          GenServer.cast(pid, job)

        failure ->
          # start_child/2 may also return :ignore or {:error, reason} (for example
          # :max_children); degrade gracefully instead of raising a MatchError.
          Logger.warning("AudioProcessing: could not start a worker: #{inspect(failure)}")
          overflow(all, job, caller_pid, failure)
      end
    else
      # At max capacity, so `all` is non-empty here.
      overflow(all, job, caller_pid, :max_workers)
    end
  end

  # Hands the job to a random existing worker, or reports the failure to the caller
  # when the pool is empty (Enum.random/1 would raise on an empty list).
  defp overflow([], _job, caller_pid, reason) do
    send(caller_pid, {:transcription_result, {:error, {:worker_start_failed, reason}}})
  end

  defp overflow(workers, job, _caller_pid, _reason) do
    GenServer.cast(Enum.random(workers), job)
  end
end
|
||||
20
lib/elixir_ai/audio/audio_processing_pg.ex
Normal file
20
lib/elixir_ai/audio/audio_processing_pg.ex
Normal file
@@ -0,0 +1,20 @@
|
||||
defmodule ElixirAi.AudioProcessingPG do
  @moduledoc """
  Supervisable wrapper around a named `:pg` scope used to track audio
  transcription workers across the cluster.

  The scope name is this module's name. Workers register in two groups:

    * `:all` — for as long as the worker is alive (pool-size accounting)
    * `:available` — only while the worker is idle (dispatch target; the worker
      leaves the group while it is processing)

  `:pg` drops dead processes from its groups on its own, so no manual cleanup
  is required.
  """

  @doc """
  Child specification that starts the `:pg` scope named after this module as a
  permanent worker. The options argument is ignored.
  """
  def child_spec(_opts) do
    start_mfa = {:pg, :start_link, [__MODULE__]}

    %{id: __MODULE__, start: start_mfa, type: :worker, restart: :permanent}
  end
end
|
||||
114
lib/elixir_ai/audio/audio_worker.ex
Normal file
114
lib/elixir_ai/audio/audio_worker.ex
Normal file
@@ -0,0 +1,114 @@
|
||||
defmodule ElixirAi.AudioWorker do
  @moduledoc """
  GenServer that transcribes audio by posting to a Whisper-compatible HTTP endpoint.

  ## Pool membership in `ElixirAi.AudioProcessingPG`

    * `:all` — joined on init; never left explicitly (membership ends when the
      process exits)
    * `:available` — joined on init and whenever the worker runs out of work;
      left when a job starts

  This join/leave pattern lets the dispatcher know which workers are idle without
  any central coordinator.

  ## Job handling

  Each job runs in a separate, monitored task so the GenServer itself stays
  responsive. Only one job runs at a time: a `:transcribe` cast that arrives while
  a job is in flight is appended to an internal FIFO queue and started when the
  current job finishes. If the task exits without reporting a result, the caller
  receives `{:error, {:task_crashed, reason}}` and the worker moves on to the next
  job, so a crash can neither strand the caller nor leave the worker marked busy.

  ## Scale-down

  Workers stop with reason `:normal` after `@idle_timeout_ms` of inactivity. The
  child spec uses `restart: :temporary` so a supervisor does not restart a worker
  that stopped on purpose; new workers are spawned on demand instead.

  ## Results

  Results are delivered to the `caller_pid` carried in the job as:

      {:transcription_result, {:ok, text} | {:error, reason}}
  """

  # :temporary — the default (:permanent) would make the supervisor restart a worker
  # right after its deliberate idle exit, defeating scale-down.
  use GenServer, restart: :temporary
  require Logger

  @pg_scope ElixirAi.AudioProcessingPG
  @all_group :all
  @available_group :available
  @idle_timeout_ms 30_000

  @doc """
  Starts an unnamed worker process. `opts` is passed to `init/1`, which ignores it.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(_opts) do
    :pg.join(@pg_scope, @all_group, self())
    :pg.join(@pg_scope, @available_group, self())
    schedule_idle_check(@idle_timeout_ms)

    {:ok, %{busy: false, idle_since: monotonic_ms(), current: nil, queue: :queue.new()}}
  end

  @impl true
  # Busy: park the job. The worker already left :available when the running job
  # started, so leaving again here would unbalance the join/leave pairs.
  def handle_cast({:transcribe, caller_pid, audio_binary, mime_type}, %{busy: true} = state) do
    job = {caller_pid, audio_binary, mime_type}
    {:noreply, %{state | queue: :queue.in(job, state.queue)}}
  end

  def handle_cast({:transcribe, caller_pid, audio_binary, mime_type}, state) do
    :pg.leave(@pg_scope, @available_group, self())
    {:noreply, start_job({caller_pid, audio_binary, mime_type}, state)}
  end

  @impl true
  # The running task reported its result.
  def handle_info(
        {:transcription_done, task_pid, result},
        %{current: %{task_pid: task_pid} = current} = state
      ) do
    # :flush also discards a :DOWN for this task that may already be in the mailbox.
    Process.demonitor(current.monitor_ref, [:flush])
    send(current.caller_pid, {:transcription_result, result})
    {:noreply, next_job_or_idle(state)}
  end

  # The running task died before reporting a result.
  def handle_info(
        {:DOWN, ref, :process, _pid, reason},
        %{current: %{monitor_ref: ref} = current} = state
      ) do
    Logger.error("AudioWorker: transcription task exited without a result: #{inspect(reason)}")
    send(current.caller_pid, {:transcription_result, {:error, {:task_crashed, reason}}})
    {:noreply, next_job_or_idle(state)}
  end

  def handle_info(:idle_check, %{busy: true} = state) do
    schedule_idle_check(@idle_timeout_ms)
    {:noreply, state}
  end

  def handle_info(:idle_check, %{busy: false, idle_since: idle_since} = state) do
    idle_ms = monotonic_ms() - idle_since

    if idle_ms >= @idle_timeout_ms do
      Logger.debug("AudioWorker #{inspect(self())} exiting — idle for #{div(idle_ms, 1000)}s")

      # NOTE(review): jobs are delivered with fire-and-forget casts, so a cast that
      # is already in the mailbox at this point is dropped when the worker stops.
      {:stop, :normal, state}
    else
      # Wake up exactly when the timeout would elapse rather than a full period later.
      schedule_idle_check(@idle_timeout_ms - idle_ms)
      {:noreply, state}
    end
  end

  # Mirror GenServer's default behaviour for unexpected messages (log and carry on)
  # instead of crashing with a FunctionClauseError.
  def handle_info(message, state) do
    Logger.warning("AudioWorker: ignoring unexpected message: #{inspect(message)}")
    {:noreply, state}
  end

  # Runs one job in a monitored task and records it as the current job.
  defp start_job({caller_pid, audio_binary, mime_type}, state) do
    worker = self()

    {:ok, task_pid} =
      Task.start(fn ->
        result = do_transcribe(audio_binary, mime_type)
        send(worker, {:transcription_done, self(), result})
      end)

    monitor_ref = Process.monitor(task_pid)
    current = %{task_pid: task_pid, monitor_ref: monitor_ref, caller_pid: caller_pid}

    %{state | busy: true, current: current}
  end

  # Starts the next queued job, or marks the worker idle and rejoins :available.
  defp next_job_or_idle(state) do
    case :queue.out(state.queue) do
      {{:value, job}, rest} ->
        start_job(job, %{state | queue: rest})

      {:empty, _queue} ->
        :pg.join(@pg_scope, @available_group, self())
        %{state | busy: false, current: nil, idle_since: monotonic_ms()}
    end
  end

  defp schedule_idle_check(delay_ms) do
    Process.send_after(self(), :idle_check, delay_ms)
  end

  defp monotonic_ms, do: System.monotonic_time(:millisecond)

  # Upload filename derived from the MIME type prefix.
  defp filename_for("audio/webm" <> _rest), do: "audio.webm"
  defp filename_for("audio/ogg" <> _rest), do: "audio.ogg"
  defp filename_for("audio/mp4" <> _rest), do: "audio.mp4"
  defp filename_for(mime_type) when is_binary(mime_type), do: "audio.bin"

  # Reads the endpoint from the :elixir_ai application env at call time and
  # returns {:ok, text} | {:error, reason}.
  defp do_transcribe(audio_binary, mime_type) do
    case Application.get_env(:elixir_ai, :whisper_endpoint) do
      nil ->
        Logger.error("AudioWorker: :whisper_endpoint is not configured for :elixir_ai")
        {:error, :whisper_endpoint_not_configured}

      endpoint ->
        post_audio(endpoint, audio_binary, mime_type)
    end
  end

  # Posts the audio as multipart form data and maps the response to a result tuple.
  defp post_audio(endpoint, audio_binary, mime_type) do
    filename = filename_for(mime_type)

    case Req.post(endpoint,
           form_multipart: [
             file: {audio_binary, filename: filename, content_type: mime_type},
             response_format: "json",
             language: "en"
           ],
           receive_timeout: 30_000
         ) do
      {:ok, %{status: 200, body: %{"text" => text}}} when is_binary(text) ->
        {:ok, String.trim(text)}

      {:ok, %{status: status, body: body}} ->
        Logger.warning("AudioWorker: Whisper returned HTTP #{status}: #{inspect(body)}")
        {:error, {:http_error, status}}

      {:error, reason} ->
        Logger.error("AudioWorker: request failed: #{inspect(reason)}")
        {:error, reason}
    end
  end
end
|
||||
Reference in New Issue
Block a user