GenStage: spread demand over multiple producers

Multiple producers with a single consumer, is there a way to ensure that the demand is spreaded across all producers ?
to be more detailed, we have the following stages pipeline:
[A] - [B] - [C]

Stage A … a variable number of instances, processing incoming data
Stage B … a single instance, think of a collector, shall distribute demand across producers, caching demands
Stage C … a variable number of instances which do the actual processing

I would like to implement stage [A] as a producer, but didn’t find any hint the GenStage source where the consumer may spread the demand across multiple producers.
Another way would be to implement stage [A] simply as GenServer instances and manage to spread the demand across them in stage [B], which will then be only a “producer”.

You can control the demand from the producer but you can control how often you ask from consumers (B and C) by implementing the handle_subcribe/handle_cancel callbacks and setting the demand mode to :manual.


@josevalim thx for the fast (very fast ;-)) reply, it wasn’t very clear to me, but now it is, I’ve overseen that the handle_subscribe callback is called on both sides, consumer and producer.

thank you and best regards

But in the case stage B is set to :manual, I see no way for stage B to determine if there exists demands from stage C. In the case stage C is busy, no demand, stage B shall stop asking stage A (there could be more then 1, which shall be asked round-robin alike) for events to prevent buffer overflow. In the case there is demand from stage C, stage B shall ask for events …
I’m wondering if it would be better to let the pipeline start at stage B (B would be a :producer) and the original stage A would be implemented as simple GenServer(s), registering to stage B (the new start of the pipeline) and stage B requests events from them in the handle_demand method.

Is there a way to get multiple producers, though?

I have a project where a producer connects to an external websocket API and that API streams more data than a single producer can handle.

I’m manually creating separate producers for different endpoints of the API, but it’s a bit cumbersome and I’m looking for a cleaner way to distribute the incoming demand, ideally in such a way that I don’t need to manually measure how busy each endpoint is.

I think so… maybe this can help


That one appears to be about ordering events produced by different producers.

My question is making sure the producer can handle the volume of data streaming in from the external API. (Ideally via running concurrent copies of the same producer, if that is feasible)

Oh sorry… anyway I have done a genstage pipelines with multiple producers using a queue as input.

1 Like

How did you set up the multiple producers and control the number of them?

I have an old repo where I made some tests with Genstage 2 years ago…

It might not be up to date, but I remember I could push in the queue from multiple sources.

I use an ETS based queue, for speed.

I have one producer setup in the pipeline_sup, but I don’t see why You could not extends to multiple.