GenStage docs QueueBroadcaster example code question

The (excellent) GenStage documentation includes a QueueBroadcaster module as an example for “a more robust implementation” with “tighter control over the events and demand by tracking … data locally”. I’ve included the example code in its entirety below for reference. My question concerns the inclusion of from in the queued data.

Following GenServer best practices, QueueBroadcaster provides sync_notify/2 as public API for placing an event into this GenStage producer. That function simply does a GenStage.call/3 which, when handled by handle_call/3, enqueues the tuple {from, event} in the internal queue held in state and calls dispatch_events/3 with the current state pending_demand. The handle_call return results from the dispatch_events/3 function, which when there is positive demand, dequeues {from, event} tuples from the queue in state, and for each tuple calls GenStage.reply(from, :ok) and adds the event to a list to be delivered in the returned :no_reply tuple.

All is well so far, but here’s the question. Why enqueue the value from and make the GenStage.reply(from, :ok) call at all? The reasons this seems superfluous are:

  • Since the public API sync_notify/2 function properly encapsulated the GenStage.call/3 for :notify messaging, any call from sync_notify/2 will have from == self(). So GenStage.reply(from, :ok) will send a message to itself, and since the QueueBroadcaster module does not provide a handle_info/2 function to actually process that message, the process effectively sends itself a message that it ignores.

  • Even if there were a handle_info/2 function in QueueBroadcaster, sending a simple :ok without any indication of the event that was handled would be of very limited value.

I would suggest the example code could be simplified by dispensing of tracking from in state (via queue) and removing the ignored GenStage.reply/2 call altogether. There would be no change in functionality and there would be no need for those learning GenStage to unwind the question: Why is from being kept in state?

Am I missing something?

defmodule QueueBroadcaster do
  use GenStage

  @doc "Starts the broadcaster."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    queue = :queue.in({from, event}, queue)
    dispatch_events(queue, pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand - 1, [event | events])
      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end

Okay, pardon the noise. I see now why the GenStage.reply(from, :ok) is necessary. The sync GenStage.call needs a corresponding GenStage.reply to “complete” the call (o/w a timeout occurs).

That’s not what it does. Say pid A calls sync_notify then the GenStage.call will lookup the pid registered with the name QueueBroadcaster, say pid B and queue an event.

Pid B being the GenStage producer will receive an opaque value from in handle_call, which does later allow GenStage.reply to reply with an answer to the call, because the initial handle_call doesn’t provide an answer with its return value.

This means the sync_notify doesn’t return for pid A until the event has been dispatched even if there hasn’t been any demand at the time the {:notify, event} message was received by pid B/the producer.