GenStage: How to have consumer ask for demand when it has finished processing a certain batch

So I’m using GenStage in a project and I’ve got the following setup:

P <- C, where the demand is always a single event

When the Consumer receives an event, it has to process it, and this can take up to 50 minutes. The processing runs in a Task which, once it has finished, sends a :DOWN message to the Consumer. The Consumer then knows it is ready to process another event (and sets its state to ready).

However, what is happening is that the Consumer keeps asking the Producer for events while a Task is still running. If another event arrives in the meantime, the Producer hands it to the Consumer, and the event is lost because my Consumer's state says it is not ready to process another one (by lost I mean nothing is done with it). I need this new event either to stay in the Producer and only be given to the Consumer when it is ready, or to be kept in memory by the Consumer somehow and become the next event to process after the current one has finished.

What are the best practices to achieve this? Is this even a good use case for GenStage? I’m thinking maybe GenStage is meant only for a “Fire and Forget” style of event processing where order does not have to be kept.

Hello Sasha,

When your handle_events callback starts a non-blocking operation (probably a Task.async in your case?), you’ll have to use a manual demand approach, more or less as described here: https://hexdocs.pm/gen_stage/GenStage.html#module-asynchronous-work-and

In your case, I guess, you’ll be asking the producer again with ask/2 when your task replies to the consumer (and not after a timeout interval as in the linked example).
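Something along these lines, as a rough sketch rather than a drop-in solution. The module name ManualConsumer, the process/1 helper and the state shape are made up for illustration, and I'm assuming the task is started with Task.start/1 and monitored by hand so that the :DOWN message you mention arrives at the consumer:

```elixir
defmodule ManualConsumer do
  use GenStage

  # `producer` is the pid or registered name of your producer stage.
  def start_link(producer), do: GenStage.start_link(__MODULE__, producer)

  @impl true
  def init(producer) do
    {:consumer, %{producer: nil, task_ref: nil}, subscribe_to: [producer]}
  end

  @impl true
  def handle_subscribe(:producer, _opts, from, state) do
    # Returning :manual stops GenStage from asking on our behalf,
    # so we request the very first event ourselves.
    GenStage.ask(from, 1)
    {:manual, %{state | producer: from}}
  end

  @impl true
  def handle_events([event], _from, state) do
    # Run the long job outside the consumer process and monitor it.
    # No new demand is sent here, so the producer keeps later events.
    {:ok, pid} = Task.start(fn -> process(event) end)
    ref = Process.monitor(pid)
    {:noreply, [], %{state | task_ref: ref}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{task_ref: ref} = state) do
    # The task is done (or crashed): only now ask for the next event.
    GenStage.ask(state.producer, 1)
    {:noreply, [], %{state | task_ref: nil}}
  end

  def handle_info(_msg, state), do: {:noreply, [], state}

  # Hypothetical placeholder for the real 50-minute job.
  defp process(event), do: IO.inspect(event, label: "processed")
end
```

Because demand is only sent from handle_subscribe and from the :DOWN handler, the consumer never holds more than one event at a time, and you no longer need a separate "ready" flag.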

On the producer side you’ll then buffer produced events in a queue or something similar, and only emit them when there is demand. If you have just one consumer, then the incoming order should be respected by the event handling on the other side, right?
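For the producer, a minimal sketch of that buffering could look like the following. It follows the queue-plus-pending-demand pattern from the GenStage docs; the module name QueueProducer and the enqueue/2 function are my own naming, not something from your code:

```elixir
defmodule QueueProducer do
  use GenStage

  def start_link(opts \\ []), do: GenStage.start_link(__MODULE__, :ok, opts)

  # Hypothetical entry point for whatever creates your events.
  def enqueue(producer, event), do: GenStage.cast(producer, {:enqueue, event})

  @impl true
  def init(:ok) do
    # State: {FIFO queue of buffered events, demand not yet satisfied}
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_cast({:enqueue, event}, {queue, pending}) do
    dispatch(:queue.in(event, queue), pending, [])
  end

  @impl true
  def handle_demand(incoming, {queue, pending}) do
    dispatch(queue, pending + incoming, [])
  end

  # Emit as many buffered events as there is outstanding demand,
  # oldest first; everything else stays in the queue.
  defp dispatch(queue, 0, events) do
    {:noreply, Enum.reverse(events), {queue, 0}}
  end

  defp dispatch(queue, pending, events) do
    case :queue.out(queue) do
      {{:value, event}, queue} ->
        dispatch(queue, pending - 1, [event | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, pending}}
    end
  end
end
```

Events that arrive while the consumer is busy simply sit in the queue, and since :queue is FIFO they go out in arrival order once the consumer asks again.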

Hope this helps,

Andrea