in gen_stage, how I pass message from handle_info to handle_demand.
I want to subscribe redis but message is received at handle_info instead handle_demand
Can you describe in more detail what you are trying to accomplish?
- If the message is a notification that data is ready to enter the pipeline then
handle_demand
isn’t relevant as that is governed by the consumers after the producer. The message should simply change the producer state in such a way that it can use the ready data once its consumer issues a newhandle_demand
(or process it immediately if there is already outstanding demand). - If the message is a request for more data from the pipeline then this is consumer, not a producer issue.
In GenStage you can pass events to consumers from all callbacks in a producer, not just from handle_demand. Handle_demand just happens to be the callback called when a consumer requests more events. Handling how many events you return and how much demand was given/already fulfilled can happen async over multiple called callbacks and must not happen solely in handle_demand.
i.e. handle_info
can return
{:noreply, events, producer_state}
where events
is the list of events to release to the next stage. That being said, the number of events being released should not exceed buffered demand (tracked somewhere inside producer_state
). Any available events exceeding the buffered demand needs to wait until the next handle_demand
call. That is part of the GenStage contract.
buffering the demand is a case that must be explicitly considered by developers implementing producers.
So what I need to do: append new events from handle_info to new_state.queue and dequeue in handle_demand?
It is a bit more complicated than that - a producer has to buffer demand and events. Consider the following scenario:
- When the producer is initialized there is neither buffered demand nor buffered events.
- Once a consumer is subscribed it can demand events via
handle_demand
. It could request 1000 events. But as the producer doesn’t have any events it has to buffer demand somewhere innew_producer_state
. Sohandle_demand
returns{:noreply, [], new_producer_state}
, i.e.handle_demand
returns without releasing any events. - The producer may then get a
handle_info
indicating that raw data for new events is available. That raw data my be enough to generate 1200 events. Given that demand for 1000 events is bufferedhandle_info
can return{:noreply, events, new2_producer_state}
whereevents
is a list of 1000 events. The remaining 200 events (or the raw data to generate them) has to be buffered somewhere innew2_producer_state
(i.e. buffered events). - The producer may now receive another
handle_demand
for another 1000 events.handle_demand
can immediately release the remaining 200 events inevents2
by returning{:noreply, events2, new3_producer_state}
while at the same time buffering demand for the remaining 800 events somewhere insidenew3_producer_state
. - etc.
The problem with how you proposed to handle things is that for as long as no events are released there won’t be any new calls to handle_demand. Consumers will ask for events once and depend on the producer to fullfil that (to at least a certain percentage) before asking again. That’s why the producer does need to keep track of demand it received.
Thank for all of you.
I didn’t know can push message using handle_demand:
def handle_info({:message, _channel, data, _pid}, state) do
{:noreply, [data], state}
end