Batching pubsub events to throttle socket updates

Here’s a single file demo implementation:

# Single-file Phoenix LiveView demo showing how to batch events to reduce/control the flow of UI updates.
#
# Run with `elixir batcher_demo.exs`, then open a browser at http://localhost:5001/.
#
# For comparison, the "inc" event is batched while the "dec" event is not.
#
# References
#
# https://github.com/phoenixframework/phoenix_live_view/blob/main/.github/single-file-samples/main.exs
# https://elixirforum.com/t/optimizing-liveview-for-realtime-applications-by-marlus-saraiva-elixirconf-eu-2023/59265?u=rhcarvalho
# https://elixirforum.com/t/batching-pubsub-events-to-throttle-socket-updates/64658/2?u=rhcarvalho

Application.put_env(:sample, Example.Endpoint,
  http: [ip: {127, 0, 0, 1}, port: 5001],
  server: true,
  live_view: [signing_salt: "aaaaaaaa"],
  secret_key_base: String.duplicate("a", 64)
)

Mix.install([
  {:plug_cowboy, "~> 2.5"},
  {:jason, "~> 1.0"},
  {:phoenix, "~> 1.7"},
  {:phoenix_live_view, "~> 0.20.17"}
])

# build the LiveView JavaScript assets (this needs mix and npm available in your path!)
path = Phoenix.LiveView.__info__(:compile)[:source] |> Path.dirname() |> Path.join("../")
System.cmd("mix", ["deps.get"], cd: path, into: IO.binstream())
System.cmd("npm", ["install"], cd: Path.join(path, "./assets"), into: IO.binstream())
System.cmd("mix", ["assets.build"], cd: path, into: IO.binstream())

defmodule Example.ErrorView do
  def render(template, _), do: Phoenix.Controller.status_message_from_template(template)
end

defmodule Example.HomeLive do
  use Phoenix.LiveView, layout: {__MODULE__, :live}

  on_mount({Batcher, {"inc", handler: __MODULE__, max_size: 5, timeout: 2000}})

  @behaviour Batcher

  def mount(_params, _session, socket) do
    {:ok, assign(socket, :count, 0)}
  end

  def render("live.html", assigns) do
    ~H"""
    <script src="/assets/phoenix/phoenix.js"></script>
    <script src="/assets/phoenix_live_view/phoenix_live_view.js"></script>
    <%!-- uncomment to use enable tailwind --%>
    <%!-- <script src="https://cdn.tailwindcss.com"></script> --%>
    <script>
      let liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
      liveSocket.connect()
    </script>
    <style>
      * { font-size: 1.1em; }
    </style>
    <%= @inner_content %>
    """
  end

  def render(assigns) do
    ~H"""
    <%= @count %>
    <button phx-click="inc">+</button>
    <button phx-click="dec">-</button>
    """
  end

  def handle_batch("inc", batch, socket) do
    {:noreply, assign(socket, :count, socket.assigns.count + length(batch))}
  end

  def handle_event("dec", _params, socket) do
    {:noreply, assign(socket, :count, socket.assigns.count - 1)}
  end
end

defmodule Batcher do
  @callback handle_batch(
              event :: binary(),
              batch :: [Phoenix.LiveView.unsigned_params()],
              socket :: Phoenix.LiveView.Socket.t()
            ) ::
              {:noreply, Phoenix.LiveView.Socket.t()}
              | {:reply, map(), Phoenix.LiveView.Socket.t()}

  def on_mount({event, opts}, _params, _session, socket) do
    handler = Keyword.fetch!(opts, :handler)
    max_size = Keyword.fetch!(opts, :max_size)
    timeout = Keyword.fetch!(opts, :timeout)

    socket =
      socket
      |> Phoenix.LiveView.attach_hook("batcher_event", :handle_event, fn
        ^event, params, socket ->
          socket =
            case add_to_batch(socket, event, params) do
              {socket, batch} when length(batch) == max_size ->
                flush_batch(socket, event, handler)

              {socket, _} ->
                reschedule_flush_batch(socket, event, timeout)
            end

          {:halt, socket}

        _event, _params, socket ->
          {:cont, socket}
      end)
      |> Phoenix.LiveView.attach_hook("batcher_info", :handle_info, fn
        {:flush_batch, event}, socket ->
          socket =
            case get_batch(socket, event) do
              batch when length(batch) > 0 -> flush_batch(socket, event, handler)
              _ -> socket
            end

          {:halt, socket}

        _msg, socket ->
          {:cont, socket}
      end)

    {:cont, socket}
  end

  defp add_to_batch(socket, event, params) do
    # note: batch events are in reverse order
    batch = [params | get_batch(socket, event)]
    {put_batch(socket, event, batch), batch}
  end

  defp flush_batch(socket, event, handler) do
    socket = cancel_timeout(socket, event)
    batch = get_batch(socket, event) |> Enum.reverse()
    {:noreply, socket} = handler.handle_batch(event, batch, socket)
    socket |> put_batch(event, [])
  end

  defp reschedule_flush_batch(socket, event, timeout) do
    socket = cancel_timeout(socket, event)
    timeout_ref = Process.send_after(self(), {:flush_batch, event}, timeout)
    socket |> put_timeout_ref(event, timeout_ref)
  end

  defp cancel_timeout(socket, event) do
    if timeout_ref = get_timeout_ref(socket, event) do
      Process.cancel_timer(timeout_ref)
    end

    socket |> put_timeout_ref(event, nil)
  end

  defp get_batch(socket, event) do
    socket.private[{__MODULE__, event, :batch}] || []
  end

  def get_timeout_ref(socket, event) do
    socket.private[{__MODULE__, event, :timeout_ref}]
  end

  defp put_batch(socket, event, batch) do
    socket |> Phoenix.LiveView.put_private({__MODULE__, event, :batch}, batch)
  end

  defp put_timeout_ref(socket, event, timeout_ref) do
    socket |> Phoenix.LiveView.put_private({__MODULE__, event, :timeout_ref}, timeout_ref)
  end
end

defmodule Example.Router do
  use Phoenix.Router
  import Phoenix.LiveView.Router

  pipeline :browser do
    plug(:accepts, ["html"])
  end

  scope "/", Example do
    pipe_through(:browser)

    live("/", HomeLive, :index)
  end
end

defmodule Example.Endpoint do
  use Phoenix.Endpoint, otp_app: :sample
  socket("/live", Phoenix.LiveView.Socket)

  plug(Plug.Static, from: {:phoenix, "priv/static"}, at: "/assets/phoenix")
  plug(Plug.Static, from: {:phoenix_live_view, "priv/static"}, at: "/assets/phoenix_live_view")

  plug(Example.Router)
end

{:ok, _} = Supervisor.start_link([Example.Endpoint], strategy: :one_for_one)
Process.sleep(:infinity)

(also available as a GitHub Gist: Phoenix LiveView event batching · GitHub)

4 Likes