Having problems understanding the QueueBroadcaster GenStage example

Hello,

I’m trying to wrap my head around QueueBroadcaster example in https://hexdocs.pm/gen_stage/Experimental.GenStage.html and there are few place which I can’t comprehend so far and will be very grateful for shedding light on them.

  1. Why handle_sync is used when later on GenStage.reply(from, :ok) is used to unblock the caller? Why not use handle_async?
  2. Upon playing with the example I noticed that Consumer receives events after {:noreply, Enum.reverse(events), {queue, demand}} being returned, what kind of magic is that? Internally it’s a plain GenServer.call(stage, request, timeout), so how and why does this happen?
1 Like

There is no handle_sync or handle_async in that page. What do you mean?

That’s the GenStage behaviour. Instead of {:noreply, state}, which is what a GenServer would return, all callbacks expect {:noreply, events, state}, allowing you to dispatch events to consumers from any callback.

1 Like

Sorry, my bad - it’s sync_notify and async_notify:

Why events are sent with sync_notify when later on GenStage.reply(from, :ok) is used to unblock the caller? Why not use async_notify?
Actually, I tried to do that myself and Producer didn’t receive an event :\

You could have written it async by doing a cast instead of call and invoking handle_cast or by replying directly in handle_call. The point of that example though is to precisely store the event in an internal queue so we unblock the process sending the event only when the event is dispatched. That’s a form of back-pressure itself. It guarantees a process won’t flood the producer by sending hundreds of messages before the producer being able to process even the first one.

1 Like

That means that overriding sync_notify is purely a stylistic decision - after each handle_call, handle_cast and handle_info callback return, events queue will be re-evaluated and in the case of available events and pending demand presence, those events will be automatically dispatched to the consumers. Is this correct?

After reading the documentation again and playing with the example again I understood that sync_notify is used to send a notification to the Consumers in form of interprocess message rather then enqueuing new event.

Thank you for clarifications!