Limit phoenix liveviews stream size

Hello everyone,

I’m adding the new phoenix 1.7 liveview 0.18.16 stream to my app, to show live logs.

This logs list can be quite long as intented, but I would like to set a limit, say to a thousand logs displayed, and deleting the oldest ones from the dom while new logs are pouring in.

Before stream, I was simply using Enum.take(1000), but with the stream functions, I don’t know how to remove oldest logs :thinking: (apart from using a liveview hook, but… it seems not in line with the goal of liveview)

thanks!

1 Like

Heex


<tbody id="logs" phx-update="stream">
  <tr :for={{dom_id, log} <- @streams.logs}} id={dom_id}>
    <td><%= log.message %></td>
    <td><%= log.timestamp %></td>
  </tr>
</tbody>

Module


defmodule MyApp.MyLiveView do
  use Phoenix.LiveView

  @log_max 1000

  def mount(_params, _session, socket) do
    stream_logs(socket)
    {:ok, assigns(socket, :log_count,0)
  end

  def handle_info({:new_log, log}, socket) do
  socket =
    socket
    |> assign(:log_count, socket.assigns.log_count + 1)
    |> stream_insert(:logs, log)
    |>  then take_logs(socket)

  {:noreply, socket}
end

  defp stream_logs(socket) do
    stream(socket, :logs, [], dom_id: &("log-#{&1.id}"))
  end

defp take_logs(socket) when socket.assigns.log_count > @log_max do
[ delete_log| _ ] = @streams.logs
stream_delete(socket, :logs, delete_log)
socket

end
defp take_logs(socket), do: socket

end

|> #check to see how many you’ve streamed and if it is over 1000, start stream_delete the oldest item.

Sure, but then you have to keep track server side of all the dom ids (at least) currently rendered client side. The rendering would be more performant, but it’s more or less like a good old assign (but more complicated), am I right?

I modified it to be more complete. I’m still pretty new and I haven’t tried to run it but I think it should work based on documentation.

1 Like

Thanks,

I think this part [ delete_log| _ ] = @streams.logs is a bit flaky, since the all point is to not have the rendered logs stored server side after being sent to the client. So I expect stream.logs to not have records of the logs sent to the client a while ago.

Keeping track of the count of sent logs could be a nice track though, but we would need a function stream_delete that can take an offset of some sort instead of a specific element.

I think with this problem it’s good to keep in mind where the state lives.

In traditional assigns, state lives on the server (list of logs in assigns).

In stream assigns, state lives in the client browser (in the form of the list of DOM nodes with ids).

If you want to manipulate or access the state beyond what Phoenix API provides to you, you need to do it within the same medium (either server-side Elixir with traditional assigns, or client-side Javascript with streams).

So you will need to either implement JS hooks or track state server-side (DOM ids).

One way would be to update Phoenix stream API so it accepts options such as max stream length (e.g. stream(socket, :logs, logs) ← make it accept additional options). But that would be on Phoenix devs.

1 Like

If you use the dom_id you can calculate what dom_id to delete with stream_delete_by_dom_id,
then you only need to keep track of the counter, pretty cheap state to hold in the process ?

defmodule StreamerWeb.IndexLive do
  use Phoenix.LiveView

  @dom_idx 0
  @max_items_in_list 5

  def mount(_, _, socket) do
    if connected?(socket) do
      Process.send_after(self(), :tick, 5_000)
    end

    {data, counter} =
      [%{id: 0, name: "Thor"}, %{id: 1, name: "Sif"}, %{id: 2, name: "Odin"}]
      |> Enum.reduce({[], 0}, fn element, {l, c} ->
        {[Map.put(element, :dom_id, c) | l], c + 1}
      end)

    socket =
      socket
      |> stream(:data, data, dom_id: &"dog-#{&1.dom_id}")
      |> assign(counter: counter)

    {:ok, socket}
  end

  def handle_info(:tick, socket) do
    counter = socket.assigns.counter

    Process.send_after(self(), :tick, 5_000)

    socket =
      socket
      |> stream_insert(
        :data,
        %{id: counter, name: "name-#{counter}", dom_id: counter},
        at: 0
      )
      |> maybe_delete(counter)
      |> IO.inspect(label: "stream")

    {:noreply, assign(socket, counter: counter + 1)}
  end

  defp maybe_delete(socket, counter) do
    cond do
      counter >= @max_items_in_list ->
        stream_delete_by_dom_id(socket, :data, "dog-#{counter - @max_items_in_list}")

      true ->
        socket
    end
  end

  def render(assigns) do
    ~H"""
    IndexLive
    <div id="dogs" phx-update="stream">
      <div :for={{dom_id, dog} <- @streams.data} id={dom_id}>
        <%= dog.name %>
      </div>
    </div>
    """
  end
end
1 Like

Thanks @molsio !

First of all, your solution works :+1:

Aind, I find it checking all my pre-requisites:

  • the memory server side is kept under control (only an int on the assign)
  • the insert/delete is atomic within only one websocket frame, so unsync between client state and server state seems unlikely
  • no hook
  • it’s simple enough

Perfect :slight_smile: