Pass message from handle_info to handle_demand

In GenStage, how do I pass a message from handle_info to handle_demand?
I want to subscribe to Redis, but the messages arrive in handle_info instead of handle_demand.

Can you describe in more detail what you are trying to accomplish?

  • If the message is a notification that data is ready to enter the pipeline, then handle_demand isn’t relevant, as that is governed by the consumers after the producer. The message should simply change the producer state in such a way that it can use the ready data once its consumer issues new demand via handle_demand (or process it immediately if there is already outstanding demand).
  • If the message is a request for more data from the pipeline, then this is a consumer issue, not a producer issue.

In GenStage you can pass events to consumers from all callbacks in a producer, not just from handle_demand. handle_demand just happens to be the callback that is called when a consumer requests more events. Tracking how many events you return and how much demand was given or already fulfilled can happen asynchronously across multiple callbacks; it does not have to happen solely in handle_demand.

i.e. handle_info can return

{:noreply, events, producer_state}

where events is the list of events to release to the next stage. That being said, the number of events being released should not exceed the buffered demand (tracked somewhere inside producer_state). Any available events exceeding the buffered demand need to wait until the next handle_demand call. That is part of the GenStage contract.
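
A minimal sketch of a producer that follows this rule might look like the code below. The module name RedisProducer, the state keys queue and demand, and the dispatch/take helpers are made up for illustration; the message shape is the Redis one that appears later in this thread. It assumes the gen_stage dependency is available and is not meant as the definitive way to do it.

```elixir
defmodule RedisProducer do
  # Assumes the :gen_stage dependency is available (e.g. listed in mix.exs).
  use GenStage

  def start_link(opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # queue: events not yet released, demand: demand not yet fulfilled
    {:producer, %{queue: :queue.new(), demand: 0}}
  end

  @impl true
  def handle_demand(incoming, state) when incoming > 0 do
    # Record the new demand, then release whatever is already queued.
    dispatch(%{state | demand: state.demand + incoming})
  end

  @impl true
  def handle_info({:message, _channel, data, _pid}, state) do
    # Queue the new event, then release it if there is pending demand.
    dispatch(%{state | queue: :queue.in(data, state.queue)})
  end

  # Release at most `demand` events and keep the remainder in the state.
  defp dispatch(%{queue: queue, demand: demand} = state) do
    {events, queue, demand} = take(queue, demand, [])
    {:noreply, events, %{state | queue: queue, demand: demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, event}, rest} -> take(rest, demand - 1, [event | acc])
      {:empty, queue} -> {Enum.reverse(acc), queue, demand}
    end
  end
end
```

Both callbacks funnel into the same dispatch step, so it does not matter whether demand or data arrives first.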

Buffering the demand is a case that must be explicitly considered by developers implementing producers.

So what I need to do is append new events from handle_info to new_state.queue and dequeue them in handle_demand?

It is a bit more complicated than that - a producer has to buffer demand and events. Consider the following scenario:

  1. When the producer is initialized there is neither buffered demand nor buffered events.
  2. Once a consumer is subscribed it can demand events via handle_demand. It could request 1000 events. But as the producer doesn’t have any events it has to buffer demand somewhere in new_producer_state. So handle_demand returns {:noreply, [], new_producer_state}, i.e. handle_demand returns without releasing any events.
  3. The producer may then get a handle_info indicating that raw data for new events is available. That raw data may be enough to generate 1200 events. Given that demand for 1000 events is buffered, handle_info can return {:noreply, events, new2_producer_state} where events is a list of 1000 events. The remaining 200 events (or the raw data to generate them) have to be buffered somewhere in new2_producer_state (i.e. buffered events).
  4. The producer may now receive another handle_demand for another 1000 events. handle_demand can immediately release the remaining 200 events in events2 by returning {:noreply, events2, new3_producer_state} while at the same time buffering demand for the remaining 800 events somewhere inside new3_producer_state.
  5. etc.
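
The numbers in the scenario above can be traced with a small stand-alone sketch. DemandTrace is a hypothetical helper, not part of GenStage; it models only the bookkeeping (buffered demand and buffered events) as plain data, so it runs without any dependencies.

```elixir
defmodule DemandTrace do
  # Hypothetical helper, not part of GenStage: bookkeeping only.
  def new, do: %{demand: 0, events: []}

  # A consumer asked for `n` more events (what handle_demand would see).
  def step(state, {:demand, n}) do
    release(%{state | demand: state.demand + n})
  end

  # New events became available (what handle_info would see).
  def step(state, {:events, new}) do
    release(%{state | events: state.events ++ new})
  end

  # Release as many buffered events as the buffered demand allows.
  defp release(%{demand: demand, events: events} = state) do
    {released, rest} = Enum.split(events, demand)
    {released, %{state | demand: demand - length(released), events: rest}}
  end
end

state = DemandTrace.new()

# Step 2: demand for 1000 arrives, nothing to release yet.
{released, state} = DemandTrace.step(state, {:demand, 1000})
IO.inspect({length(released), state.demand, length(state.events)})
# prints {0, 1000, 0}

# Step 3: 1200 events arrive, 1000 are released, 200 stay buffered.
{released, state} = DemandTrace.step(state, {:events, Enum.to_list(1..1200)})
IO.inspect({length(released), state.demand, length(state.events)})
# prints {1000, 0, 200}

# Step 4: demand for another 1000 arrives, 200 are released, 800 stay owed.
{released, state} = DemandTrace.step(state, {:demand, 1000})
IO.inspect({length(released), state.demand, length(state.events)})
# prints {200, 800, 0}
```

Each printed tuple is {events released, buffered demand, buffered events} after the step.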

The problem with how you proposed to handle things is that for as long as no events are released, there won’t be any new calls to handle_demand. Consumers ask for events once and depend on the producer to fulfil that demand (to at least a certain percentage) before asking again. That’s why the producer needs to keep track of the demand it has received.

Thanks to all of you.

I didn’t know I could push messages from handle_info:

def handle_info({:message, _channel, data, _pid}, state) do
  {:noreply, [data], state}
end