I have a use case where I want to use Flow to parallelise my workflow. Say I have the following code:
[1, 5, 10, 15, 20]
|> Flow.from_enumerable(stages: 3, max_demand: 1)
|> Flow.flat_map(fn job ->
  IO.inspect("1st iteration: #{inspect(Kernel.self())} processing job #{job}")
  # Simulate task processing
  :timer.sleep(job * 1000)
  # Each task yields 5 sub-tasks
  ["#{job}.1", "#{job}.2", "#{job}.3", "#{job}.4", "#{job}.5"]
end)
|> Flow.partition(stages: 10, max_demand: 1)
|> Flow.map(fn job ->
  IO.inspect("2nd iteration: #{inspect(Kernel.self())} processing job #{job}")
  # Simulate sub-task processing; sleep long enough that the stage stays busy while debugging
  :timer.sleep(500_000)
end)
|> Flow.run()
The idea is that I have 5 tasks and at most 3 stages, each processing one task at a time, and processing each task yields 5 sub-tasks. I also want to parallelise the sub-task processing, hence the Flow.partition
(I am not sure this is the right approach, but if I remove it the sub-task processing becomes serial). The code kind of works, but the output is weird; it looks something like this:
"1st iteration: #PID<0.444.0> processing job 1"
"1st iteration: #PID<0.445.0> processing job 5"
"1st iteration: #PID<0.446.0> processing job 10"
"2nd iteration: #PID<0.449.0> processing job 1.5"
"2nd iteration: #PID<0.450.0> processing job 1.2"
"2nd iteration: #PID<0.451.0> processing job 1.1"
"2nd iteration: #PID<0.455.0> processing job 1.4"
"2nd iteration: #PID<0.456.0> processing job 1.3"
"1st iteration: #PID<0.444.0> processing job 15"
"2nd iteration: #PID<0.447.0> processing job 5.1"
"1st iteration: #PID<0.445.0> processing job 20"
"2nd iteration: #PID<0.452.0> processing job 10.5" # why is this sub-task processed before the remaining 5.x ones?
"2nd iteration: #PID<0.454.0> processing job 10.3" # why is this sub-task processed before the remaining 5.x ones?
"2nd iteration: #PID<0.448.0> processing job 15.1" # why is this sub-task processed before the remaining 5.x ones?
There are two problems with this output. First, only 9 processes handle the sub-tasks, even though I specified 10 stages. Second, as highlighted in the output above, sub-tasks 10.5, 10.3 and 15.1 are picked up before the remaining 5.x sub-tasks (5.2 to 5.5).
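One way to check how the sub-tasks get spread over the 10 partition stages is the stdlib-only script below. It rests on an assumption that is not stated in the question: that Flow.partition/2, given no :key or :hash option, sends each event to partition :erlang.phash2(event, stages). The script only mimics that routing for the 25 sub-task strings; it does not start Flow.

```elixir
# ASSUMPTION: Flow.partition/2 with no :key/:hash option routes each event
# to partition :erlang.phash2(event, stages). This script only mimics that
# routing with the standard library; it does not use Flow itself.
stages = 10

# Same sub-task names as the flat_map step: "1.1" .. "20.5"
subtasks =
  for job <- [1, 5, 10, 15, 20], n <- 1..5 do
    "#{job}.#{n}"
  end

# Map of partition index => sub-tasks that hash to that partition
assignments = Enum.group_by(subtasks, fn sub -> :erlang.phash2(sub, stages) end)

# Print every partition, including any that receive no sub-tasks at all
for partition <- 0..(stages - 1) do
  IO.puts("partition #{partition}: #{inspect(Map.get(assignments, partition, []))}")
end
```

If that assumption holds, a partition listing several early sub-tasks would have to finish one 500-second sleep before starting the next, and a partition listing none would sit idle; either effect would show up as fewer active PIDs and as sub-tasks being processed out of job order.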
Any ideas why?
Thanks in advance.