The (excellent) GenStage documentation includes a QueueBroadcaster module as an example for “a more robust implementation” with “tighter control over the events and demand by tracking … data locally”. I’ve included the example code in its entirety below for reference. My question concerns the inclusion of from in the queued data.
Following GenServer best practices, QueueBroadcaster provides sync_notify/2 as public API for placing an event into this GenStage producer. That function simply does a GenStage.call/3 which, when handled by handle_call/3, enqueues the tuple {from, event} in the internal queue held in state and calls dispatch_events/3 with the current state pending_demand. The handle_call return results from the dispatch_events/3 function, which when there is positive demand, dequeues {from, event} tuples from the queue in state, and for each tuple calls GenStage.reply(from, :ok) and adds the event to a list to be delivered in the returned :no_reply tuple.
All is well so far, but here’s the question. Why enqueue the value from and make the GenStage.reply(from, :ok) call at all? The reasons this seems superfluous are:
-
Since the public API
sync_notify/2function properly encapsulated theGenStage.call/3for:notifymessaging, any call fromsync_notify/2will havefrom == self(). SoGenStage.reply(from, :ok)will send a message to itself, and since theQueueBroadcastermodule does not provide ahandle_info/2function to actually process that message, the process effectively sends itself a message that it ignores. -
Even if there were a
handle_info/2function inQueueBroadcaster, sending a simple:okwithout any indication of theeventthat was handled would be of very limited value.
I would suggest the example code could be simplified by dispensing of tracking from in state (via queue) and removing the ignored GenStage.reply/2 call altogether. There would be no change in functionality and there would be no need for those learning GenStage to unwind the question: Why is from being kept in state?
Am I missing something?
defmodule QueueBroadcaster do
use GenStage
@doc "Starts the broadcaster."
def start_link() do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
@doc "Sends an event and returns only after the event is dispatched."
def sync_notify(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:notify, event}, timeout)
end
## Callbacks
def init(:ok) do
{:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_call({:notify, event}, from, {queue, pending_demand}) do
queue = :queue.in({from, event}, queue)
dispatch_events(queue, pending_demand, [])
end
def handle_demand(incoming_demand, {queue, pending_demand}) do
dispatch_events(queue, incoming_demand + pending_demand, [])
end
defp dispatch_events(queue, 0, events) do
{:noreply, Enum.reverse(events), {queue, 0}}
end
defp dispatch_events(queue, demand, events) do
case :queue.out(queue) do
{{:value, {from, event}}, queue} ->
GenStage.reply(from, :ok)
dispatch_events(queue, demand - 1, [event | events])
{:empty, queue} ->
{:noreply, Enum.reverse(events), {queue, demand}}
end
end
end






















