Flow stages from Flow.map

I want to use Flow to parallelise my workflow. Say I have the following code:

    [1, 5, 10, 15, 20]
    |> Flow.from_enumerable(stages: 3, max_demand: 1)
    |> Flow.flat_map(fn job ->
      IO.inspect("1st iteration: #{inspect(Kernel.self())} processing job #{job}")

      # Simulate task processing
      :timer.sleep(job * 1000)

      # Each task yields 5 sub tasks
      ["#{job}.1", "#{job}.2", "#{job}.3", "#{job}.4", "#{job}.5"]
    end)
    |> Flow.partition(stages: 10, max_demand: 1)
    |> Flow.map(fn job ->
      IO.inspect("2nd iteration: #{inspect(Kernel.self())} processing job #{job}")

      # Simulate sub-task processing; sleep for a long time so the busy stages are easy to spot
      :timer.sleep(500_000)
    end)
    |> Flow.run()

The idea is this: say I have 5 tasks and at most 3 stages, each processing one task at a time, and processing each task yields 5 sub-tasks. I want to parallelise the sub-task processing too, hence the Flow.partition (I'm not sure this is the right way, but if I remove it the sub-tasks are processed serially). The code kind of works, but the output is weird. It looks something like this:

    "1st iteration: #PID<0.444.0> processing job 1"
    "1st iteration: #PID<0.445.0> processing job 5"
    "1st iteration: #PID<0.446.0> processing job 10"
    "2nd iteration: #PID<0.449.0> processing job 1.5"
    "2nd iteration: #PID<0.450.0> processing job 1.2"
    "2nd iteration: #PID<0.451.0> processing job 1.1"
    "2nd iteration: #PID<0.455.0> processing job 1.4"
    "2nd iteration: #PID<0.456.0> processing job 1.3"
    "1st iteration: #PID<0.444.0> processing job 15"
    "2nd iteration: #PID<0.447.0> processing job 5.1"
    "1st iteration: #PID<0.445.0> processing job 20"
    "2nd iteration: #PID<0.452.0> processing job 10.5"  # why is this sub-task processed before the remaining 5.x ones?
    "2nd iteration: #PID<0.454.0> processing job 10.3"  # why is this sub-task processed before the remaining 5.x ones?
    "2nd iteration: #PID<0.448.0> processing job 15.1"  # why is this sub-task processed before the remaining 5.x ones?

There are two problems with this output. First, only 9 processes handle the sub-tasks even though I specified 10 stages. Second, as highlighted in the output above, sub-tasks 10.5, 10.3 and 15.1 are started while most of the 5.x sub-tasks are still waiting.

Any ideas why?

Thanks in advance.

The behaviour you see is the result of the way GenStage distributes events between partitions by default. It uses :erlang.phash2(event, number_of_partitions) to determine the index of the partition each event goes to. This works well in most cases, but it breaks down when your events happen to hash unevenly across the partitions, which can easily happen with only 25 events spread over 10 partitions.

You can configure how Flow (and in turn GenStage) distributes your events with the :key or :hash options of Flow.partition/2. See the GenStage.PartitionDispatcher docs for details.
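A minimal sketch of what a custom :hash could look like, assuming the function receives an event and returns {event, partition}, which is the shape GenStage.PartitionDispatcher documents for its own :hash option (double-check Flow.partition/2 for your Flow version before relying on it). SubtaskHash, default_hash/1 and by_job_and_index/1 are hypothetical names, and the rem(job + sub, stages) rule is only tuned to the job numbers in this thread. The script needs no Flow dependency; it just prints how the default phash2 rule and the custom rule spread the 25 sub-tasks over 10 partitions.

```elixir
defmodule SubtaskHash do
  # Hypothetical helper for this thread, not part of Flow or GenStage.
  # Both builders return a function with the assumed {event, partition}
  # return shape of a partition :hash option.

  # Mirrors the default behaviour: hash the whole event with :erlang.phash2/2.
  def default_hash(stages) do
    fn event -> {event, :erlang.phash2(event, stages)} end
  end

  # Custom rule for events shaped like "job.sub" (e.g. "10.3"): offset the
  # sub-task index by the job number so that the five sub-tasks of one job
  # land in five different partitions.
  def by_job_and_index(stages) do
    fn event ->
      [job, sub] = event |> String.split(".") |> Enum.map(&String.to_integer/1)
      {event, rem(job + sub, stages)}
    end
  end

  # Groups events by the partition a hash function assigns them to,
  # keeping their original order within each partition.
  def distribution(events, hash_fun) do
    events
    |> Enum.map(hash_fun)
    |> Enum.group_by(fn {_event, partition} -> partition end, fn {event, _} -> event end)
  end
end

# The same 25 sub-task events the flat_map in the question produces.
events = for job <- [1, 5, 10, 15, 20], sub <- 1..5, do: "#{job}.#{sub}"

for {name, hash_fun} <- [
      default: SubtaskHash.default_hash(10),
      custom: SubtaskHash.by_job_and_index(10)
    ] do
  IO.inspect(SubtaskHash.distribution(events, hash_fun), label: name)
end
```

If the shape matches your Flow version, wiring it in would look like Flow.partition(stages: 10, max_demand: 1, hash: SubtaskHash.by_job_and_index(10)). Keep in mind that a hash function only sees the event, not which stage is idle, so with 25 equally slow events some partitions still end up with two or three events queued; a better hash can only even out the counts.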

To understand exactly what is happening in your case, it's useful to map every event to the partition index its hash selects:

    iex(5)> [1, 5, 10, 15, 20] |>
    ...(5)> Enum.flat_map(&["#{&1}.1", "#{&1}.2", "#{&1}.3", "#{&1}.4", "#{&1}.5"]) |>
    ...(5)> Enum.reduce(%{}, fn e, m -> Map.update(m, :erlang.phash2(e, 10), [e], &[e | &1]) end) |>
    ...(5)> Enum.map(fn {k, v} -> {k, Enum.reverse(v)} end)
    [
      {0, ["5.1", "15.5", "20.2"]},
      {1, ["15.1"]},
      {2, ["1.5", "5.3", "10.1", "10.4", "15.2", "15.4", "20.4", "20.5"]},
      {3, ["1.2", "5.2", "5.4", "15.3"]},
      {4, ["1.1", "5.5", "20.1", "20.3"]},
      {5, ["10.5"]},
      {7, ["10.3"]},
      {8, ["1.4", "10.2"]},
      {9, ["1.3"]}
    ]

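As you can see, no events are assigned to partition 6, which explains why only 9 stages are working. You can also see that every 5.x event except 5.1 is queued behind a 1.x event in its partition. Each partition stage processes its events one after another, and each one sleeps for 500 seconds, so those 5.x events have to wait. Meanwhile 15.1, 10.5 and 10.3 land in partitions 1, 5 and 7, which have nothing queued ahead of them, so they start right away.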
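The queueing effect can be put into rough numbers with a small model. This is a sketch under simplifying assumptions: each of the 10 partition stages handles one event at a time, every event takes 500_000 ms (the :timer.sleep in the question), events arrive in list order, and the time spent in the first Flow.flat_map layer is ignored. QueueWait and estimate/3 are hypothetical names; only :erlang.phash2/2 reflects the real default dispatch rule.

```elixir
defmodule QueueWait do
  # Hypothetical helper for this thread, not a Flow/GenStage API.
  # Rough model: an event that is k-th (0-based) in its partition's queue
  # starts about k * per_event_ms after that partition's first event.
  # Arrival order is assumed to be the list order; first-layer delays are ignored.
  def estimate(events, stages, per_event_ms) do
    events
    |> Enum.group_by(&:erlang.phash2(&1, stages))
    |> Enum.flat_map(fn {partition, queue} ->
      queue
      |> Enum.with_index()
      |> Enum.map(fn {event, position} ->
        {event, %{partition: partition, position: position, start_ms: position * per_event_ms}}
      end)
    end)
    |> Map.new()
  end
end

events = for job <- [1, 5, 10, 15, 20], sub <- 1..5, do: "#{job}.#{sub}"
waits = QueueWait.estimate(events, 10, 500_000)

# Show where each of job 5's sub-tasks sits in its partition's queue.
for sub <- 1..5 do
  event = "5.#{sub}"
  IO.inspect(waits[event], label: event)
end
```

If the hash values match the iex session above, this model puts 5.1 at the head of partition 0 and the other 5.x events one or two slots back, i.e. roughly 500 to 1000 seconds behind the 1.x sub-tasks.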
