The (excellent) `GenStage` documentation includes a `QueueBroadcaster` module as an example of “a more robust implementation” with “tighter control over the events and demand by tracking … data locally”. I’ve included the example code in its entirety below for reference. My question concerns the inclusion of `from` in the queued data.
Following `GenServer` best practices, `QueueBroadcaster` provides `sync_notify/2` as the public API for placing an `event` into this `GenStage` producer. That function simply makes a `GenStage.call/3`. When that call is handled by `handle_call/3`, the tuple `{from, event}` is enqueued in the internal queue held in state, and `dispatch_events/3` is called with the current `pending_demand` from state. The return value of `handle_call/3` is whatever `dispatch_events/3` returns. While there is positive `demand`, `dispatch_events/3` dequeues `{from, event}` tuples from the `queue` in state; for each tuple it calls `GenStage.reply(from, :ok)` and adds the `event` to the list delivered in the returned `:noreply` tuple.
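
To make the dequeue step concrete, here is a minimal sketch of the same loop as a pure function, with no `GenStage` involved. `DispatchSketch` and `take/2` are hypothetical names of mine, not part of the documentation example, and instead of calling `GenStage.reply/2` the sketch just collects the `from` values it would have replied to.

```elixir
defmodule DispatchSketch do
  # Hypothetical helper, not part of GenStage or the documentation example.
  # Pops at most `demand` {from, event} tuples from an Erlang :queue.
  # Returns {froms, events, remaining_queue, remaining_demand}, where
  # `froms` are the callers the real code would reply to, in order.
  def take(queue, demand), do: take(queue, demand, [], [])

  # Demand exhausted: stop, even if the queue still holds events.
  defp take(queue, 0, froms, events) do
    {Enum.reverse(froms), Enum.reverse(events), queue, 0}
  end

  defp take(queue, demand, froms, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, rest} ->
        take(rest, demand - 1, [from | froms], [event | events])

      # Queue exhausted: the leftover demand is kept for later.
      {:empty, rest} ->
        {Enum.reverse(froms), Enum.reverse(events), rest, demand}
    end
  end
end

# Three queued events, demand for two: the third event stays queued.
queue = :queue.from_list([{:caller_a, :e1}, {:caller_b, :e2}, {:caller_c, :e3}])
{froms, events, rest, demand} = DispatchSketch.take(queue, 2)
IO.inspect({froms, events, :queue.to_list(rest), demand})
```

The atoms `:caller_a`, `:caller_b` and `:caller_c` stand in for real `from` values purely for illustration.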
All is well so far, but here’s the question: why enqueue the value `from` and make the `GenStage.reply(from, :ok)` call at all? This seems superfluous for two reasons:
- Since the public API function `sync_notify/2` properly encapsulates the `GenStage.call/3` for `:notify` messaging, any call from `sync_notify/2` will have `from == self()`. So `GenStage.reply(from, :ok)` will send a message to itself, and since the `QueueBroadcaster` module does not provide a `handle_info/2` function to actually process that message, the process effectively sends itself a message that it ignores.
- Even if there were a `handle_info/2` function in `QueueBroadcaster`, sending a simple `:ok` without any indication of the `event` that was handled would be of very limited value.
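
The first point rests on an assumption about what `from` contains, and that can be checked with a small experiment. The sketch below uses a plain `GenServer` as a stand-in, on the assumption that `GenStage.call/3` and `GenStage.reply/2` behave like their `GenServer` counterparts. `FromProbe` and `probe/1` are hypothetical names used only for this test.

```elixir
defmodule FromProbe do
  use GenServer

  # Hypothetical probe module, not part of GenStage or the documentation example.
  def start_link(), do: GenServer.start_link(__MODULE__, :ok)

  # Public API wrapping the call, in the same style as sync_notify/2.
  def probe(server), do: GenServer.call(server, :probe)

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call(:probe, {from_pid, _tag} = from, state) do
    # Reply explicitly, as QueueBroadcaster does, reporting the pid found
    # inside `from` alongside the pid of the process handling the call.
    GenServer.reply(from, %{from_pid: from_pid, server_pid: self()})
    {:noreply, state}
  end
end

{:ok, server} = FromProbe.start_link()
result = FromProbe.probe(server)

# Compare the pid carried in `from` with the server and with the caller.
IO.inspect(result.from_pid == result.server_pid, label: "from is the server process")
IO.inspect(result.from_pid == self(), label: "from is the calling process")
```

Running it as a script (or pasting it into `iex`) prints the two comparisons, which shows directly whether the reply goes back to the handling process or to whoever made the call.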
I would suggest the example code could be simplified by dispensing with the tracking of `from` in state (via `queue`) and removing the ignored `GenStage.reply/2` call altogether. There would be no change in functionality, and those learning `GenStage` would not need to unwind the question: why is `from` being kept in state?
Am I missing something?
```elixir
defmodule QueueBroadcaster do
  use GenStage

  @doc "Starts the broadcaster."
  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc "Sends an event and returns only after the event is dispatched."
  def sync_notify(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:notify, event}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, {:queue.new, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_call({:notify, event}, from, {queue, pending_demand}) do
    queue = :queue.in({from, event}, queue)
    dispatch_events(queue, pending_demand, [])
  end

  def handle_demand(incoming_demand, {queue, pending_demand}) do
    dispatch_events(queue, incoming_demand + pending_demand, [])
  end

  defp dispatch_events(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch_events(queue, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, demand - 1, [event | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end
end
```