The processes run concurrently, and I don't think a PubSub gives you any way to force the subscriber to process a message before the publisher continues. It looks like sending is faster than receiving in this case (or the messages travel more slowly through the PubSub system).
Anyway, why do you need the message to be received before the publisher continues? Is it causing a noticeable delay?
I think something like GenStage/Flow could provide backpressure, which keeps the producer from getting ahead of consumers that have not yet consumed the items.
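To make the backpressure idea concrete without pulling in GenStage, here is a rough sketch with plain processes: the consumer sends demand, and the producer only sends that many items. `DemandSketch` and its message shapes are made up for illustration. This is not the GenStage API, just the demand-driven idea GenStage is built around.

```elixir
defmodule DemandSketch do
  # Hypothetical producer: it only sends items after the consumer asks for them.
  def producer(items) do
    receive do
      {:demand, from, n} ->
        {batch, rest} = Enum.split(items, n)
        Enum.each(batch, &send(from, {:item, &1}))

        if rest == [] do
          send(from, :done)
        else
          producer(rest)
        end
    end
  end

  # Consumer asks for `n` items and only asks again once it has handled them.
  def consume(producer, n, acc \\ []) do
    send(producer, {:demand, self(), n})
    collect(producer, n, n, acc)
  end

  # All requested items handled: send new demand.
  defp collect(producer, n, 0, acc), do: consume(producer, n, acc)

  defp collect(producer, n, left, acc) do
    receive do
      {:item, item} -> collect(producer, n, left - 1, [item | acc])
      :done -> Enum.reverse(acc)
    end
  end
end

producer = spawn(fn -> DemandSketch.producer(Enum.to_list(1..10)) end)
result = DemandSketch.consume(producer, 3)
```

The producer never has more than three items "in flight", no matter how fast it could generate them.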
It's not that sending is faster. What happens is that publishing a message doesn't stop the publishing process from continuing to run through its reductions.
In your solution, what do you want to be synchronous and what do you want to be asynchronous?
The idea is to design the system so it solves the problem the way you need. PubSub is an asynchronous solution.
What your example shows is that a) it is extremely quick to map through a list when you do no work in the map function and b) pubsub works. You can probably move on to the next stage.
You could delay processing the next chunk until after the message is received by calling the LiveView instead of broadcasting, but I don't imagine you want to do that in practice.
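A rough sketch of the difference between broadcasting (fire-and-forget, like a cast) and calling the receiver, assuming a hypothetical `SlowReceiver` GenServer that takes 50 ms per message. With `GenServer.call/2` the sender cannot move on until the receiver has handled each message; with `GenServer.cast/2` it returns immediately.

```elixir
defmodule SlowReceiver do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])

  @impl true
  def init(state), do: {:ok, state}

  # Asynchronous path: the sender does not wait for this to run.
  @impl true
  def handle_cast({:message, n}, state) do
    Process.sleep(50)
    {:noreply, [n | state]}
  end

  # Synchronous path: the sender waits until this replies.
  @impl true
  def handle_call({:message, n}, _from, state) do
    Process.sleep(50)
    {:reply, :ok, [n | state]}
  end
end

{:ok, pid} = SlowReceiver.start_link()

# Each call blocks the sender for roughly 50 ms, so five calls take at least 250 ms.
{call_us, :ok} = :timer.tc(fn -> Enum.each(1..5, &GenServer.call(pid, {:message, &1})) end)

# Casts only enqueue the messages, so this returns almost immediately.
{cast_us, :ok} = :timer.tc(fn -> Enum.each(1..5, &GenServer.cast(pid, {:message, &1})) end)
```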
Because message passing between processes is not synchronous:
1. Process A sends a message to process B's mailbox.
2. The scheduler allocates time to any process that has work to do.
3. Process B starts processing the messages in its mailbox.
Publishing a message through PubSub is a non-blocking operation, just like a GenServer cast.
Since publishing is non-blocking, process A keeps doing whatever it is doing until the scheduler preempts it (in your case, it keeps publishing messages until it finishes starting up).
The delays you're seeing in your test are not specific to the PubSub or LiveView implementation; they come from how you chose to implement your synthetic load.
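A small sketch of this non-blocking behaviour with plain `send/2`: the hypothetical receiver below never reads its mailbox, yet the sender finishes sending all of its messages almost instantly, and they simply pile up in the receiver's mailbox.

```elixir
# Hypothetical receiver that is busy forever and never reads its mailbox.
receiver = spawn(fn -> Process.sleep(:infinity) end)

# send/2 only enqueues the message, so the sender never waits on the receiver.
{send_us, :ok} =
  :timer.tc(fn ->
    Enum.each(1..10_000, &send(receiver, {:message, &1}))
  end)

# Every message is still waiting to be processed.
{:message_queue_len, queued} = Process.info(receiver, :message_queue_len)
```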
Hi @cevado! I think there’s some misunderstanding between what @rafaeliga wants and what you are explaining. I don’t think he is expecting to process anything synchronously.
If I understand correctly, what @rafaeliga meant is that there's a considerable delay between sending and receiving the message; this has nothing to do with the concurrency model per se.
You mentioned reductions, which made me think that perhaps the LiveView process's mailbox is receiving too many messages, and the delay he's experiencing is the time it takes to work through them.
However, based on the data he provided, it seems that there’s a considerable delay between sending the message and receiving it, even with a low amount of messages to process.
Curiously enough, from what I've heard about Elixir/Phoenix in the past, I'd expect broadcasting messages to be a bit more performant than that. But perhaps this is a question of how to better structure the message passing between the processes (perhaps batching it or something).
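If batching is worth trying, a minimal sketch could chunk the items with `Enum.chunk_every/2` and send one message per chunk, so the receiver handles 100 messages instead of 100,000. `BatchSketch` and the `{:batch, items}` message are hypothetical; with Phoenix.PubSub you would broadcast each chunk instead of calling `send/2`. Batching reduces per-message overhead, but it would not help if the receiver is blocked for some other reason.

```elixir
defmodule BatchSketch do
  # Hypothetical receiver: counts items across batches until it sees :done.
  def receive_all(acc) do
    receive do
      {:batch, items} -> receive_all(acc + length(items))
      :done -> acc
    end
  end
end

parent = self()
receiver = spawn(fn -> send(parent, {:total, BatchSketch.receive_all(0)}) end)

# 100_000 items in chunks of 1_000 means 100 messages instead of 100_000.
batches = Enum.chunk_every(1..100_000, 1_000)
Enum.each(batches, &send(receiver, {:batch, &1}))
send(receiver, :done)

total =
  receive do
    {:total, n} -> n
  end
```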
Please look at the code provided…
The code that starts the “background processing”:
```elixir
def handle_event("process", _params, socket) do
  LiveviewPubsubUpdate.Import.start()
  {:noreply, socket}
end
```
The “background processing”:
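```elixir
defmodule LiveviewPubsubUpdate.Import do
  use GenServer
  require Logger

  def start() do
    GenServer.start(__MODULE__, nil)
  end

  @impl true
  def init(_params) do
    # The whole loop runs inside init/1, so start/0 does not return until it finishes
    Enum.map(1..1_000_000, fn datum ->
      Logger.info("process: #{datum}")
      Phoenix.PubSub.broadcast(LiveviewPubsubUpdate.PubSub, "import_live", {:message, datum, Timex.now()})
    end)

    Logger.info("Enum map finished")
    {:ok, nil}
  end
end
```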
When LiveviewPubsubUpdate.Import.start() is called, it runs synchronously, so start() only releases the LiveView once the init callback finishes running.
The init callback in the GenServer is the part that publishes.
@cevado could you elaborate on what you meant by this statement? Bear in mind that even though LiveviewPubsubUpdate.Import.start() is synchronous code, the Phoenix.PubSub.broadcast call is not. That is, messages arrive in the LiveView before the synchronous code finishes processing, which does not seem to be the problem IMHO.
The main question seems to be that after a message is dispatched to the LiveView, there’s some delay before it actually gets processed.
I think that if the LiveView were receiving and processing its events fast enough, this perceived delay would not exist. I might be missing something here, but it doesn't seem to be a concurrency problem.
Yes, but start is blocking handle_event, which blocks handle_info. start does not return until init returns. This means that handle_event doesn’t return until init returns, which means that the live view is unable to do any handle_info calls until init has published all of the messages.
EDIT: To elaborate further: a LiveView is a GenServer, and a GenServer is a single process. A single process can only run code sequentially, so a GenServer can only run one callback at a time. As long as handle_event is blocked, the loop of the whole LiveView process is blocked, which prevents any handle_info clause from running. If you change your broadcaster to do:
```elixir
@impl true
def init(_params) do
  # Schedule the work and return right away, so start/0 no longer waits for it
  send(self(), :broadcast)
  {:ok, nil}
end

@impl true
def handle_info(:broadcast, state) do
  Enum.map(1..1_000_000, fn datum ->
    Logger.info("process: #{datum}")
    Phoenix.PubSub.broadcast(LiveviewPubsubUpdate.PubSub, "import_live", {:message, datum, Timex.now()})
  end)

  Logger.info("Enum map finished")
  {:noreply, state}
end
```
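On OTP 21+ (Elixir 1.7+), an alternative to `send(self(), :broadcast)` is to return `{:continue, term}` from `init/1`: `GenServer.start/2` returns as soon as `init/1` does, and `handle_continue/2` then runs before the process handles any other message. A minimal sketch, where `ContinueSketch`, the `:broadcast_done` notification, and the one-second `Process.sleep/1` standing in for the publishing loop are all hypothetical:

```elixir
defmodule ContinueSketch do
  use GenServer

  def start(notify_pid), do: GenServer.start(__MODULE__, notify_pid)

  @impl true
  def init(notify_pid) do
    # Returning right away lets GenServer.start/2 return to the caller.
    {:ok, notify_pid, {:continue, :broadcast}}
  end

  @impl true
  def handle_continue(:broadcast, notify_pid) do
    # Stand-in for the publishing loop; runs after init/1 has already returned.
    Process.sleep(1_000)
    send(notify_pid, :broadcast_done)
    {:noreply, notify_pid}
  end
end

# start/1 returns almost immediately even though the work takes a second.
{start_us, {:ok, _pid}} = :timer.tc(fn -> ContinueSketch.start(self()) end)
```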
You call start in handle_event. start does not run in another process; it runs in the LiveView process. init runs in another process, but start waits for init to return before it returns itself. That's part of the standard GenServer contract, see:
```elixir
iex(4)> defmodule Foo do
...(4)>   use GenServer
...(4)>
...(4)>   def start() do
...(4)>     GenServer.start(__MODULE__, nil)
...(4)>   end
...(4)>
...(4)>
...(4)>   def init(_) do
...(4)>     Process.sleep(1000)
...(4)>     {:ok, nil}
...(4)>   end
...(4)> end
iex(7)> :timer.tc(fn -> Foo.start() end)
{1000836, {:ok, #PID<0.145.0>}}
```
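The same single-process rule explains the delay in the LiveView: while one callback is running, incoming messages just wait in the mailbox. A sketch with a hypothetical `BusyServer`, where a cast that sleeps for 500 ms stands in for the blocked `handle_event`, and a `:ping` message records how long it waited before `handle_info/2` ran:

```elixir
defmodule BusyServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, nil)

  @impl true
  def init(nil), do: {:ok, nil}

  # Stand-in for a handle_event that blocks (e.g. by calling a slow start/0).
  @impl true
  def handle_cast({:block, ms}, state) do
    Process.sleep(ms)
    {:noreply, state}
  end

  # Replies with how many milliseconds passed between sending and handling.
  @impl true
  def handle_info({:ping, from, sent_at}, state) do
    send(from, {:pong, System.monotonic_time(:millisecond) - sent_at})
    {:noreply, state}
  end
end

{:ok, pid} = BusyServer.start_link()
sent_at = System.monotonic_time(:millisecond)
GenServer.cast(pid, {:block, 500})
# Queued behind the blocking cast, so it cannot be handled before the cast finishes.
send(pid, {:ping, self(), sent_at})

delay_ms =
  receive do
    {:pong, delay} -> delay
  end
```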
Oh, I see, that makes total sense! Thanks for the well-rounded explanation @benwilson512. I completely forgot that a GenServer processes everything sequentially like you mentioned (because it's basically just a loop waiting for its turn to process something). This is definitely what's going on here.