Best practice to achieve parallelism in GenStage

Hello. I am playing with GenStage and trying to understand how to achieve parallelism with it. My pipeline has the following steps:

  1. Read a file line by line.
  2. Make a web request using the line read in step 1 and transform the response a bit.
  3. Save the result from step 2 to a file.

Basically, I want step 2 to be handled by several processes. For this I can either have:

  1. Producer -> ProducerConsumer -> Consumer. The Producer reads N lines from the file and sends them to a single ProducerConsumer, which calls the web service via Task.async and sends the results to the Consumer.

  2. Producer -> N instances of ProducerConsumer -> Consumer. Here I start N ProducerConsumers and set max_demand to 1. The Producer reads N lines from the file and sends each line to one of the ProducerConsumers, which just calls the web service and sends the result to the Consumer. This way each ProducerConsumer handles one event at a time, in parallel with the others. The downside is that the ProducerConsumer more or less assumes there will only ever be one event in the events list, which I personally don’t like. I came up with code like this:

{:ok, producer} = Producer.start_link("./file.txt")
{:ok, consumer} = Consumer.start_link()

parallel_workers = 5

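# Start N ProducerConsumer stages. Each one takes a single event at a time
# from the producer and feeds its results to the one shared consumer.
Enum.each(1..parallel_workers, fn _ ->
  {:ok, producer_consumer} = ProducerConsumer.start_link()
  GenStage.sync_subscribe(producer_consumer, to: producer, max_demand: 1)
  GenStage.sync_subscribe(consumer, to: producer_consumer, max_demand: 1)
end)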
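The snippet above leaves the three stage modules undefined. What follows is only a minimal sketch of how they might look, not the original poster's code: the `Sketch.*` module names are hypothetical, the producer takes an in-memory list instead of a file path, `fetch_and_transform/1` is a stand-in for the web request, and the script assumes `Mix.install` can fetch the `gen_stage` package. The point it illustrates is that `handle_events/3` can map over the whole events list, so the worker does not have to assume a batch of exactly one.

```elixir
# Assumption: run as a script with network access so gen_stage can be fetched.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule Sketch.Producer do
  use GenStage

  # Takes a list of lines (not a file path) to keep the sketch self-contained.
  def start_link(lines), do: GenStage.start_link(__MODULE__, lines)

  @impl true
  def init(lines), do: {:producer, lines}

  @impl true
  def handle_demand(demand, lines) do
    # Hand out at most `demand` lines; emits [] once the input is exhausted.
    {events, rest} = Enum.split(lines, demand)
    {:noreply, events, rest}
  end
end

defmodule Sketch.Worker do
  use GenStage

  def start_link, do: GenStage.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok), do: {:producer_consumer, :ok}

  @impl true
  def handle_events(lines, _from, state) do
    # Map over the whole batch instead of assuming a single event.
    {:noreply, Enum.map(lines, &fetch_and_transform/1), state}
  end

  # Hypothetical stand-in for the web request and response transformation.
  defp fetch_and_transform(line), do: String.upcase(line)
end

defmodule Sketch.Collector do
  use GenStage

  def start_link(parent), do: GenStage.start_link(__MODULE__, parent)

  @impl true
  def init(parent), do: {:consumer, parent}

  @impl true
  def handle_events(results, _from, parent) do
    # Forward results to the calling process instead of writing a file.
    Enum.each(results, &send(parent, {:result, &1}))
    {:noreply, [], parent}
  end
end

lines = ["a", "b", "c", "d"]
{:ok, producer} = Sketch.Producer.start_link(lines)
{:ok, consumer} = Sketch.Collector.start_link(self())

for _ <- 1..5 do
  {:ok, worker} = Sketch.Worker.start_link()
  GenStage.sync_subscribe(worker, to: producer, max_demand: 1)
  GenStage.sync_subscribe(consumer, to: worker, max_demand: 1)
end

# Collect one result per input line; arrival order is not guaranteed.
results =
  for _ <- lines do
    receive do
      {:result, result} -> result
    after
      5_000 -> raise "timed out waiting for a result"
    end
  end

IO.inspect(Enum.sort(results))
```

Because five workers run independently, results can arrive in any order, which is why the sketch sorts them before printing.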

I understand that I could use Task.async_stream instead, but the goal here is to understand GenStage. So I would like to know which is the more common way to solve a task like this.

Welcome to the forum.

I would select 2, but I would also increase max_demand.

The reason is that, for back pressure to work efficiently, GenStage tries to avoid idle consumer cycles by asking for more events after processing half of its current batch.

Well, half of 1 is not really useful. IIRC the default max_demand is 1000 (with min_demand defaulting to half of that, 500), but you can try less.

To be precise, it doesn’t ask after processing half. It asks after processing max_demand - min_demand events. So with max_demand: 1 (where min_demand defaults to 0) it will ask for one more event after processing one. That is a reasonable enough setup when you’re doing async-heavy work. You’re using back pressure to make sure each worker finishes its slow async work before it picks up another event. That way events don’t get backed up behind a worker that, e.g. if you’re making requests, gets stuck on a few requests that time out or are otherwise unusually slow.
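The arithmetic behind that rule can be written out as a small standalone helper. This is only an illustration of the max_demand - min_demand formula, not GenStage's internal code: the `DemandSketch` module and `refill_after/2` are hypothetical names, and the helper assumes that an unspecified min_demand defaults to half of max_demand using integer division.

```elixir
defmodule DemandSketch do
  # Number of events a consumer processes before it asks upstream again,
  # following the max_demand - min_demand rule.
  # Assumption: an omitted min_demand defaults to div(max_demand, 2).
  def refill_after(max_demand, min_demand \\ nil) do
    min = if is_nil(min_demand), do: div(max_demand, 2), else: min_demand
    max_demand - min
  end
end

IO.inspect(DemandSketch.refill_after(1))         # 1
IO.inspect(DemandSketch.refill_after(1000))      # 500
IO.inspect(DemandSketch.refill_after(1000, 750)) # 250
```

Only in the default case, where min_demand is half of max_demand, does the rule coincide with "ask again after processing half".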

If you’re using GenStage to get parallelism, you should also look at Flow, which is an abstraction on top of GenStage that makes parallel computation simpler to set up.
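As a rough idea of what the same pipeline might look like with Flow, here is a minimal sketch. It assumes `Mix.install` can fetch the `flow` package, uses an in-memory list in place of the file, and `fetch_and_transform` is a hypothetical stand-in for the web request. The `stages: 5` and `max_demand: 1` options mirror the five workers and the per-worker demand from the original question.

```elixir
# Assumption: run as a script with network access so flow can be fetched.
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical stand-in for the web request and response transformation.
fetch_and_transform = fn line -> String.upcase(line) end

results =
  ["a", "b", "c", "d"]
  |> Flow.from_enumerable(stages: 5, max_demand: 1)
  |> Flow.map(fetch_and_transform)
  |> Enum.to_list()

# Stages run concurrently, so sort before printing for a stable view.
IO.inspect(Enum.sort(results))
```

For the real task, the list could presumably be replaced by `File.stream!("./file.txt")`, with the results written out after the flow has been enumerated.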

And perhaps Broadway as well, though I am not really sure how one would fit it into this scenario.