Batching pubsub events to throttle socket updates

Marlus presented techniques for optimizing LiveView. One of his last points was to batch socket updates (timestamp 25:55).

All I’ve come up with so far is accumulating updates in a socket structure that the view doesn’t render, and then periodically copying it into the assigns the view does use. How would you approach this?

Hi! That presentation is really great, isn’t it? I don’t have access to the code, but here is what I could see (rewatching the relevant part):

  • There’s a Batcher module that implements an on_mount hook
  • It intercepts handle_event (could also do the same for handle_info…)
  • See on_mount/1 and attach_hook/4 for how to accomplish the above
  • The :handle_event lifecycle hook calls module.handle_batch, with a signature similar to the original handler, either after a given number of items has been collected or after a timeout
  • Batcher is a behaviour that declares the handle_batch above

I could imagine implementing the above with no external dependencies. For more complex cases, something like GenStage (gen_stage v1.2.1) or Broadway (v1.1.0) may be useful.

A question might be where to store the current batch items. I think there are a few options:

  • Socket.private using put_private/3, so that changes are not tracked
  • A GenServer
  • Database (if one needs persistence?!)
1 Like

Here’s a single file demo implementation:

# Single-file Phoenix LiveView demo showing how to batch events to reduce/control the flow of UI updates.
#
# Run with `elixir batcher_demo.exs`, then open a browser at http://localhost:5001/.
#
# For comparison, the "inc" event is batched while the "dec" event is not.
#
# References
#
# https://github.com/phoenixframework/phoenix_live_view/blob/main/.github/single-file-samples/main.exs
# https://elixirforum.com/t/optimizing-liveview-for-realtime-applications-by-marlus-saraiva-elixirconf-eu-2023/59265?u=rhcarvalho
# https://elixirforum.com/t/batching-pubsub-events-to-throttle-socket-updates/64658/2?u=rhcarvalho

Application.put_env(:sample, Example.Endpoint,
  http: [ip: {127, 0, 0, 1}, port: 5001],
  server: true,
  live_view: [signing_salt: "aaaaaaaa"],
  secret_key_base: String.duplicate("a", 64)
)

Mix.install([
  {:plug_cowboy, "~> 2.5"},
  {:jason, "~> 1.0"},
  {:phoenix, "~> 1.7"},
  {:phoenix_live_view, "~> 0.20.17"}
])

# Build the LiveView JavaScript assets (this needs mix and npm available in your path!).
# `path` is the root of the installed phoenix_live_view dependency.
path = Phoenix.LiveView.__info__(:compile)[:source] |> Path.dirname() |> Path.join("../")

# Runs one build command, streaming its output to stdout. A non-zero exit status
# raises, so a failed build step stops the script instead of being silently ignored.
run_build_step! = fn cmd, args, dir ->
  case System.cmd(cmd, args, cd: dir, into: IO.binstream()) do
    {_output, 0} ->
      :ok

    {_output, status} ->
      raise "`#{Enum.join([cmd | args], " ")}` failed in #{dir} with exit status #{status}"
  end
end

run_build_step!.("mix", ["deps.get"], path)
run_build_step!.("npm", ["install"], Path.join(path, "./assets"))
run_build_step!.("mix", ["assets.build"], path)

defmodule Example.ErrorView do
  # Minimal error view: the body of any error page is just the status message
  # Phoenix derives from the template name; assigns are ignored.
  def render(template, _assigns) do
    Phoenix.Controller.status_message_from_template(template)
  end
end

defmodule Example.HomeLive do
  # Demo counter page: "+" clicks ("inc") are delivered in batches through
  # Batcher, while "-" clicks ("dec") are handled one by one in handle_event/3.
  use Phoenix.LiveView, layout: {__MODULE__, :live}

  @behaviour Batcher

  # Hand "inc" events to handle_batch/3 once 5 have accumulated, or 2000 ms
  # after the most recent one, whichever comes first.
  on_mount({Batcher, {"inc", handler: __MODULE__, max_size: 5, timeout: 2000}})

  def mount(_params, _session, socket) do
    {:ok, assign(socket, count: 0)}
  end

  # Page layout: loads the Phoenix / LiveView client scripts and connects the live socket.
  def render("live.html", assigns) do
    ~H"""
    <script src="/assets/phoenix/phoenix.js"></script>
    <script src="/assets/phoenix_live_view/phoenix_live_view.js"></script>
    <%!-- uncomment to use enable tailwind --%>
    <%!-- <script src="https://cdn.tailwindcss.com"></script> --%>
    <script>
      let liveSocket = new window.LiveView.LiveSocket("/live", window.Phoenix.Socket)
      liveSocket.connect()
    </script>
    <style>
      * { font-size: 1.1em; }
    </style>
    <%= @inner_content %>
    """
  end

  def render(assigns) do
    ~H"""
    <%= @count %>
    <button phx-click="inc">+</button>
    <button phx-click="dec">-</button>
    """
  end

  # Every batched "inc" event adds one, so the whole batch adds its length.
  def handle_batch("inc", batch, socket) do
    increments = length(batch)
    {:noreply, update(socket, :count, fn count -> count + increments end)}
  end

  # Unbatched: each "dec" click decrements immediately.
  def handle_event("dec", _params, socket) do
    {:noreply, update(socket, :count, fn count -> count - 1 end)}
  end
end

defmodule Batcher do
  @moduledoc """
  An `on_mount` hook that batches occurrences of one LiveView event.

  The params of each occurrence are collected in `socket.private` (not
  change-tracked). They are handed, oldest first, to `c:handle_batch/3` as soon
  as `:max_size` of them have been collected, or `:timeout` milliseconds after
  the most recent one, whichever happens first.

      on_mount({Batcher, {"inc", handler: __MODULE__, max_size: 5, timeout: 2000}})

  Several events can be batched in the same LiveView by mounting one `Batcher`
  per event.
  """

  @doc """
  Receives the params of every collected occurrence of `event`, oldest first.

  A batch may be flushed from a timer message, where there is no client push to
  reply to, so only `{:noreply, socket}` is supported.
  """
  @callback handle_batch(
              event :: binary(),
              batch :: [Phoenix.LiveView.unsigned_params()],
              socket :: Phoenix.LiveView.Socket.t()
            ) :: {:noreply, Phoenix.LiveView.Socket.t()}

  @doc """
  Attaches the batching hooks for `event`.

  ## Options

    * `:handler` (required) - module implementing `c:handle_batch/3`.
    * `:max_size` (required) - flush as soon as this many occurrences are collected.
    * `:timeout` (required) - flush this many milliseconds after the most recent
      occurrence if `:max_size` was not reached first.
  """
  def on_mount({event, opts}, _params, _session, socket) do
    handler = Keyword.fetch!(opts, :handler)
    max_size = Keyword.fetch!(opts, :max_size)
    timeout = Keyword.fetch!(opts, :timeout)

    socket =
      socket
      # Hook names include the event: attach_hook/4 raises when a name is reused,
      # so constant names would allow only one Batcher per LiveView.
      |> Phoenix.LiveView.attach_hook("batcher_event_#{event}", :handle_event, fn
        ^event, params, socket ->
          {socket, batch} = add_to_batch(socket, event, params)

          socket =
            if length(batch) >= max_size do
              flush_batch(socket, event, handler)
            else
              reschedule_flush_batch(socket, event, timeout)
            end

          {:halt, socket}

        _event, _params, socket ->
          {:cont, socket}
      end)
      |> Phoenix.LiveView.attach_hook("batcher_info_#{event}", :handle_info, fn
        # The event is pinned so each batcher only consumes its own timer messages;
        # the __MODULE__ tag keeps the LiveView's own messages from being swallowed.
        {__MODULE__, :flush_batch, ^event}, socket ->
          socket =
            case get_batch(socket, event) do
              # A size-triggered flush may have cancelled the timer after it already
              # fired; its message still arrives, but there is nothing left to flush.
              [] -> socket
              [_ | _] -> flush_batch(socket, event, handler)
            end

          {:halt, socket}

        _msg, socket ->
          {:cont, socket}
      end)

    {:cont, socket}
  end

  # Prepends `params` to the event's batch; returns the updated socket and batch.
  defp add_to_batch(socket, event, params) do
    # note: batch events are in reverse order
    batch = [params | get_batch(socket, event)]
    {put_batch(socket, event, batch), batch}
  end

  # Cancels any pending timer, hands the batch (oldest first) to the handler, then empties it.
  defp flush_batch(socket, event, handler) do
    socket = cancel_timeout(socket, event)
    batch = get_batch(socket, event) |> Enum.reverse()
    {:noreply, socket} = handler.handle_batch(event, batch, socket)
    put_batch(socket, event, [])
  end

  # Restarts the flush timer so it fires `timeout` ms after the latest occurrence.
  defp reschedule_flush_batch(socket, event, timeout) do
    socket = cancel_timeout(socket, event)
    timeout_ref = Process.send_after(self(), {__MODULE__, :flush_batch, event}, timeout)
    put_timeout_ref(socket, event, timeout_ref)
  end

  defp cancel_timeout(socket, event) do
    if timeout_ref = get_timeout_ref(socket, event) do
      Process.cancel_timer(timeout_ref)
    end

    put_timeout_ref(socket, event, nil)
  end

  defp get_batch(socket, event) do
    socket.private[{__MODULE__, event, :batch}] || []
  end

  @doc false
  def get_timeout_ref(socket, event) do
    socket.private[{__MODULE__, event, :timeout_ref}]
  end

  defp put_batch(socket, event, batch) do
    Phoenix.LiveView.put_private(socket, {__MODULE__, event, :batch}, batch)
  end

  defp put_timeout_ref(socket, event, timeout_ref) do
    Phoenix.LiveView.put_private(socket, {__MODULE__, event, :timeout_ref}, timeout_ref)
  end
end

defmodule Example.Router do
  # Single route: "/" renders Example.HomeLive through the :browser pipeline.
  use Phoenix.Router
  import Phoenix.LiveView.Router

  pipeline :browser do
    plug :accepts, ["html"]
  end

  scope "/", Example do
    pipe_through :browser
    live "/", HomeLive, :index
  end
end

defmodule Example.Endpoint do
  # HTTP entry point: the LiveView socket at /live, the Phoenix and LiveView
  # client scripts served as static files, and everything else sent to the router.
  use Phoenix.Endpoint, otp_app: :sample

  socket "/live", Phoenix.LiveView.Socket

  plug Plug.Static, from: {:phoenix, "priv/static"}, at: "/assets/phoenix"
  plug Plug.Static, from: {:phoenix_live_view, "priv/static"}, at: "/assets/phoenix_live_view"

  plug Example.Router
end

# Start the endpoint under a one_for_one supervisor, then block forever so the
# script, and with it the server, keeps running.
children = [Example.Endpoint]
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
Process.sleep(:infinity)

(also available as a GitHub Gist: Phoenix LiveView event batching · GitHub)

3 Likes

First off - very nice and clean!! Thank you!

Your solution exposed me to a couple of facilities I hadn’t encountered before like LiveView.attach_hook and LiveView.put_private.

:+1: I liked your idea that the Batcher could also process handle_info.

:+1: I experimented with changing reschedule_flush_batch so that it doesn’t cancel an existing timer. Instead, it starts one only for the first message of a batch, so handle_batch runs once either the count is reached or the timeout elapses from that first message. It’s a testament to your design that this was easy to try. Either interpretation of the timeout could be correct.

2 Likes

This might help you in the future

2 Likes

Well, that’s fun, and as a bonus I now know you can run Phoenix LiveView from Livebook!

I’ve reworked my version a bit. Thanks again to rhcarvalho. I couldn’t have done this without your version.

I realized I needed

  1. Throttle handle_info calls only - I don’t see handle_event calls at a sufficient rate to be concerned.
    Since the msg param of handle_info can be data of any shape, I added a callback that decides whether a given message should be throttled. The callback can use pattern matching or any other means.
  2. The first message should be handled immediately.
    Actually the first message and any other message when a batch hasn’t been processed since the timeout.
    Let’s say the timeout is 500ms and 1000ms goes by since the last batch. The next message should be released immediately.
  3. Messages should be batched based on a timeout, not a count. I was aiming for near-real-time updates without overloading the socket: events should still be published as they become available, just at a slower rate, and messages shouldn’t be held indefinitely.
  # Usage inside a LiveView module. Assumes `HandleInfoThrottler` is an alias for
  # WeddingGamesWeb.HandleInfoThrottler (TODO confirm). The LiveView implements the
  # behaviour's callbacks itself (handler = __MODULE__) and registers one throttler,
  # named :vote_throttler, with a 500 ms timeout.
  @behaviour HandleInfoThrottler

  on_mount({HandleInfoThrottler, {:vote_throttler, __MODULE__, 500}})

Here’s my updated version.

  defmodule WeddingGamesWeb.HandleInfoThrottler do
    @moduledoc """
    Throttles handle_info messages based on timeout.

    A matching message that arrives more than `timeout` milliseconds after the
    previous dispatch (or before any dispatch at all) is dispatched immediately.

    Messages arriving sooner are batched and dispatched together once `timeout`
    milliseconds have elapsed since the previous dispatch.

    ## Usage
    You add one or more handle_info throttlers to your LiveView by adding on_mount hooks.
    - `throttler_name` is a unique name for the throttler.
    - `callback_module` is the module that implements the HandleInfoThrottler behaviour.
    - `timeout` is the timeout in milliseconds.

    ### on_mount

    ```elixir
    on_mount({HandleInfoThrottler, {:throttler_name, callback_module, timeout}})
    ```

    """

    @doc """
    Return true if the throttler should throttle the message. Return false when it does not apply.

    Messages for which this returns false are left for other hooks and the LiveView's
    own `handle_info/2`.
    """
    @callback throttle_message?(throttler_name :: atom(), msg :: any()) :: boolean()

    @doc """
    Called when one or more messages are ready to be processed. This is either a single
    message dispatched immediately or the collection of batched messages, oldest first.
    """
    @callback handle_batched_info(
                throttler_name :: atom(),
                batch :: list(),
                socket :: Phoenix.LiveView.Socket.t()
              ) :: {:noreply, Phoenix.LiveView.Socket.t()}

    @doc """
    Establishes a handle_info throttler named `throttler_name` that dispatches to
    `handler` using a `timeout` in milliseconds.
    """
    def on_mount({throttler_name, handler, timeout}, _params, _session, socket) do
      socket =
        socket
        |> put_handler(throttler_name, handler)
        |> put_timeout(throttler_name, timeout)
        # nil means "nothing dispatched yet", so the first matching message goes out at once.
        |> put_last_batch_at(throttler_name, nil)
        |> Phoenix.LiveView.attach_hook(
          {"handle_info_throttler_#{throttler_name}"},
          :handle_info,
          fn
            # Pinned: with several throttlers, each hook must consume only its own timer.
            {__MODULE__, ^throttler_name, :flush_batch}, socket ->
              {:halt, flush_scheduled_batch(socket, throttler_name)}

            msg, socket ->
              handler = get_handler(socket, throttler_name)

              if throttle_message?(handler, throttler_name, msg) do
                {:halt,
                 socket
                 |> add_to_batch(throttler_name, msg)
                 |> maybe_flush_or_schedule(throttler_name)}
              else
                {:cont, socket}
              end
          end
        )

      {:cont, socket}
    end

    # Calls the behaviour's throttle_message?/2. Falls back to batch_message?/2, the
    # name earlier versions of this module invoked, so existing handlers keep working.
    defp throttle_message?(handler, throttler_name, msg) do
      if Code.ensure_loaded?(handler) and function_exported?(handler, :throttle_message?, 2) do
        handler.throttle_message?(throttler_name, msg)
      else
        handler.batch_message?(throttler_name, msg)
      end
    end

    # Dispatches right away if the last dispatch is older than the timeout; otherwise
    # makes sure a flush is scheduled for when the timeout expires.
    defp maybe_flush_or_schedule(socket, throttler_name) do
      timeout = get_timeout(socket, throttler_name)

      case get_last_batch_at(socket, throttler_name) do
        nil ->
          flush_batch(socket, throttler_name)

        last_batch_at ->
          ms_since_last_batch = now_ms() - last_batch_at

          if ms_since_last_batch > timeout do
            flush_batch(socket, throttler_name)
          else
            schedule_flush_batch_if_needed(socket, throttler_name, timeout - ms_since_last_batch)
          end
      end
    end

    # Handles the flush timer message. An immediate flush may have cancelled the timer
    # after it had already fired, leaving its message in the mailbox. With nothing
    # batched there is nothing to dispatch, and last_batch_at must not be bumped.
    defp flush_scheduled_batch(socket, throttler_name) do
      case get_batch(socket, throttler_name) do
        [] -> cancel_timeout(socket, throttler_name)
        [_ | _] -> flush_batch(socket, throttler_name)
      end
    end

    # Hands the batch (oldest first) to the handler, then resets batch, timer and timestamp.
    defp flush_batch(socket, throttler_name) do
      handler = get_handler(socket, throttler_name)
      batch = socket |> get_batch(throttler_name) |> Enum.reverse()

      {:noreply, socket} = handler.handle_batched_info(throttler_name, batch, socket)

      socket
      |> put_batch(throttler_name, [])
      |> cancel_timeout(throttler_name)
      |> put_last_batch_at(throttler_name, now_ms())
    end

    defp schedule_flush_batch_if_needed(socket, throttler_name, send_after_ms) do
      case get_timeout_ref(socket, throttler_name) do
        nil -> schedule_flush_batch(socket, throttler_name, send_after_ms)
        _ -> socket
      end
    end

    defp schedule_flush_batch(socket, throttler_name, send_after_ms) do
      timeout_ref =
        Process.send_after(self(), {__MODULE__, throttler_name, :flush_batch}, send_after_ms)

      put_timeout_ref(socket, throttler_name, timeout_ref)
    end

    defp cancel_timeout(socket, throttler_name) do
      if timeout_ref = get_timeout_ref(socket, throttler_name) do
        Process.cancel_timer(timeout_ref)
      end

      put_timeout_ref(socket, throttler_name, nil)
    end

    defp add_to_batch(socket, throttler_name, msg) do
      # note: batched messages are stored in reverse order
      batch = [msg | get_batch(socket, throttler_name)]
      put_batch(socket, throttler_name, batch)
    end

    # Monotonic clock: elapsed-time checks must not be affected by wall-clock adjustments.
    defp now_ms, do: System.monotonic_time(:millisecond)

    defp get_batch(socket, throttler_name) do
      socket.private[{__MODULE__, throttler_name, :batch}] || []
    end

    defp get_last_batch_at(socket, throttler_name) do
      socket.private[{__MODULE__, throttler_name, :last_batch_at}]
    end

    defp get_handler(socket, throttler_name) do
      socket.private[{__MODULE__, throttler_name, :handler}]
    end

    defp get_timeout(socket, throttler_name) do
      socket.private[{__MODULE__, throttler_name, :timeout}]
    end

    @doc false
    def get_timeout_ref(socket, throttler_name) do
      socket.private[{__MODULE__, throttler_name, :timeout_ref}]
    end

    defp put_batch(socket, throttler_name, batch) do
      Phoenix.LiveView.put_private(socket, {__MODULE__, throttler_name, :batch}, batch)
    end

    defp put_last_batch_at(socket, throttler_name, last_batch_at) do
      Phoenix.LiveView.put_private(
        socket,
        {__MODULE__, throttler_name, :last_batch_at},
        last_batch_at
      )
    end

    defp put_handler(socket, throttler_name, handler) do
      Phoenix.LiveView.put_private(socket, {__MODULE__, throttler_name, :handler}, handler)
    end

    defp put_timeout(socket, throttler_name, timeout) do
      Phoenix.LiveView.put_private(socket, {__MODULE__, throttler_name, :timeout}, timeout)
    end

    defp put_timeout_ref(socket, throttler_name, timeout_ref) do
      Phoenix.LiveView.put_private(
        socket,
        {__MODULE__, throttler_name, :timeout_ref},
        timeout_ref
      )
    end
  end