Approach to GenServer to avoid copying data

I have the following scenario:

  • a live view which displays a large dataset
  • an API client that receives notifications to update the dataset (the controller)

I am trying this approach:

  # live view
  def mount(_params, _session, socket) do
    {:ok, con} = Controller.start("...session_data...")

    {:ok,
     socket
     |> assign(controller: con)}
  end

  @impl true
  def handle_event(
        "send_data",
        %{"data" => data},
        %{assigns: %{controller: con}} = socket
      ) do
    con = Controller.push_data(con, data)

    {:noreply,
     socket
     |> assign(controller: con)}
  end

  @impl true
  def handle_info(
        {:controller, msg},
        %{assigns: %{controller: con}} = socket
      ) do
    con = Controller.handle_message(con, msg)

    {:noreply,
     socket
     |> assign(controller: con)}
  end


defmodule Controller do
  use GenServer

  defstruct pid: nil, processing: false, super_large_dataset: []

  # API to be used by calling (parent) process
  def start(id) do
    {:ok, pid} = GenServer.start_link(__MODULE__, {self(), id})
    
    %Controller{pid: pid}
  end

  def push_data(%Controller{pid: pid} = con, data) do
    GenServer.cast(pid, {:push_data, data})
    %{con | processing: true}
  end

  def handle_message(con, {:data_processed, data}) do
    %{con | processing: false, super_large_dataset: [data | con.super_large_dataset]}
  end

  def handle_message(con, _msg) do
    con
  end

  @impl true
  def init({parent_pid, id}) do
    {:ok, %{parent_pid: parent_pid, id: id}}
  end

  @impl true
  def handle_cast({:push_data, data}, state) do
    Process.sleep(500) # emulate processing
    send(state.parent_pid, {:controller, {:data_processed, data}})
    {:noreply, state}
  end

end

The idea here is to keep the super_large_dataset in the live view process to avoid copying it when data chunks are processed.

Is this pattern the way to go?

It is hard for me to understand what you are trying to achieve in this code. I think your objective is to avoid blocking the live view process during the lengthy processing of the data, right? Because otherwise you could just do it right there. If that’s the case, you could just use a Task or plain old spawn/1 and avoid all the trouble of dealing with a GenServer.

Usually a GenServer is used when you need the process to live for an extended period of time. If you start_link a GenServer from a LiveView process, it won’t live long, because an LV process is tied to the client websocket, and that breaks all the time.
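
For illustration, here is a minimal sketch of the Task-based approach in a LiveView. The function and assign names (expensive_processing, :task) are made up; it assumes the heavy work fits in a single function call:

  def handle_event("send_data", %{"data" => data}, socket) do
    # Task.async/1 links and monitors the task, so it dies with the LV process
    task = Task.async(fn -> expensive_processing(data) end)
    {:noreply, assign(socket, task: task)}
  end

  # The task's result arrives as a {ref, result} message
  def handle_info({ref, result}, %{assigns: %{task: %Task{ref: ref}}} = socket) do
    # Stop monitoring the finished task and discard its :DOWN message
    Process.demonitor(ref, [:flush])
    {:noreply, assign(socket, task: nil, result: result)}
  end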


Yes, a Task could suffice, but the catch I didn’t mention in my example code is that the GenServer “side” of the state will also hold a large dataset.

The general flow is:

  • the live view generates a bunch of data (which is large and which I want to avoid copying)
  • the live view sends a small message to the genserver
  • the genserver uses that small message to generate its own data, which is also big and which the live view doesn’t need in full
  • the genserver sends a small message back to the live view

I would do it all in the live view process, but I must do some computations which are blocking and require a lot of data, so I thought it was better to split the dataset into computational data and display data.

I realize that the genserver will die if the live view disconnects, but that’s fine.


How large is large here exactly? What type of data?


About 50 megabytes on each side. It’s a map, and data is appended in a flow (about every 800ms). Depending on what was appended, a few keys are retrieved and a checksum is sent to the live view (producing the checksum takes about 500ms).

I read that passing a map like this between processes forces a copy, and I don’t want to copy 50MB every 800ms, as this is per live view.

Hey @kuon, yeah, that’s definitely large enough that you wouldn’t want to copy it around. If you’re only sending a few keys, have you considered using an :ets table? While ets keys and values are still copied, it would only be the specific keys and values queried by your liveview process instead of the whole table.
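
A rough sketch of that idea, assuming the processing side owns the table (the table name, key, and options here are illustrative):

  # Owner process (e.g. the genserver) creates a shared table
  table = :ets.new(:dataset, [:set, :public, read_concurrency: true])

  # The processing side writes chunks keyed by id
  :ets.insert(table, {chunk_id, chunk})

  # The live view copies only the values it actually looks up
  case :ets.lookup(table, chunk_id) do
    [{^chunk_id, chunk}] -> chunk
    [] -> nil
  end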

When you have non-trivial state split across 2 processes, it will always be troublesome. I think now is the time to revisit some of the decisions you made:

  • How long will the processing block the LV process? Can that be optimized?
  • Can you use LV streams to reduce the amount of state kept in the LV process?
  • Can you use ets to keep some state in shared memory, as @benwilson512 has suggested?

:ets could help, but my general question was more about designing a library which handles streaming; the problem is that I have to “forward” messages emitted by the library back to it for processing. It works, but I wonder if this pattern is good.

Have you considered PubSub for this?
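
For example, something along these lines (the PubSub server and topic names are made up):

  # In the live view's mount/3, subscribe to updates
  if connected?(socket) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "dataset:#{id}")
  end

  # In the producing process, broadcast only the small result
  Phoenix.PubSub.broadcast(MyApp.PubSub, "dataset:#{id}", {:data_processed, checksum})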


Maybe this is a good use case for LiveView streams, to avoid keeping the whole large collection in memory in each LiveView process? The LiveView would send the large collection to the client only once, wouldn’t keep it in memory, and would only send updates. As it seems that you always append (or prepend?) new data, that’s particularly easy with streams.
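
A minimal sketch of what that could look like, assuming each item has an id that stream/3 can track:

  def mount(_params, _session, socket) do
    # The full collection is sent to the client once and then
    # dropped from the server-side assigns
    {:ok, stream(socket, :rows, initial_rows())}
  end

  def handle_info({:data_processed, row}, socket) do
    # Only the new row travels to the client
    {:noreply, stream_insert(socket, :rows, row, at: 0)}
  end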

Ok, I ran some more tests and came up with a solution that I think is more manageable.

I use :ets for data storage.

Then, for “message forwarding”, I found a trick to properly pre-process my messages before sending them to my live view.

Instead of this:

  @impl true
  def handle_event("generate_stream", _, socket) do
    MyLibrary.generate_stream(self())

    socket
    |> noreply()
  end

  @impl true
  def handle_info(raw_message, socket) do
    msg = MyLibrary.process_message_and_get_display_data(raw_message)
    ...
    socket
    |> noreply()
  end

where the live view directly gets “raw messages”.

I do this:

  @impl true
  def handle_event("generate_stream", _, socket) do
    MyLibrary.generate_stream(self())

    socket
    |> noreply()
  end

  @impl true
  def handle_info({:my_library, msg}, socket) do
    msg = MyLibrary.get_display_data(msg)
    ...
    socket
    |> noreply()
  end

And in MyLibrary.generate_stream, I use this pattern:

  def generate_stream(dst_pid) do
    task =
      Task.async(fn ->
        receive do
          raw_msg ->
            send(dst_pid, nicely_processed_message(raw_msg))
            # here recurse to receive again...
        end
      end)

    original_generate_stream(task.pid)
    Task.await(task)
  end

The task will block while doing the computation, write to ets, and send nicely_processed_message to the live view with the keys of the display data.

I realize my example above may be a bit blurry, but it is quite complicated to convey the gist of it without sharing everything. In the end, I removed the GenServer and everything is way cleaner.

If I read your code correctly, the super large dataset is stored in the liveview process, not the genserver process.

So yes, as said above, a simple Task would make the code far easier to understand.

Edit: I did not see your last answer.