Best practice to achieve parallelism in GenStage

Hello. I am playing with GenStage and trying to understand how to achieve parallelism with it. My pipeline has the following tasks:

  1. Read from a file line by line.
  2. Make a web request using the line read in step 1, and transform the response a bit.
  3. Save the result from step 2 into a file.
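Step 1 could be sketched as a GenStage producer roughly like this (a sketch only; the module name and EOF handling are assumptions, not from the original post):

```elixir
defmodule Producer do
  use GenStage

  def start_link(path), do: GenStage.start_link(__MODULE__, path)

  @impl true
  def init(path) do
    # Keep the open file device in state; lines are read only on demand.
    {:producer, File.open!(path, [:read])}
  end

  @impl true
  def handle_demand(demand, device) do
    # Read exactly as many lines as the consumers asked for.
    # A real producer would also handle :eof, e.g. by stopping the stage.
    lines =
      Stream.repeatedly(fn -> IO.read(device, :line) end)
      |> Enum.take(demand)
      |> Enum.reject(&(&1 == :eof))

    {:noreply, lines, device}
  end
end
```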

Basically, I want step 2 to be handled by several processes. For this I can either have:

  1. Producer -> ProducerConsumer -> Consumer. The Producer reads N lines from the file and sends them to the ProducerConsumer, which uses Task.async to call the web service and then sends the results to the Consumer.
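The middle stage in option 1 might look roughly like this sketch, where `fetch_and_transform/1` is a hypothetical stand-in for the web request and transformation:

```elixir
defmodule ProducerConsumer do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:producer_consumer, :ok}

  @impl true
  def handle_events(lines, _from, state) do
    # Fan the batch out to tasks; each task makes one web request.
    results =
      lines
      |> Enum.map(fn line -> Task.async(fn -> fetch_and_transform(line) end) end)
      |> Enum.map(&Task.await/1)

    {:noreply, results, state}
  end

  # Hypothetical: call the web service and transform the response.
  defp fetch_and_transform(line), do: line
end
```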

  2. Producer -> N instances of ProducerConsumer -> Consumer. Here I instantiate N ProducerConsumers and set max_demand to 1. The Producer reads N lines from the file and sends each to one of the ProducerConsumers, which simply calls the web service and sends the result to the Consumer. This way each ProducerConsumer handles only one event, in parallel with the others, but each ProducerConsumer also has to assume that there will be only one event in its events list, which I personally don’t like. I came up with code like this:

{:ok, producer} = Producer.start_link("./file.txt")
{:ok, consumer} = Consumer.start_link()

parallel_workers = 5

1..parallel_workers
|> Enum.each(fn _ ->
  {:ok, producer_consumer} = ProducerConsumer.start_link()
  GenStage.sync_subscribe(producer_consumer, to: producer, max_demand: 1)
  GenStage.sync_subscribe(consumer, to: producer_consumer, max_demand: 1)
end)
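For completeness, the ProducerConsumer each of those workers runs might look roughly like this (a sketch; the real module isn't shown above, and `fetch_and_transform/1` is a hypothetical stand-in for the web call):

```elixir
defmodule ProducerConsumer do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:producer_consumer, :ok}

  @impl true
  def handle_events([line], _from, state) do
    # With max_demand: 1 the events list always has exactly one element,
    # which is the assumption the question dislikes.
    {:noreply, [fetch_and_transform(line)], state}
  end

  # Hypothetical: call the web service and transform the response.
  defp fetch_and_transform(line), do: line
end
```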

I understand that I could use Task.async_stream instead, but the goal here is to understand GenStage. So I would like to know which is the more common way to solve a task like this.
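For reference, the Task.async_stream alternative mentioned above might look like this sketch (the function and file names are hypothetical):

```elixir
# The whole pipeline without GenStage: read, fetch in parallel, write.
"./file.txt"
|> File.stream!()
|> Task.async_stream(&fetch_and_transform/1, max_concurrency: 5, timeout: 30_000)
|> Stream.map(fn {:ok, result} -> result end)
|> Stream.into(File.stream!("./out.txt"))
|> Stream.run()
```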

Welcome to the forum.

I would select 2, but I would also increase max_demand.

The reason is that, for back pressure to work efficiently, GenStage tries to avoid idle consumer cycles by asking for more events once a consumer has processed half of its demand.

Well, half of 1 is not really useful. IIRC the default is 500, but you can try less.


To be precise, it doesn’t ask after processing half: it asks after processing max_demand - min_demand events. So in the scenario with max_demand: 1 it will ask for one more after processing one. That is a reasonable enough setup when you’re doing async-heavy code: you’re using back pressure to make sure each worker finishes its slow async work before it picks up another event, so events don’t get backed up behind a worker that, e.g. if you’re making requests, gets stuck on a few requests that time out or are otherwise unusually slow.
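In code, that re-asking behaviour is controlled by the subscription options; with these (illustrative) numbers the consumer asks for 6 more events each time it has processed 6, i.e. max_demand - min_demand:

```elixir
GenStage.sync_subscribe(consumer,
  to: producer,
  max_demand: 10,  # at most 10 events in flight per subscription
  min_demand: 4    # ask again once outstanding demand drops to 4
)
```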


If you’re using GenStage to get parallelism, you should also look at Flow, which is an abstraction on top of GenStage that makes parallel computation simpler to set up.
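A Flow version of the same pipeline might look like this sketch (again with hypothetical names; `fetch_and_transform/1` stands in for the web call):

```elixir
"./file.txt"
|> File.stream!()
|> Flow.from_enumerable(stages: 5, max_demand: 1)
|> Flow.map(&fetch_and_transform/1)
|> Enum.into(File.stream!("./out.txt"))
```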


And perhaps Broadway as well, though I am not really sure how one would fit it into this scenario.