Hello. I am playing with GenStage and trying to understand how to achieve parallelism with it. My pipeline has the following steps:
- Read from a file line by line.
- Make a web request using the line read in step 1, then transform the response a bit.
- Save the result from step 2 into a file.
Basically, I want step 2 to be handled by several processes. For this I can have either:
- Producer -> ProducerConsumer -> Consumer. The Producer reads N lines from the file and sends them to the ProducerConsumer, which uses `Task.async` to call the web service and sends the results to the Consumer.
- Producer -> N instances of ProducerConsumer -> Consumer. Here I start N ProducerConsumers and set `max_demand` to 1. The Producer reads N lines from the file and sends each one to one of the ProducerConsumers, which just calls the web service and sends the result to the Consumer. This way each ProducerConsumer handles only 1 event at a time, in parallel with the others. However, the ProducerConsumer then more or less assumes that there will be only one event in the events list, which I personally don't like.

I came up with code like this:
{:ok, producer} = Producer.start_link("./file.txt")
{:ok, consumer} = Consumer.start_link()
parallel_workers = 5

# Start one ProducerConsumer per worker and wire it between producer and consumer.
1..parallel_workers
|> Enum.each(fn _ ->
  {:ok, producer_consumer} = ProducerConsumer.start_link()
  # max_demand: 1 means each ProducerConsumer asks for a single event at a time.
  GenStage.sync_subscribe(producer_consumer, to: producer, max_demand: 1)
  GenStage.sync_subscribe(consumer, to: producer_consumer, max_demand: 1)
end)
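For context, here is a minimal sketch of what the ProducerConsumer in the second option could look like. It is only an illustration under some assumptions: `fetch_and_transform/1` is a hypothetical stand-in for the real web request (here it just trims and upcases the line), and the `Mix.install` call assumes the snippet is run as a standalone script. Mapping over `events` means the stage works with a list of any length, so it does not have to rely on there being exactly one event.

```elixir
# Assumption: run as a standalone script; in a Mix project, add :gen_stage to deps instead.
Mix.install([{:gen_stage, "~> 1.0"}])

defmodule ProducerConsumer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  @impl true
  def init(:ok) do
    # No internal state is needed, so :ok is used as a placeholder.
    {:producer_consumer, :ok}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Process every event received, whether the list holds one item or many.
    results = Enum.map(events, &fetch_and_transform/1)
    {:noreply, results, state}
  end

  # Hypothetical stand-in for "call the web service and transform the response".
  defp fetch_and_transform(line) do
    line |> String.trim() |> String.upcase()
  end
end
```

With `max_demand: 1` on the subscription, `events` should contain a single item per call, but the same code keeps working if the demand is raised later.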
I understand that I could use `Task.async_stream` instead, but the goal here is to understand GenStage. So I would like to know which is the more common way to solve a task like this.