GenStage serving as an event manager

I have a couple of questions about GenStage, which I am using as an event manager.

Question 1)

The code snippet from the announcing-genstage post shows the callbacks for a synchronous event manager. It is synchronous because the sync_notify function invokes GenStage.call, the callback is implemented as a handle_call instead of a handle_cast, and dispatch_events calls GenStage.reply.

My question, then, is: who is the recipient of the :noreply return tuple, i.e. who is the "non" recipient that is not being replied to?

A) When dispatch_events is called via handle_demand (which GenStage invokes under the covers when a consumer subscribed to the producer sends demand through the back-pressure mechanism), does the consumer process get no reply? That seems contrary to the events list it receives, e.g. Enum.reverse(events). It clearly does get something back, so why the :noreply?

B) When called via handle_call, the code that issues sync_notify clearly doesn't get the events list as its reply, but it does get :ok through GenStage.reply(from, :ok).

So my question is: why is the :noreply tag valid in both cases when it seems relevant to B but not to A?

  def handle_call({:notify, event}, from, {queue, demand}) do
    dispatch_events(:queue.in({from, event}, queue), demand, [])
  end

  def handle_demand(incoming_demand, {queue, demand}) do
    dispatch_events(queue, incoming_demand + demand, [])
  end

  defp dispatch_events(queue, demand, events) do
    with d when d > 0 <- demand,
         {item, queue} = :queue.out(queue),
         {:value, {from, event}} <- item do
      GenStage.reply(from, :ok)
      dispatch_events(queue, demand - 1, [event | events])
    else
      _ -> {:noreply, Enum.reverse(events), {queue, demand}}
    end
  end

Question 2

For consumers of events, how can we match on an event key so we don't waste bandwidth processing or queueing events that are not relevant? This is not the same as matching the key of a particular producer; it is a key in the actual event payload. At the moment I specify a filter in a list comprehension, for example key == Kernel.elem(event, 1). I'm looking to see whether GenStage explicitly supports specifying a match rule at the event payload level. For example, if a consumer handles alerts for Bob that have the event id “bob”, it shouldn't even receive messages for “Amy” or “Carl”, let alone have to filter them out. PartitionDispatcher looks interesting here, but what if the keys are not static but dynamic?

Original code, e.g.:

  def handle_events(events, _from, {key, write_pid}) do
    for event <- events, key == Kernel.elem(event, 1) do
      process_event(event, write_pid)
    end

    {:noreply, [], {key, write_pid}}    
  end
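As a side note on the filter itself: in an Elixir comprehension, an element that does not match the pattern on the left of `<-` is simply skipped, so the key check can be written as a pinned pattern instead of a separate filter expression. A minimal sketch, assuming (hypothetically) events shaped as `{type, key, payload}` 3-tuples; this still filters inside the consumer, so it does not save the bandwidth the question is about.

```elixir
defmodule KeyFilterSketch do
  # Hypothetical event shape: {type, key, payload}.
  # Elements with a different key (or a different shape) fail the
  # pattern match and are silently skipped by the comprehension.
  def relevant_events(events, key) do
    for {_type, ^key, _payload} = event <- events, do: event
  end
end

events = [{:alert, "bob", 1}, {:alert, "amy", 2}, {:alert, "bob", 3}]
KeyFilterSketch.relevant_events(events, "bob")
# => [{:alert, "bob", 1}, {:alert, "bob", 3}]
```

Inside handle_events this would read `for event <- KeyFilterSketch.relevant_events(events, key), do: process_event(event, write_pid)`.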

Apologies in advance if I have missed something about how GenStage is intended to work.

Cheers Bibek


Question 1)

The :reply and :noreply tuples always relate to whoever sent the message. In the handle_call case, that is whoever called sync_notify. In the demand case, it would be the consumer that sent the demand. Note, though, that you can't reply from handle_demand (just as you can't reply from handle_info).

What we are doing in handle_call in that code sample is, instead of replying immediately via the return tuple, storing the client reference (from). If there is pending demand, we reply right away; otherwise we wait until a consumer asks for events before replying. Either way, the client stays blocked until GenStage.reply is called: we effectively reply to the client at the moment we send its event to the consumer.
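The same deferred-reply idea can be seen without GenStage. Below is a minimal sketch using a plain GenServer, whose call/reply semantics the GenStage callbacks share: handle_call stores `from` and returns `{:noreply, state}`, and a later "demand" message triggers `GenServer.reply/2`. The module and function names are hypothetical, and the "events" are only recorded in a list rather than sent to a real consumer.

```elixir
defmodule DeferredNotifier do
  # Callers of notify/2 block until demand arrives via demand/2.
  # notify/2, demand/2 and dispatched/1 are hypothetical names.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)
  def notify(pid, event), do: GenServer.call(pid, {:notify, event})
  def demand(pid, n), do: GenServer.cast(pid, {:demand, n})
  def dispatched(pid), do: GenServer.call(pid, :dispatched)

  @impl true
  def init(:ok), do: {:ok, {:queue.new(), 0, []}}

  @impl true
  def handle_call({:notify, event}, from, {queue, demand, sent}) do
    # No reply in the return tuple: the caller stays blocked until
    # dispatch/3 calls GenServer.reply/2 for it.
    {:noreply, dispatch(:queue.in({from, event}, queue), demand, sent)}
  end

  def handle_call(:dispatched, _from, {_queue, _demand, sent} = state) do
    {:reply, Enum.reverse(sent), state}
  end

  @impl true
  def handle_cast({:demand, n}, {queue, demand, sent}) do
    {:noreply, dispatch(queue, demand + n, sent)}
  end

  # While there is demand and a queued caller, "send" the event
  # (here: record it in `sent`) and unblock that caller.
  defp dispatch(queue, demand, sent) when demand > 0 do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenServer.reply(from, :ok)
        dispatch(queue, demand - 1, [event | sent])

      {:empty, queue} ->
        {queue, demand, sent}
    end
  end

  defp dispatch(queue, demand, sent), do: {queue, demand, sent}
end

{:ok, pid} = DeferredNotifier.start_link()
task = Task.async(fn -> DeferredNotifier.notify(pid, :hello) end)
nil = Task.yield(task, 100)   # still blocked: no demand yet
DeferredNotifier.demand(pid, 1)
:ok = Task.await(task)        # unblocked once demand arrived
```

As in the original dispatch_events, demand that arrives before any caller is remembered, so a later notify/2 is answered immediately.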

Maybe a simpler example is to use handle_call with GenStage.reply outside of that context. For example, this “classic” handle_call:

  def handle_call(:ping, _from, state) do
    {:reply, :pong, state}
  end

is equivalent to:

  def handle_call(:ping, from, state) do
    GenStage.reply(from, :pong)
    {:noreply, state}
  end

The reply technique is used when you can't reply immediately (the reply has to come from another callback) or when you want to reply as soon as you get the message because you still need to do long-running work afterwards.
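For the second case, here is a minimal sketch with a plain GenServer (hypothetical module and job; `Process.sleep/1` stands in for the long-running work): the caller is acknowledged with `GenServer.reply/2` first, and the callback then keeps working before returning `{:noreply, state}`.

```elixir
defmodule ReplyEarly do
  # Hypothetical example: acknowledge the caller immediately,
  # then keep doing slow work in the same callback.
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, [])
  def submit(pid, job), do: GenServer.call(pid, {:submit, job})
  def results(pid), do: GenServer.call(pid, :results)

  @impl true
  def init(results), do: {:ok, results}

  @impl true
  def handle_call({:submit, job}, from, results) do
    # The caller of submit/2 is unblocked here...
    GenServer.reply(from, :accepted)
    # ...while the server is still busy with the slow part.
    Process.sleep(50)
    {:noreply, [job * 2 | results]}
  end

  def handle_call(:results, _from, results) do
    {:reply, Enum.reverse(results), results}
  end
end
```

Note that this only frees the caller: the server process itself is still busy, so a subsequent results/1 call waits until the work finishes (and then sees `[42]` after `submit(pid, 21)`).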

Question 2)

Currently this is not supported. We may end up supporting something similar, but it would have to be specific to each dispatcher (i.e. some dispatchers would be able to support it and others would not). One possible option is to have multiple producers, split by key, and then subscribe only to the producers that have the keys you care about. However, unless you have a very large number of messages, I wouldn't worry about it.
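As a rough illustration of routing by a dynamic payload key, here is a sketch that swaps in Elixir's built-in `Registry` (this is not a GenStage feature, and it has no back-pressure): each consumer registers under the key it cares about, at any time, and the publisher only sends an event to the processes registered under that event's key. The module name, message shape, and the key living at tuple position 1 (as in the question's Kernel.elem(event, 1)) are assumptions.

```elixir
defmodule KeyRouter do
  # Hypothetical router: duplicate keys let many processes
  # register interest in the same key.
  @registry KeyRouter.Registry

  def start_link do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Called by the interested (consumer) process itself; keys can be
  # added at runtime, so they do not need to be known up front.
  def subscribe(key) do
    {:ok, _owner} = Registry.register(@registry, key, nil)
    :ok
  end

  # Assumes the routing key is at tuple position 1.
  def publish(event) do
    key = elem(event, 1)

    Registry.dispatch(@registry, key, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:event, event})
    end)
  end
end
```

A consumer that called `KeyRouter.subscribe("bob")` receives `{:event, {:alert, "bob", 1}}` for `KeyRouter.publish({:alert, "bob", 1})`, while events keyed “amy” are never sent to it.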


Thank you, this was helpful.

The two equivalent versions of the handle_call case are insightful.

In my head, I'm going to catalog the :noreply in the second version as :no_immediate_full_reply_guarantee, and likewise in dispatch_events.

It feels truer to the actual outcome.

Cheers
