From 4ac6c09759eabaede095772a83787b2641817b5c Mon Sep 17 00:00:00 2001 From: Alex Mickelson Date: Tue, 3 Mar 2026 14:13:01 -0700 Subject: [PATCH] preserve state in singleton managers --- backend/lib/backend/application.ex | 5 +- backend/lib/backend/game_runner.ex | 34 +++++- backend/lib/backend/global_singleton.ex | 111 +++++++++++------- .../backend/node_cluster_integration_test.exs | 23 ++++ 4 files changed, 124 insertions(+), 49 deletions(-) diff --git a/backend/lib/backend/application.ex b/backend/lib/backend/application.ex index 8b2d6b1..d0e13a3 100644 --- a/backend/lib/backend/application.ex +++ b/backend/lib/backend/application.ex @@ -2,13 +2,16 @@ defmodule Backend.Application do @moduledoc false use Application + # application fully started on each node + @impl true def start(_type, _args) do children = [ BackendWeb.Telemetry, {Phoenix.PubSub, name: Backend.PubSub}, Backend.Cluster, - {Backend.GlobalSingleton, Backend.GameRunner}, + {Backend.GlobalSingleton, + module: Backend.GameRunner, name: "GameRunner", startup_args: %{}, pubsub_channel: "GameRunner.StartupArgs"}, BackendWeb.Endpoint ] diff --git a/backend/lib/backend/game_runner.ex b/backend/lib/backend/game_runner.ex index 815291e..277c781 100644 --- a/backend/lib/backend/game_runner.ex +++ b/backend/lib/backend/game_runner.ex @@ -12,13 +12,12 @@ defmodule Backend.GameRunner do # Client API - def start_link(_opts) do - case GenServer.start_link(__MODULE__, %{}, name: @name) do + def start_link(startup_args) do + case GenServer.start_link(__MODULE__, startup_args, name: @name) do {:ok, pid} -> {:ok, pid} {:error, {:already_started, _pid}} -> - # Another instance is already running globally :ignore end end @@ -50,15 +49,23 @@ defmodule Backend.GameRunner do GenServer.call(@name, :get_node_name) end + def get_pid do + GenServer.call(@name, :get_pid) + end + + def crash_game do + GenServer.cast(@name, :crash) + end + # Server Callbacks @impl true - def init(_) do + def init(startup_args) do 
Logger.info("GameState starting on node: #{node()}") sleep_delay = round(1000 / @fps) :timer.send_interval(sleep_delay, :tick) - {:ok, %{players: %{}, tick_number: 0}} + {:ok, %{players: %{}, tick_number: 0} |> Map.merge(startup_args)} end @impl true @@ -103,6 +110,12 @@ defmodule Backend.GameRunner do {:noreply, broadcast_state(new_state)} end + @impl true + def handle_cast(:crash, _state) do + Logger.error("Simulated crash of GameRunner on node #{node()}") + raise "Simulated crash" + end + @impl true def handle_info(:tick, state) do if rem(state.tick_number, 100) == 0 do @@ -146,8 +159,19 @@ defmodule Backend.GameRunner do {:reply, node(), state} end + def handle_call(:get_pid, _from, state) do + {:reply, self(), state} + end + defp broadcast_state(state) do Phoenix.PubSub.broadcast(Backend.PubSub, "game_state", {:game_state_updated, state}) + + Phoenix.PubSub.broadcast( + Backend.PubSub, + "GameRunner.StartupArgs", + {:startup_args_updated, state} + ) + state end end diff --git a/backend/lib/backend/global_singleton.ex b/backend/lib/backend/global_singleton.ex index 4099fbf..a8ec947 100644 --- a/backend/lib/backend/global_singleton.ex +++ b/backend/lib/backend/global_singleton.ex @@ -3,63 +3,88 @@ defmodule Backend.GlobalSingleton do Supervisor that ensures a global singleton process runs across the cluster. If the node running it crashes, another node will take over. 
""" - use Supervisor + use GenServer require Logger - def start_link(module) do - Supervisor.start_link(__MODULE__, module, name: :"#{module}.GlobalSingleton") + def start_link( + module: module, + name: name, + startup_args: startup_args, + pubsub_channel: pubsub_channel + ) do + GenServer.start_link(__MODULE__, {module, name, startup_args, pubsub_channel}, + name: get_name(module, name) + ) + end + + defp get_name(module, name), do: :"#{name}.#{module}.GlobalSingleton" + + @impl true + def init({module, name, startup_args, pubsub_channel}) do + # immediately schedule first check + Process.send(self(), :check, []) + + if pubsub_channel do + Phoenix.PubSub.subscribe(Backend.PubSub, pubsub_channel) + end + + {:ok, %{module: module, name: name, startup_args: startup_args, monitor_ref: nil}} end @impl true - def init(module) do - children = [ - %{ - id: :monitor_task, - start: {Task, :start_link, [fn -> monitor_loop(module) end]}, - restart: :permanent - } - ] + def handle_info( + :check, + %{module: module, startup_args: startup_args} = state + ) do + Process.send_after(self(), :check, 100) - Supervisor.init(children, strategy: :one_for_one) + process_pid = + case :global.whereis_name(module) do + :undefined -> + attempt_to_start_child_here(%{module: module, startup_args: startup_args}) + + pid when is_pid(pid) -> + pid + end + + {:noreply, monitor_if_not_already(process_pid, state)} end - defp monitor_loop(module) do - case :global.whereis_name(module) do - :undefined -> - # Double-check before attempting to start - Process.sleep(50) + @impl true + def handle_info({:startup_args_updated, new_args}, state) do + Logger.info("Received updated startup args for #{state.module}: #{inspect(new_args)}") + {:noreply, %{state | startup_args: new_args}} + end - case :global.whereis_name(module) do - :undefined -> - Logger.info("#{module} not running, attempting to start on #{node()}") + @impl true + def handle_info( + {:DOWN, ref, :process, _pid, _reason}, + %{module: module, 
monitor_ref: ref} = state + ) do + Logger.warning("#{module} went down, attempting takeover on #{node()}") + send(self(), :check) + {:noreply, %{state | monitor_ref: nil}} + end - case module.start_link([]) do - {:ok, _pid} -> - Logger.info("#{module} started on #{node()}") + defp monitor_if_not_already(pid, %{monitor_ref: nil} = state) when is_pid(pid) do + ref = Process.monitor(pid) + %{state | monitor_ref: ref} + end - {:error, {:already_started, _pid}} -> - Logger.debug("#{module} already started by another node") + defp monitor_if_not_already(_pid, state), do: state - _ -> - :ok - end + defp attempt_to_start_child_here(%{module: module, startup_args: startup_args}) do + case module.start_link(startup_args) do + {:ok, pid} -> + Logger.info("#{module} started on #{node()}") + pid - Process.sleep(100) - monitor_loop(module) + {:error, {:already_started, pid}} -> + Logger.debug("#{module} already started by another node") + pid - _pid -> - # Another node won the race - monitor_loop(module) - end - - pid when is_pid(pid) -> - ref = Process.monitor(pid) - - receive do - {:DOWN, ^ref, :process, ^pid, _reason} -> - Logger.warning("#{module} went down, attempting takeover") - monitor_loop(module) - end + _ -> + nil end end end diff --git a/backend/test/backend/node_cluster_integration_test.exs b/backend/test/backend/node_cluster_integration_test.exs index 8ec4067..1e4b997 100644 --- a/backend/test/backend/node_cluster_integration_test.exs +++ b/backend/test/backend/node_cluster_integration_test.exs @@ -30,6 +30,29 @@ defmodule Backend.NodeClusterIntegrationTests do assert length(peer_nodes -- [owner_node]) == 3, "Expected 3 non-owner peer nodes, got #{inspect(peer_nodes -- [owner_node])}" end + + test "crashing GameRunner gets picked up on other node" do + peers = start_cluster(2) + + game_runner_pid = + :peer.call(hd(peers) |> elem(0), :global, :whereis_name, [Backend.GameRunner]) + + assert is_pid(game_runner_pid), "Could not find GameRunner in :global registry" + + 
+      first_node = node(game_runner_pid)
+
+      GameRunner.crash_game()
+
+      :timer.sleep(100)
+
+      new_pid = GameRunner.get_pid()
+      assert is_pid(new_pid), "GameRunner did not restart after crash"
+
+      restarted_node = node(new_pid)
+
+      assert restarted_node != first_node,
+             "GameRunner restarted on the same node instead of failing over"
+    end
   end

   defp start_cluster(count) do