I have a couple of questions here on GenStage. I am using it as an Event Manager.
Question 1)
The code snippet from announcing-genstage shows the callback routines for a synchronous event manager. It is synchronous because the sync_notify function invokes GenStage.call, the callback is implemented as a handle_call instead of a handle_cast, and in dispatch_events, GenStage.reply is called.
My question then is: who is the intended “non” recipient of the :noreply return tuple?
A) When called via handle_demand, which is invoked under the covers on behalf of a consumer process (via the backpressure mechanism) that is subscribed to the producer GenStage, does the consumer process then get no reply? That seems contrary to the events list it receives, e.g. Enum.reverse(events), so clearly it does get something back. So then why the :noreply?
B) When called via handle_call, the code that issues the sync_notify clearly won’t get the events list as its reply, but it does get an :ok reply from GenStage.reply(from, :ok).
So my question is: why is the :noreply tag valid in both cases when it seems relevant in B but not in A?
def handle_call({:notify, event}, from, {queue, demand}) do
  dispatch_events(:queue.in({from, event}, queue), demand, [])
end

def handle_demand(incoming_demand, {queue, demand}) do
  dispatch_events(queue, incoming_demand + demand, [])
end

defp dispatch_events(queue, demand, events) do
  with d when d > 0 <- demand,
       {item, queue} = :queue.out(queue),
       {:value, {from, event}} <- item do
    GenStage.reply(from, :ok)
    dispatch_events(queue, demand - 1, [event | events])
  else
    _ -> {:noreply, Enum.reverse(events), {queue, demand}}
  end
end
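
For reference, here is a minimal sketch of the deferred-reply pattern that the snippet above relies on, written against a plain GenServer so it runs without the gen_stage dependency. The module name DeferredReply and the release/1 function (which stands in for demand arriving from a consumer) are made up for illustration. It assumes GenStage.reply behaves like GenServer.reply. In a plain GenServer, returning :noreply from handle_call only means the caller is not answered automatically; it says nothing about other processes.

```elixir
defmodule DeferredReply do
  # Hypothetical module: a plain GenServer analog of the queue-based handle_call.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks until the server explicitly replies, like sync_notify.
  def sync_notify(pid, event), do: GenServer.call(pid, {:notify, event})

  # Stands in for demand arriving from a consumer (an assumption of this sketch).
  def release(pid), do: GenServer.call(pid, :release)

  @impl true
  def init(:ok), do: {:ok, :queue.new()}

  @impl true
  def handle_call({:notify, event}, from, queue) do
    # :noreply: the caller stays blocked; we keep `from` to answer it later.
    {:noreply, :queue.in({from, event}, queue)}
  end

  def handle_call(:release, _from, queue) do
    case :queue.out(queue) do
      {{:value, {from, event}}, rest} ->
        # The deferred answer to the original sync_notify caller.
        GenServer.reply(from, :ok)
        {:reply, {:released, event}, rest}

      {:empty, rest} ->
        {:reply, :empty, rest}
    end
  end
end

{:ok, pid} = DeferredReply.start_link()
task = Task.async(fn -> DeferredReply.sync_notify(pid, :hello) end)
nil = Task.yield(task, 100)                      # caller is still waiting
{:released, :hello} = DeferredReply.release(pid) # event leaves the queue
:ok = Task.await(task)                           # caller now has its :ok
```

Here the event itself goes to whoever called release/1, while the :ok goes to the sync_notify caller, which mirrors the two separate recipients in cases A and B.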
Question 2)
For consumers of events, how can we match on an event key, so that we don’t waste bandwidth queueing and processing events that are not relevant? This is not the same question as trying to match the key of a particular producer; it is about a key in the actual event message payload. At the moment I’m specifying a filter clause in a list comprehension, for example key == Kernel.elem(event, 1).
I’m looking to see if GenStage supports this explicitly, so that you can specify a match rule at the event payload level. For example, if this consumer handles alerts for Bob that have an event id “bob”, it shouldn’t be receiving messages for “Amy” or “Carl” at all, let alone having to filter them out. PartitionDispatcher looks interesting here, but what if the keys are not static but dynamic?
My current code, for example:
def handle_events(events, _from, {key, write_pid}) do
  for event <- events, key == Kernel.elem(event, 1) do
    process_event(event, write_pid)
  end

  {:noreply, [], {key, write_pid}}
end
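
To make the filtering step concrete, here is a small standalone sketch of what the comprehension above keeps and drops. The EventFilter module and the {:alert, key, payload} tuple shape are assumptions made for illustration; the only thing taken from the code above is that the key sits at index 1 of the event tuple.

```elixir
defmodule EventFilter do
  # Hypothetical helper; assumes events are tuples such as {:alert, key, payload}.
  def relevant(events, key) do
    # Same filter as in handle_events: keep events whose second element is the key.
    for event <- events, key == elem(event, 1), do: event
  end
end

events = [{:alert, "bob", 1}, {:alert, "amy", 2}, {:alert, "bob", 3}]

# Keeps only the two "bob" tuples, in their original order.
[{:alert, "bob", 1}, {:alert, "bob", 3}] = EventFilter.relevant(events, "bob")
```

Every consumer still receives the full events list before this filter runs, which is exactly the overhead the question is trying to avoid.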
Apologies in advance if I have missed something about how GenStage is intended to work.
Cheers, Bibek