LiveView + PubSub message delay?

Hello there,

I have a LiveView that subscribes to a PubSub topic and starts a GenServer when I click a button.

The GenServer sends messages back to the LiveView over PubSub, but those messages only seem to be processed after the map ends:

(screenshot of the logs for 1k items)

If I increase the data from 1,000 to 10,000,000, I see some of the messages being received before the map ends.

Is there a way to receive those messages earlier?

I have created a repository to reproduce the issue: GitHub - rafaeliga/liveview_pubsub_update: Sample project to test Liveview + Pubsub.

The processes are running concurrently, and with PubSub there isn’t really a way to force the subscriber to process a message before the publisher continues, I think. It looks like sending is faster than receiving in this case (or the messages move more slowly through the PubSub system).

Anyway, why do you want the messages to be received earlier? Is the delay causing a problem for you?

I think something like GenStage/Flow could provide backpressure, blocking the producer until the consumers have had time to consume the items.
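
For example, something along these lines (an untested sketch; module names are made up, and it assumes the gen_stage dependency):

defmodule ImportProducer do
  use GenStage

  def start_link(data), do: GenStage.start_link(__MODULE__, data, name: __MODULE__)

  @impl true
  def init(data), do: {:producer, data}

  @impl true
  def handle_demand(demand, data) when demand > 0 do
    # hand out only as many items as the consumer asked for
    {events, rest} = Enum.split(data, demand)
    {:noreply, events, rest}
  end
end

defmodule ImportConsumer do
  use GenStage

  def start_link(_), do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # max_demand caps how many items are in flight at once
    {:consumer, :ok, subscribe_to: [{ImportProducer, max_demand: 50}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    for datum <- events do
      Phoenix.PubSub.broadcast(LiveviewPubsubUpdate.PubSub, "import_live", {:message, datum, Timex.now()})
    end

    {:noreply, [], state}
  end
end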

It’s not that sending is faster. What happens is that publishing a message won’t stop a process from running its reductions.

In your solution, what do you want to be synchronous and what do you want to be asynchronous?
The idea is to design the system to solve the problem the way you need. PubSub is an asynchronous solution.

My use case is showing a CSV import / database insert progressing in real time.
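
For context, the LiveView side of my sample looks roughly like this (simplified sketch; module names may not match the repository exactly):

defmodule LiveviewPubsubUpdateWeb.ImportLive do
  use Phoenix.LiveView

  @impl true
  def mount(_params, _session, socket) do
    # subscribe only once the socket is connected
    if connected?(socket) do
      Phoenix.PubSub.subscribe(LiveviewPubsubUpdate.PubSub, "import_live")
    end

    {:ok, assign(socket, :progress, 0)}
  end

  @impl true
  def handle_info({:message, datum, _sent_at}, socket) do
    # each broadcast updates the progress shown on the page
    {:noreply, assign(socket, :progress, datum)}
  end
end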

What your example shows is that a) it is extremely quick to map through a list when you do no work in the map function and b) PubSub works. You can probably move on to the next stage.

You could delay processing the next chunk until after the message is received by calling the LiveView instead of broadcasting, but I don’t imagine you want to do that in practice.
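
Roughly, that would look something like the sketch below (hypothetical names; the chunk loop has to run outside init, otherwise the LiveView would still be blocked in handle_event and the call would time out):

# in the LiveView: pass self() to the importer and acknowledge each chunk
# (Phoenix.LiveView supports handle_call/3; assumes a :rows assign exists)
def handle_event("process", _params, socket) do
  LiveviewPubsubUpdate.Import.start(self())
  {:noreply, socket}
end

def handle_call({:chunk, rows}, _from, socket) do
  {:reply, :ok, update(socket, :rows, &(&1 ++ rows))}
end

# in the importer: GenServer.call/2 blocks until the LiveView replies,
# so the next chunk is not produced before the previous one was handled
# (init/1 would just send(self(), :import) and return immediately)
def handle_info(:import, %{live_view: live_view} = state) do
  1..1_000_000
  |> Enum.chunk_every(1_000)
  |> Enum.each(fn chunk -> GenServer.call(live_view, {:chunk, chunk}) end)

  {:noreply, state}
end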

Sorry if my initial post isn’t clear enough; let me try with more data:

There is a time difference between PubSub sending a message and the LiveView receiving it.

That time increases if I send more messages (more data):

dev
100: ~16ms
1,000: ~50ms
100,000: starts at ~700ms, ends at ~3,000ms
1,000,000: starts at ~7,000ms, ends at 38,000ms

prod (fly.io)
1,000: ~180ms
1,000,000: crashed

Why does this difference grow when I have more data? Is it related to PubSub or to the LiveView receiving these messages?

Because message passing between processes is not a synchronous procedure:

  • Process A sends a message to the mailbox of process B.
  • The scheduler allocates time to any process that has something to do.
  • Process B starts processing the messages in its mailbox.

Publishing a PubSub message is a non-blocking operation, just like doing a cast with a GenServer.
Since publishing is non-blocking, process A will keep doing whatever it is doing until the scheduler stops it from running (in your case, it keeps publishing messages until it finishes starting).

The delays you’re experiencing in your test are not particular to any implementation of PubSub or LiveView, but are instead a consequence of how you chose to implement your synthetic load.
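
You can see that non-blocking behaviour in isolation with plain send/2 (a minimal sketch, nothing PubSub-specific):

# the sender returns immediately; the receiver only processes the message
# when the scheduler gives it time and it reaches the receive block
pid =
  spawn(fn ->
    Process.sleep(1000)

    receive do
      msg -> IO.inspect(msg, label: "received")
    end
  end)

send(pid, :hello)
IO.puts("sent, not waiting for the receiver")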

Hi @cevado! I think there’s some misunderstanding between what @rafaeliga wants and what you are explaining. I don’t think he is expecting to process anything synchronously.

If I understand correctly, what @rafaeliga meant is that there’s some considerable delay from sending to receiving the message; this has nothing to do with the concurrency model per se.

You mentioned reductions, and this made me think that perhaps the LiveView process’s mailbox is getting too many messages and the delay he’s experiencing is the gap between messages being processed.

However, based on the data he provided, it seems that there’s a considerable delay between sending the message and receiving it, even with a low number of messages to process.

Curiously enough, from what I’ve heard about Elixir/Phoenix in the past, I’d expect broadcasting messages to be a little more performant than that. But perhaps this is a question of how to better structure the message passing between the processes (perhaps batching it or something).
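
By batching I mean something roughly like this (hypothetical sketch: one broadcast per chunk instead of one per item):

1..1_000_000
|> Enum.chunk_every(1_000)
|> Enum.each(fn chunk ->
  # one message per 1,000 items keeps the LiveView mailbox small
  Phoenix.PubSub.broadcast(
    LiveviewPubsubUpdate.PubSub,
    "import_live",
    {:progress, List.last(chunk), Timex.now()}
  )
end)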


Please look at the code provided…
The code that starts the “background processing”:

def handle_event("process", _params, socket) do
  LiveviewPubsubUpdate.Import.start()

  {:noreply, socket}
end

the “background processing”:

def start() do
  GenServer.start(__MODULE__, nil)
end

@impl true
def init(_params) do
  Enum.map(1..1000000, fn datum ->
    Logger.info("process: #{datum}")

    Phoenix.PubSub.broadcast(LiveviewPubsubUpdate.PubSub, "import_live", {:message, datum, Timex.now()})
  end)

  Logger.info("Enum map finished")

  {:ok, nil}
end

When LiveviewPubsubUpdate.Import.start() is called, it is a synchronous call: start() only releases the LiveView once the init callback has finished running.
The init callback in the GenServer is the part that publishes.


@cevado could you elaborate on what you meant by this statement? Bear in mind that even though LiveviewPubsubUpdate.Import.start() is synchronous code, the Phoenix.PubSub.broadcast call is not. That is, messages arrive in the LiveView before the synchronous code finishes processing, which does not seem to be the problem IMHO.

The main question seems to be that after a message is dispatched to the LiveView, there’s some delay before it actually gets processed.

I think that if the LiveView were receiving and processing its events fast enough, this perceived delay would not exist. I might be missing something here, but it doesn’t seem to be a problem with concurrency.

Yes, but start is blocking handle_event, which blocks handle_info. start does not return until init returns. This means that handle_event doesn’t return until init returns, which means that the LiveView is unable to run any handle_info callbacks until init has published all of the messages.

EDIT: To elaborate further: a LiveView is a GenServer, and a GenServer is a single process. A single process can only run code linearly, which means that a given GenServer callback can only run one at a time. As long as handle_event is blocked, the GenServer loop of the whole LiveView process is blocked, which prevents any handle_info clause from running. If you change your broadcaster to do:

def init(_params) do
  send(self(), :broadcast)
  {:ok, nil}
end

def handle_info(:broadcast, state) do
  Enum.map(1..1000000, fn datum ->
    Logger.info("process: #{datum}")

    Phoenix.PubSub.broadcast(LiveviewPubsubUpdate.PubSub, "import_live", {:message, datum, Timex.now()})
  end)

  Logger.info("Enum map finished")

  {:noreply, state}
end

You should see more of what you expect.


@benwilson512 I think that handle_event is from another process; how is it blocked by start from another GenServer in this case?

You call start in handle_event. start is not in another process; it’s called in the LiveView process. init runs in the other process, but start waits for init to return before it returns. That’s part of the standard GenServer contract, see:

iex(4)> defmodule Foo do
...(4)>   use GenServer
...(4)>
...(4)>   def start() do
...(4)>     GenServer.start(__MODULE__, nil)
...(4)>   end
...(4)>
...(4)>
...(4)>   def init(_) do
...(4)>     Process.sleep(1000)
...(4)>     {:ok, nil}
...(4)>   end
...(4)> end
iex(7)> :timer.tc(fn -> Foo.start() end)
{1000836, {:ok, #PID<0.145.0>}}

EDIT: Further reading
https://hexdocs.pm/elixir/GenServer.html#c:init/1


The LiveView lives inside a single process for a particular session, so the LiveView’s handle_event and handle_info callbacks block each other.
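
A tiny standalone illustration (hypothetical module, not LiveView-specific): while one callback runs, messages to the same process just wait in the mailbox.

defmodule Blocking do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(_), do: {:ok, nil}

  @impl true
  def handle_call(:busy, _from, state) do
    send(self(), :ping)   # queued behind this callback
    Process.sleep(1_000)  # simulate blocking work
    {:reply, :done, state}
  end

  @impl true
  def handle_info(:ping, state) do
    IO.puts("ping handled only after handle_call returned")
    {:noreply, state}
  end
end

# usage:
#   {:ok, pid} = Blocking.start_link(nil)
#   GenServer.call(pid, :busy)   # :ping is only handled after this returns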


Oh, I see, that makes total sense! Thanks for the well-rounded explanation @benwilson512. I completely forgot that a GenServer processes everything linearly like you mentioned (because it’s basically just a loop waiting for its turn to process something). This is definitely what’s going on here.


Thank you @benwilson512 !

The code below worked:

def init(_params) do
  send(self(), :broadcast)
  {:ok, nil}
end

def handle_info(:broadcast, state) do
  # ...same broadcast loop as above...
  {:noreply, state}
end

Then, looking at the GenServer documentation, handle_continue did the job too:

  @impl true
  def init(_params) do
    {:ok, nil, {:continue, "continue"}}
  end

  @impl true
  def handle_continue("continue", state) do
    # ...same broadcast loop as above...
    {:noreply, state}
  end