preserve state in singleton managers
This commit is contained in:
@@ -2,13 +2,16 @@ defmodule Backend.Application do
|
||||
@moduledoc false
|
||||
use Application
|
||||
|
||||
# application fully started on each node
|
||||
|
||||
@impl true
|
||||
def start(_type, _args) do
|
||||
children = [
|
||||
BackendWeb.Telemetry,
|
||||
{Phoenix.PubSub, name: Backend.PubSub},
|
||||
Backend.Cluster,
|
||||
{Backend.GlobalSingleton, Backend.GameRunner},
|
||||
{Backend.GlobalSingleton,
|
||||
module: Backend.GameRunner, name: "GameRunner", startup_args: %{}, pubsub_channel: "GameRunner.StartupArgs"},
|
||||
BackendWeb.Endpoint
|
||||
]
|
||||
|
||||
|
||||
@@ -12,13 +12,12 @@ defmodule Backend.GameRunner do
|
||||
|
||||
# Client API
|
||||
|
||||
def start_link(_opts) do
|
||||
case GenServer.start_link(__MODULE__, %{}, name: @name) do
|
||||
def start_link(startup_args) do
|
||||
case GenServer.start_link(__MODULE__, startup_args, name: @name) do
|
||||
{:ok, pid} ->
|
||||
{:ok, pid}
|
||||
|
||||
{:error, {:already_started, _pid}} ->
|
||||
# Another instance is already running globally
|
||||
:ignore
|
||||
end
|
||||
end
|
||||
@@ -50,15 +49,23 @@ defmodule Backend.GameRunner do
|
||||
GenServer.call(@name, :get_node_name)
|
||||
end
|
||||
|
||||
@doc """
Returns the pid of the GameRunner process registered under `@name`.

This is a synchronous `GenServer.call/2`; the server answers with its own
`self()`.
"""
def get_pid, do: GenServer.call(@name, :get_pid)
|
||||
|
||||
@doc """
Asks the GameRunner process registered under `@name` to crash on purpose.

The request is a cast, so this returns `:ok` immediately without waiting for
the server to process the `:crash` message.
"""
def crash_game, do: GenServer.cast(@name, :crash)
|
||||
|
||||
# Server Callbacks
|
||||
|
||||
@impl true
|
||||
def init(_) do
|
||||
def init(startup_args) do
|
||||
Logger.info("GameState starting on node: #{node()}")
|
||||
sleep_delay = round(1000 / @fps)
|
||||
:timer.send_interval(sleep_delay, :tick)
|
||||
|
||||
{:ok, %{players: %{}, tick_number: 0}}
|
||||
{:ok, %{players: %{}, tick_number: 0} |> Map.merge(startup_args)}
|
||||
end
|
||||
|
||||
@impl true
|
||||
@@ -103,6 +110,12 @@ defmodule Backend.GameRunner do
|
||||
{:noreply, broadcast_state(new_state)}
|
||||
end
|
||||
|
||||
@impl true
|
||||
# Deliberately terminates the server: logs an error naming the current node and
# then raises, so the process exits abnormally. The state is ignored because
# the callback never returns.
def handle_cast(:crash, _state) do
  crash_notice = "Simulated crash of GameRunner on node #{node()}"
  Logger.error(crash_notice)

  raise RuntimeError, "Simulated crash"
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:tick, state) do
|
||||
if rem(state.tick_number, 100) == 0 do
|
||||
@@ -146,8 +159,19 @@ defmodule Backend.GameRunner do
|
||||
{:reply, node(), state}
|
||||
end
|
||||
|
||||
# Replies with the pid of this server process; the state is passed through
# unchanged.
def handle_call(:get_pid, _from, state), do: {:reply, self(), state}
|
||||
|
||||
# Publishes `state` on Backend.PubSub and returns it unchanged so the call can
# sit at the end of a callback.
#
# Two messages are sent, in this order:
#   * `{:game_state_updated, state}` on the "game_state" topic
#   * `{:startup_args_updated, state}` on the "GameRunner.StartupArgs" topic
#
# The results of the broadcasts are not inspected.
defp broadcast_state(state) do
  announcements = [
    {"game_state", {:game_state_updated, state}},
    {"GameRunner.StartupArgs", {:startup_args_updated, state}}
  ]

  Enum.each(announcements, fn {topic, message} ->
    Phoenix.PubSub.broadcast(Backend.PubSub, topic, message)
  end)

  state
end
|
||||
end
|
||||
|
||||
@@ -3,63 +3,88 @@ defmodule Backend.GlobalSingleton do
|
||||
Supervisor that ensures a global singleton process runs across the cluster.
|
||||
If the node running it crashes, another node will take over.
|
||||
"""
|
||||
use Supervisor
|
||||
use GenServer
|
||||
require Logger
|
||||
|
||||
def start_link(module) do
|
||||
Supervisor.start_link(__MODULE__, module, name: :"#{module}.GlobalSingleton")
|
||||
@doc """
Starts the singleton manager process for `:module` and links it to the caller.

## Options

  * `:module` (required) - the module whose process is managed; it is looked
    up through `:global` and started via `module.start_link/1`.
  * `:name` (required) - label combined with `:module` to build the locally
    registered name of this manager.
  * `:startup_args` - value handed to `module.start_link/1` when the managed
    process is started on this node. Defaults to `%{}`.
  * `:pubsub_channel` - `Backend.PubSub` topic the manager subscribes to in
    `init/1`. When omitted or `nil`, no subscription is made.

Options may be given in any order. A missing required option raises
`KeyError`.
"""
def start_link(opts) when is_list(opts) do
  # Read the options by key instead of pattern matching the keyword list in
  # the function head: a keyword-list pattern only matches one exact key
  # order and forces callers to spell out every optional key.
  module = Keyword.fetch!(opts, :module)
  name = Keyword.fetch!(opts, :name)
  startup_args = Keyword.get(opts, :startup_args, %{})
  pubsub_channel = Keyword.get(opts, :pubsub_channel)

  GenServer.start_link(__MODULE__, {module, name, startup_args, pubsub_channel},
    name: get_name(module, name)
  )
end
|
||||
|
||||
# Builds the atom under which a manager process registers itself locally, in
# the form "<name>.<module>.GlobalSingleton".
defp get_name(module, name) do
  :"#{name}.#{module}.GlobalSingleton"
end
|
||||
|
||||
@impl true
|
||||
# Sets up the manager state and triggers the first availability check.
#
# The :check message is queued before init/1 returns, so it is handled as soon
# as the server enters its loop. When a pubsub channel is given, the process
# subscribes to it on Backend.PubSub. `monitor_ref` starts as nil because
# nothing is monitored yet.
def init({module, name, startup_args, pubsub_channel}) do
  send(self(), :check)

  if pubsub_channel, do: Phoenix.PubSub.subscribe(Backend.PubSub, pubsub_channel)

  initial_state = %{
    module: module,
    name: name,
    startup_args: startup_args,
    monitor_ref: nil
  }

  {:ok, initial_state}
end
|
||||
|
||||
@impl true
|
||||
def init(module) do
|
||||
children = [
|
||||
%{
|
||||
id: :monitor_task,
|
||||
start: {Task, :start_link, [fn -> monitor_loop(module) end]},
|
||||
restart: :permanent
|
||||
}
|
||||
]
|
||||
def handle_info(
|
||||
:check,
|
||||
%{module: module, startup_args: startup_args} = state
|
||||
) do
|
||||
Process.send_after(self(), :check, 100)
|
||||
|
||||
Supervisor.init(children, strategy: :one_for_one)
|
||||
process_pid =
|
||||
case :global.whereis_name(module) do
|
||||
:undefined ->
|
||||
attempt_to_start_child_here(%{module: module, startup_args: startup_args})
|
||||
|
||||
pid when is_pid(pid) ->
|
||||
pid
|
||||
end
|
||||
|
||||
{:noreply, monitor_if_not_already(process_pid, state)}
|
||||
end
|
||||
|
||||
defp monitor_loop(module) do
|
||||
case :global.whereis_name(module) do
|
||||
:undefined ->
|
||||
# Double-check before attempting to start
|
||||
Process.sleep(50)
|
||||
@impl true
|
||||
# Stores the most recently broadcast arguments so that a later start of the
# managed module on this node uses `new_args` instead of the initial ones.
def handle_info({:startup_args_updated, new_args}, %{module: module} = state) do
  Logger.info("Received updated startup args for #{module}: #{inspect(new_args)}")

  refreshed = %{state | startup_args: new_args}
  {:noreply, refreshed}
end
|
||||
|
||||
case :global.whereis_name(module) do
|
||||
:undefined ->
|
||||
Logger.info("#{module} not running, attempting to start on #{node()}")
|
||||
@impl true
|
||||
# Handles the :DOWN notification for the process this manager is monitoring.
#
# The clause only matches when the reference in the message equals the stored
# `monitor_ref`. It logs the failure, clears the stale reference and tries to
# start the managed module on this node straight away.
#
# The takeover is attempted inline instead of by sending ourselves :check. The
# :check handler re-arms its own 100 ms timer every time it runs, so each extra
# :check message would start one more perpetual polling chain per crash.
#
# If the start attempt yields no pid (for example another node won the race),
# `monitor_if_not_already/2` leaves `monitor_ref` as nil and the regular
# periodic :check picks up the new process.
def handle_info(
      {:DOWN, ref, :process, _pid, _reason},
      %{module: module, monitor_ref: ref} = state
    ) do
  Logger.warning("#{module} went down, attempting takeover on #{node()}")

  cleared = %{state | monitor_ref: nil}
  new_pid = attempt_to_start_child_here(cleared)

  {:noreply, monitor_if_not_already(new_pid, cleared)}
end
|
||||
|
||||
case module.start_link([]) do
|
||||
{:ok, _pid} ->
|
||||
Logger.info("#{module} started on #{node()}")
|
||||
# Starts monitoring `pid` when nothing is monitored yet (`monitor_ref` is nil)
# and records the new monitor reference in the state.
defp monitor_if_not_already(pid, %{monitor_ref: nil} = state) when is_pid(pid) do
  %{state | monitor_ref: Process.monitor(pid)}
end
|
||||
|
||||
{:error, {:already_started, _pid}} ->
|
||||
Logger.debug("#{module} already started by another node")
|
||||
# Fallback clause: a monitor is already in place, or the first argument is not
# a pid, so the state is returned unchanged.
defp monitor_if_not_already(_pid, state), do: state
|
||||
|
||||
_ ->
|
||||
:ok
|
||||
end
|
||||
defp attempt_to_start_child_here(%{module: module, startup_args: startup_args}) do
|
||||
case module.start_link(startup_args) do
|
||||
{:ok, pid} ->
|
||||
Logger.info("#{module} started on #{node()}")
|
||||
pid
|
||||
|
||||
Process.sleep(100)
|
||||
monitor_loop(module)
|
||||
{:error, {:already_started, pid}} ->
|
||||
Logger.debug("#{module} already started by another node")
|
||||
pid
|
||||
|
||||
_pid ->
|
||||
# Another node won the race
|
||||
monitor_loop(module)
|
||||
end
|
||||
|
||||
pid when is_pid(pid) ->
|
||||
ref = Process.monitor(pid)
|
||||
|
||||
receive do
|
||||
{:DOWN, ^ref, :process, ^pid, _reason} ->
|
||||
Logger.warning("#{module} went down, attempting takeover")
|
||||
monitor_loop(module)
|
||||
end
|
||||
_ ->
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user