PubSub and Process.send_after weird behavior in Phoenix Live View

Hello everyone, I’m having some trouble with a game I’m currently developing. Let me explain. It’s a song-guessing game: users join a room and start a game, a song plays, and the players try to guess it. So far the architecture is:

A DynamicSupervisor starts a GameServer (a GenServer), which holds the game logic and state (a Game struct).

Things got weird when I tried to add a countdown showing how much time is left for the current song. The timer decreases by 1 second, but it does so once for each client connected through LiveView. Suppose the preview lasts 30 seconds and I have 3 LiveView clients (players): the countdown goes 27/30, 24/30, 21/30 and so on. I truly don’t understand what is happening here.

I’ll leave the code here for more context, but you can check the project repo as well.

GameServer

defmodule Eureka.GameServer do
  alias Eureka.{Accounts, Game, Song}
  alias Phoenix.PubSub
  use GenServer
  require Logger

  @pubsub Eureka.PubSub

  # Initialization
  def start_link(room_code: room_code, players: players) do
    GenServer.start_link(__MODULE__, room_code: room_code, players: players)
  end

  @impl true
  def init(room_code: room_code, players: players) do
    new_game = Game.new_game(room_code, players)
    {:ok, new_game, {:continue, :fetch_song}}
  end

  # Client API
  @doc """
  Returns the current game state
  """
  @spec game(pid()) :: Game.t()
  def game(game_server) do
    GenServer.call(game_server, :get_game)
  end

  @doc """
  Get the players in the game
  """
  @spec get_players(pid()) :: [Accounts.User.t()]
  def get_players(game_server) do
    GenServer.call(game_server, :get_players)
  end

  @doc """
  Get the scores of the players in the game
  """
  def get_scores(game_server) do
    GenServer.call(game_server, :get_scores)
  end

  @doc """
  Starts the countdown for the current song
  """
  @spec song_countdown(pid()) :: :ok
  def song_countdown(game_server) do
    GenServer.cast(game_server, :timer)
  end

  @doc """
  Guess the current song
  """
  @spec guess_song(pid(), %{guess: String.t(), player: String.t()}) :: :ok
  def guess_song(game_server, %{guess: guess, player: player}) do
    GenServer.cast(game_server, {:guess_song, %{guess: guess, player: player}})
  end

  @doc """
  Subscribe to the game's topic
  """
  @spec subscribe_game(pid()) :: :ok
  def subscribe_game(game_server) do
    game = game(game_server)
    PubSub.subscribe(@pubsub, topic(game.id))
  end

  # Server API

  @impl true
  def handle_continue(:fetch_song, %Game{} = game) do
    Song.search(Game.next_song(game))
    {:noreply, game}
  end

  @impl true
  def handle_cast(:timer, game) do
    Process.send_after(self(), :countdown, 1_000)
    {:noreply, game}
  end

  def handle_cast({:guess_song, %{guess: guess, player: player}}, %Game{} = game) do
    {valid?, game} = Game.guess_song(game, %{guess: guess, player: player})
    score = Game.get_score(game, player)
    broadcast_update!(game, {:guess_result, %{score: score, valid?: valid?}})
    {:noreply, game}
  end

  @impl true
  def handle_call(:get_game, _from, state) do
    {:reply, state, state}
  end

  def handle_call(:get_players, _from, %Game{} = game) do
    players = Accounts.get_users_map(game.players)
    {:reply, players, game}
  end

  def handle_call(:get_scores, _from, %Game{} = game) do
    {:reply, game.score, game}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
    {:noreply, state}
  end

  def handle_info({_ref, %Song.Response{} = fetched_song}, %Game{} = game) do
    Logger.info("Fetched song: #{inspect(fetched_song)}")

    {:noreply,
     Game.update_song(game, fetched_song)
     |> broadcast_update!({:current_song, fetched_song})}
  end

  def handle_info(:countdown, %Game{} = game) do
    %Game{song_timer: current_timer} = game = Game.countdown_timer(game)

    if current_timer <= 0 do
      {:noreply, game}
    else
      Logger.debug("Countdown: #{current_timer}")

      countdown = div(current_timer, 1000)
      duration = div(Song.duration(game.current_song), 1000)
      broadcast_update!(game, {:countdown, %{duration: duration, countdown: countdown}})

      Process.send_after(self(), :countdown, 1000)

      {:noreply, game}
    end
  end

  defp broadcast_update!(%Game{id: game_id} = game, message) do
    PubSub.broadcast!(@pubsub, topic(game_id), message)
    game
  end

  defp topic(game_id) do
    "game:" <> game_id
  end
end

Game

defmodule Eureka.Game do
  @moduledoc """
  This module defines the struct who will store the game state.
  """
  alias __MODULE__
  alias Eureka.Song

  @type t :: %__MODULE__{
          id: String.t(),
          room_code: String.t(),
          song_queue: [Song.t()],
          current_song: Song.Response.t(),
          score: [Game.Score.t()],
          round: integer(),
          winner: integer(),
          valid_answers: [String.t()],
          players: [integer()],
          song_timer: non_neg_integer()
        }

  @enforce_keys [
    :id,
    :room_code,
    :song_queue,
    :current_song,
    :score,
    :round,
    :winner,
    :valid_answers,
    :players,
    :song_timer
  ]
  defstruct [
    :id,
    :room_code,
    :song_queue,
    :current_song,
    :score,
    :round,
    :winner,
    :valid_answers,
    :players,
    :song_timer
  ]

  @spec new_game(integer(), [integer()]) :: Game.t()
  def new_game(room_code, players) do
    queue = [
      # This will be mocked for now but will come through AI in the future
      %Song{
        artist: "Matuê",
        track: "333"
      },
      %Song{
        artist: "Adele",
        track: "Easy On Me"
      },
      %Song{
        artist: "Rihanna",
        track: "Umbrella"
      }
    ]

    score = Enum.map(players, fn player_id -> %Game.Score{score: 0, player: player_id} end)

    %Eureka.Game{
      id: generate_game_id(),
      room_code: room_code,
      song_queue: queue,
      current_song: nil,
      score: score,
      round: 0,
      winner: nil,
      valid_answers: [],
      players: players,
      song_timer: 0
    }
  end

  @doc """
  Returns the next song in the queue
  """
  @spec next_song(game :: Game.t()) :: Song.t()
  def next_song(%__MODULE__{song_queue: song_queue}) do
    hd(song_queue)
  end

  @doc """
  Updates the song queue removing the first element and setting the current fetched song
  """
  @spec update_song(game :: Game.t(), song :: Song.Response.t()) :: Game.t()
  def update_song(%__MODULE__{} = game, %Song.Response{} = current_song) do
    song_queue = tl(game.song_queue)

    %Game{
      game
      | song_queue: song_queue,
        current_song: current_song,
        song_timer: Song.duration(current_song),
        round: game.round + 1,
        valid_answers:
          get_valid_answers(%Song{track: current_song.name, artist: current_song.artist})
    }
  end

  @doc """
  Returns the score of a player
  """
  @spec get_score(Game.t(), player_id :: non_neg_integer()) :: Game.Score.t()
  def get_score(%Game{score: score}, player_id) do
    Enum.find(score, fn %Game.Score{player: player} -> player == player_id end)
  end

  @doc """
  Decreases the song timer by 1 second
  """
  @spec countdown_timer(Game.t()) :: Game.t()
  def countdown_timer(%__MODULE__{} = game) do
    %Game{game | song_timer: game.song_timer - :timer.seconds(1)}
  end

  @doc """
  Checks if the user input is a valid answer
  """
  @spec valid_guess?(Game.t(), String.t()) :: boolean()
  def valid_guess?(%Game{valid_answers: valid_answers}, guess) do
    guess = String.normalize(guess, :nfd) |> String.trim() |> String.downcase()
    Enum.member?(valid_answers, guess)
  end

  @doc """
  Processes a player's song guess and updates the game state accordingly.

  Takes a game state and a map containing the player's guess information. Returns a tuple
  containing a boolean indicating if the guess was correct and the updated game state.

  ## Parameters

    * game - A %Game{} struct representing the current game state
    * guess_info - A map containing:
      * :guess - The player's song guess
      * :player - The id of player making the guess

  ## Returns

    * {true, updated_game} - If the guess was correct, returns true and the game with updated score
    * {false, game} - If the guess was incorrect, returns false and the unchanged game

  ## Examples

      iex> game = %Game{current_song: "Yesterday", scores: %{}}
      iex> guess_info = %{guess: "Yesterday", player: 1}
      iex> guess_song(game, guess_info)
      {true, %Game{current_song: "Yesterday", scores: %{"Player1" => 1}}}

      iex> game = %Game{current_song: "Hey Jude", scores: %{}}
      iex> guess_info = %{guess: "Yesterday", player: 2}
      iex> guess_song(game, guess_info)
      {false, %Game{current_song: "Hey Jude", scores: %{}}}
  """
  @spec guess_song(Game.t(), map()) :: {boolean(), Game.t()}
  def guess_song(%Game{} = game, %{guess: guess, player: player}) do
    if valid_guess?(game, guess) do
      {true, update_score(game, player)}
    else
      {false, game}
    end
  end

  defp update_score(%Game{} = game, player) do
    score =
      Enum.map(game.score, fn %Game.Score{player: player_id, score: score} ->
        if player_id == player do
          %Game.Score{player: player_id, score: score + 10}
        else
          %Game.Score{player: player_id, score: score}
        end
      end)

    %Game{game | score: score}
  end

  defp generate_game_id do
    Ecto.UUID.generate()
  end

  # This function will return the valid answers for the current song
  # The main idea is to handle the possible cases of the user input
  # Take the song "Easy on Me" by Adele as an example
  # The valid answers will be ["Easy on Me", "easy on me", "Adele - Easy on Me", "adele - easy on me", "EASY ON ME", "ADELE - EASY ON ME", "easy on me - adele", "EASY ON ME - ADELE"]
  defp get_valid_answers(%Song{track: track, artist: artist}) do
    artist = String.normalize(artist, :nfd)

    [
      track,
      String.downcase(track),
      String.capitalize(track),
      String.upcase(track),
      "#{artist} - #{track}",
      "#{String.downcase(artist)} - #{String.downcase(track)}",
      "#{String.capitalize(artist)} - #{String.capitalize(track)}",
      "#{String.upcase(artist)} - #{String.upcase(track)}",
      "#{track} - #{artist}",
      "#{String.downcase(track)} - #{String.downcase(artist)}",
      "#{String.capitalize(track)} - #{String.capitalize(artist)}",
      "#{String.upcase(track)} - #{String.upcase(artist)}"
    ]
    |> Enum.map(&String.trim/1)
  end
end

Live view UI

defmodule EurekaWeb.GameLive.Show do
  use EurekaWeb, :live_view
  alias Eureka.{Game, GameServer, GameSupervisor, Song}

  @impl true
  def render(assigns) do
    ~H"""
    <audio
      :if={@loading == false}
      id="audio"
      src={@song.preview_url}
      controls
      autoplay
      preload
      class="hidden"
    />

    <%= if @loading == false && @countdown > 0 do %>
      <h3>
        <%= @countdown %> / <%= @duration %>
      </h3>
    <% end %>

    <section class="grid grid-cols-3 h-[calc(100vh-14rem)] gap-20">
      <aside class="bg-white space-y-4 shadow-brutalism min-w-20 border-2 border-black px-6 py-4">
        <h3 class="font-mono text-xl text-center font-semibold">Leaderboard</h3>
        <ul class="w-full space-y-2 divide-y-4 divide-black">
          <li
            :for={{player_id, player} <- @players}
            id={"user-#{player_id}"}
            class="pt-2 flex items-center justify-between"
          >
            <div class="flex items-center gap-2">
              <div class="size-12 rounded-full"><%= player.avatar |> raw() %></div>
              <%= if player_id == @current_user.id do %>
                <p class="font-mono font-medium text-lg text-center">
                  You
                </p>
              <% else %>
                <p class="font-mono font-medium text-lg text-center">
                  <%= player.nickname || player.email %>
                </p>
              <% end %>
            </div>

            <span class="font-mono font-medium text-lg text-center">
              Points: <%= Enum.find(@scores, fn {id, _} -> id == player_id end) |> elem(1) %>
            </span>
          </li>
        </ul>
      </aside>
      <div class="relative w-full col-span-1">
        <%= if @loading == false do %>
          <img src={@song.cover} class="w-full h-[56%] rounded-xl" alt="song cover" />
          <div class="min-w-full bg-brown-700 bg-clip-padding backdrop-filter backdrop-blur-lg bg-opacity-40 border border-gray-100 rounded-xl absolute inset-0 h-[56%]" />
        <% end %>
      </div>
      <div class="border-l-4 border-black px-4 flex">
        <.simple_form for={@form} class="self-end w-full" id="guess-song-form" phx-submit="guess_song">
          <.input
            type="text"
            id="guessing"
            field={@form[:guess]}
            disabled={player_scored?(@current_user.id, @scores) || @loading}
            name="guessing"
            label="What song?"
            placeholder="Your guess"
            class="!rounded-full bg-white !outline-none !border-2 !border-black shadow-brutalism font-mono font-medium h-12 focus:ring-offset-0 focus:ring-0 focus:border-current"
            required
          />
        </.simple_form>
      </div>
    </section>
    """
  end

  @impl true
  def mount(%{"game_id" => game_id}, _session, socket) do
    case GameSupervisor.get_game(game_id) do
      {:ok, game_server_pid, game} ->
        if connected?(socket) do
          GameServer.subscribe_game(game_server_pid)
        end

        scores =
          GameServer.get_scores(game_server_pid)
          |> Enum.map(&{&1.player, &1.score})

        players = GameServer.get_players(game_server_pid)

        {:ok,
         assign(socket,
           game: game,
           game_server_pid: game_server_pid,
           players: players,
           scores: scores,
           song: game.current_song,
           valid_answers: [],
           countdown: 0,
           duration: 0,
           loading: true
         )
         |> assign_new(:form, fn ->
           to_form(%{"guess" => ""})
         end)}

      {:error, :game_not_found} ->
        {:ok, put_flash(socket, :error, "Game not found")}
    end
  end

  @impl true
  def handle_event("guess_song", %{"guessing" => guess}, socket) do
    GameServer.guess_song(socket.assigns.game_server_pid, %{
      guess: guess,
      player: socket.assigns.current_user.id
    })

    {:noreply, socket}
  end

  @impl true
  def handle_info({:current_song, %Song.Response{} = song}, socket) do
    GameServer.song_countdown(socket.assigns.game_server_pid)
    {:noreply, assign(socket, song: song, loading: false)}
  end

  def handle_info({:countdown, %{countdown: countdown, duration: duration}}, socket) do
    {:noreply, assign(socket, countdown: countdown, duration: duration)}
  end

  def handle_info(
        {:guess_result, %{score: %Game.Score{player: player, score: score}}},
        socket
      ) do
    socket =
      socket
      |> assign(
        scores:
          Enum.map(socket.assigns.scores, fn
            {player_id, _} when player_id == player -> {player_id, score}
            {player_id, score} -> {player_id, score}
          end)
      )

    {:noreply, socket}
  end

  defp player_scored?(player_id, scores) do
    Enum.find(scores, fn {id, _} -> id == player_id end) |> elem(1) != 0
  end
end

You mean you expect all the LiveView-connected players to be in sync with the countdown timer, correct? If so, I have a (maybe) similar thing, a simple synced timer, but the problem with mine was that it got messed up whenever one of the connected users refreshed their page (the timer stays in sync until someone refreshes).

defmodule TimerxWeb.CountdownLive.Index do
  use TimerxWeb, :live_view

  @tick 1_000
  @duration_seconds 20

  @impl true
  def mount(_params, _session, socket) do
    if connected?(socket), do: Phoenix.PubSub.subscribe(Timerx.PubSub, "alerts")
    {:ok, assign(socket, :seconds, @duration_seconds)}
  end

  @impl true
  def handle_event("countdown", _unsigned_params, socket) do
    if connected?(socket), do: Phoenix.PubSub.broadcast(Timerx.PubSub, "alerts", :tick)
    {:noreply, socket}
  end

  @impl true
  def handle_info(:tick, %{assigns: %{seconds: 0}} = socket) do
    {:noreply, assign(socket, :seconds, 0)}
  end

  @impl true
  def handle_info(:tick, %{assigns: %{seconds: seconds}} = socket) do
    :timer.sleep(@tick)
    if connected?(socket), do: Phoenix.PubSub.broadcast(Timerx.PubSub, "alerts", :tick)
    {:noreply, assign(socket, :seconds, seconds - 1)}
  end
end

<h1>
  <%= gettext("Clock!", name: "Phoenix") %>
</h1>
<div>
  <%= @seconds %>
</div>
<button class="p-2 border" phx-click="countdown">
  Start
</button>

# config/dev.exs
config :timerx, TimerxWeb.Endpoint,
  # Binding to loopback ipv4 address prevents access from other machines.
  # Change to `ip: {0, 0, 0, 0}` to allow access from other machines.
  http: [ip: {0, 0, 0, 0}, port: port],

Yes, I’d expect all LiveViews to be in sync, but the timer decreases according to the number of connected clients: instead of decreasing by 1 second each second, with 2 clients connected it decreases by 2 seconds. More precisely, it decreases by 1 second twice (in this case of 2 connected LiveViews).
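
The numbers reported above can be reproduced with a small model. This is only a sketch: `CountdownModel` and `displayed_seconds/3` are hypothetical names, and it assumes that `chains` independent timer loops each subtract one 1-second tick from the same state every second.

```elixir
defmodule CountdownModel do
  @tick_ms 1_000

  # Seconds shown after each wall-clock second, assuming `chains`
  # independent timer loops each subtract one tick per second.
  def displayed_seconds(duration_ms, chains, seconds) do
    1..seconds
    |> Enum.scan(duration_ms, fn _second, remaining -> remaining - chains * @tick_ms end)
    |> Enum.map(&div(&1, 1_000))
  end
end

CountdownModel.displayed_seconds(30_000, 3, 3)
# => [27, 24, 21]
CountdownModel.displayed_seconds(30_000, 1, 3)
# => [29, 28, 27]
```

With three loops the model gives 27, 24, 21, which matches the 27/30, 24/30, 21/30 sequence from the first post.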

I’m not sure, but maybe your :timer cast is being handled multiple times instead of just once. When the song is fetched you broadcast :current_song to the clients; if there are multiple clients, the timer call happens multiple times, hence the countdown drops twice as fast. For debugging, check whether the timer is already running and don’t start it again, then correct the flow.
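
A minimal sketch of the "don’t start it if it is already running" check, assuming the server keeps the pending timer reference in its state. `GuardedTimer`, its functions, and the `timer_ref` and `interval_ms` fields are hypothetical and are not part of the project’s code.

```elixir
defmodule GuardedTimer do
  use GenServer

  # interval_ms is configurable only so the sketch can be tried quickly.
  def start_link(ticks, interval_ms \\ 1_000) do
    GenServer.start_link(__MODULE__, {ticks, interval_ms})
  end

  def start_countdown(pid), do: GenServer.cast(pid, :timer)
  def remaining(pid), do: GenServer.call(pid, :remaining)

  @impl true
  def init({ticks, interval_ms}) do
    {:ok, %{remaining: ticks, interval_ms: interval_ms, timer_ref: nil}}
  end

  @impl true
  # No timer pending: schedule the first tick and remember its reference.
  def handle_cast(:timer, %{timer_ref: nil} = state) do
    {:noreply, %{state | timer_ref: schedule(state)}}
  end

  # A timer is already pending: ignore the extra request.
  def handle_cast(:timer, state), do: {:noreply, state}

  @impl true
  def handle_call(:remaining, _from, state), do: {:reply, state.remaining, state}

  @impl true
  # Last tick: stop rescheduling and clear the reference.
  def handle_info(:countdown, %{remaining: remaining} = state) when remaining <= 1 do
    {:noreply, %{state | remaining: 0, timer_ref: nil}}
  end

  def handle_info(:countdown, state) do
    {:noreply, %{state | remaining: state.remaining - 1, timer_ref: schedule(state)}}
  end

  defp schedule(%{interval_ms: interval_ms}) do
    Process.send_after(self(), :countdown, interval_ms)
  end
end
```

With this guard, three clients calling `GuardedTimer.start_countdown/1` for the same server would still produce a single chain of `:countdown` messages.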

That doesn’t make sense to me (with all due respect), but I’m certainly going to try it. Take a look at this project, though: lv_stopwatch

In that code, the counter is started from the LiveView just like I did, but it is perfectly in sync.

The counter can be started from a LiveView, but only one timer ticks at a time.

In the lv_stopwatch project only one tick timer runs at a time, meaning only one person can start the timer. E.g. if person A starts the timer, then their LiveView process runs the tick. Person B can only stop the timer. If person A disconnects, the timer stops, because the tick timer runs in person A’s LiveView process. In your case I think multiple timers are updating the state. You can check via the logs.

True! I ran a test where a button event starts the counter, and it worked as expected. But I’m still not sure how to check whether the timer is already running before starting it in my application flow. Do you have any suggestions?

I think you should use your GameServer (GenServer) as the source of truth. Move all the tick logic there, if there is one GameServer per game. The clients (LiveViews) can only start and stop the timer, which runs in the GameServer. I think it’s a slightly better approach, just like lv_stopwatch describes in their project:
https://github.com/dwyl/phoenix-liveview-stopwatch?tab=readme-ov-file#genserver
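
A rough sketch of that idea, in which the server schedules its own first tick when the song is ready, so no client ever triggers the timer. `ServerOwnedCountdown` and its options are hypothetical, and the `subscribers` list of pids is only a stand-in for the PubSub topic so that the example runs without Phoenix.

```elixir
defmodule ServerOwnedCountdown do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      remaining_ms: Keyword.fetch!(opts, :duration_ms),
      tick_ms: Keyword.get(opts, :tick_ms, 1_000),
      # Stand-in for the PubSub topic: a plain list of pids.
      subscribers: Keyword.get(opts, :subscribers, [])
    }

    # Simulates "song fetched". In the thread's GameServer this would be the
    # handle_info clause that receives the %Song.Response{}.
    {:ok, state, {:continue, :song_ready}}
  end

  @impl true
  def handle_continue(:song_ready, state) do
    broadcast(state, :current_song)
    # The server starts the countdown itself, exactly once per song.
    Process.send_after(self(), :countdown, state.tick_ms)
    {:noreply, state}
  end

  @impl true
  def handle_info(:countdown, state) do
    remaining = state.remaining_ms - state.tick_ms
    state = %{state | remaining_ms: remaining}

    # Keep ticking only while there is time left.
    if remaining > 0 do
      broadcast(state, {:countdown, remaining})
      Process.send_after(self(), :countdown, state.tick_ms)
    end

    {:noreply, state}
  end

  defp broadcast(%{subscribers: subscribers}, message) do
    Enum.each(subscribers, &send(&1, message))
  end
end
```

Applied to the code in the first post, this would mean the LiveView’s `handle_info({:current_song, song}, socket)` only assigns `song` and `loading`, and no longer calls `GameServer.song_countdown/1`.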

I found the solution a few minutes before you answered :sweat_smile:, but the previous answers really helped a lot. All that was necessary was to move the timer logic into the game server itself, and it worked! Thanks a lot, bro :handshake: