Collecting Flow results into a map

Hello,

we are trying to parallelize a processing operation with Flow and ran into a situation where small datasets result in empty output.

A concise version of the problematic section follows:

defmodule TestFlow do
  alias Experimental.Flow

  def test(range) do
    range
    |> Flow.from_enumerable()
    |> Flow.reduce(fn -> %{even: [], odd: []} end, fn number, acc ->
      push_number(number, acc)
    end)
    |> Enum.into(%{})
  end
           
  defp push_number(number, acc) do
    if rem(number, 2) == 0 do
      %{even: [number | acc.even], odd: acc.odd}
    else
      %{even: acc.even, odd: [number | acc.odd]}
    end
  end
end

What would be an idiomatic solution for collecting Flow.reduce results into a map?

Since the keys are identical for each partition, each partition overwrites the values from the partition “before” it.

The chain is roughly like this:

   S      # Flow.from_enumerable
/ | | \
0 1 2 3   # Flow.reduce
\ | | /
   E      # Enum.into

Our starting list looks like this: [1,2,3] and we have a partitioner of rem(x, 4) (simplified).

So the distinct partitions will generate the following output:

0: %{even: [], odd: []}
1: %{even: [], odd: [1]}
2: %{even: [2], odd: []}
3: %{even: [], odd: [3]}

Enum.into/2's merge strategy is very simple: it drops the old value for a key, and the order in which items arrive from the partitions is not deterministic.

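So in the end you will always see only the result of whichever partition handed over its result last. With a small dataset that can easily be a partition that received no items, which explains the empty output.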
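The overwrite can be reproduced without Flow. The sketch below is plain Elixir; the `emitted` list is a hand-written stand-in for the key-value pairs the four partitions would hand over in one possible arrival order, not actual Flow output.

```elixir
# Hand-written stand-in for the {key, value} pairs emitted by the four
# partitions, in one possible arrival order (partition 0 arrives last here).
emitted = [
  {:even, []}, {:odd, [1]},   # partition 1
  {:even, [2]}, {:odd, []},   # partition 2
  {:even, []}, {:odd, [3]},   # partition 3
  {:even, []}, {:odd, []}     # partition 0
]

# Enum.into/2 keeps only the last value seen for each key.
result = Enum.into(emitted, %{})
IO.inspect(result)
# prints %{even: [], odd: []}
```

With a different arrival order a different partition would win, but in every case only one partition's values survive.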

I think you should do flow |> Enum.into([]) |> merge_fun() to get the desired result, where merge_fun is a function you write yourself to combine the values per key.
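One possible shape for such a merge function is sketched below. `merge_fun` is only the placeholder name from the suggestion above, `PartitionMerge` is a hypothetical module, and the input list stands in for what `flow |> Enum.into([])` might return. It assumes that concatenating the lists is the right way to combine values for the same key.

```elixir
defmodule PartitionMerge do
  # Folds a list of {key, list} pairs into one map, concatenating the
  # lists of duplicate keys instead of overwriting them.
  def merge_fun(pairs) do
    Enum.reduce(pairs, %{}, fn {key, values}, acc ->
      Map.update(acc, key, values, fn existing -> existing ++ values end)
    end)
  end
end

# Stand-in for the collected per-partition output (not real Flow output).
pairs = [
  {:even, []}, {:odd, [1]},
  {:even, [2]}, {:odd, []},
  {:even, []}, {:odd, [3]},
  {:even, []}, {:odd, []}
]

merged = PartitionMerge.merge_fun(pairs)
IO.inspect(merged)
# prints %{even: [2], odd: [1, 3]} for this particular arrival order
```

The order of elements inside each list still depends on the order in which the partitions deliver their results, so sort the lists afterwards if that matters.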

One last warning: if you do require ordering in any sense, please stick to Stream!
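For comparison, a sequential version might look like the following sketch. `SequentialSplit` is a hypothetical module, not part of the original code; it uses only Stream and Enum, so the elements are visited in their original order in a single process.

```elixir
defmodule SequentialSplit do
  # Same even/odd split as in the question, but processed sequentially,
  # so the input order is preserved within each list.
  def split(enumerable) do
    enumerable
    |> Stream.map(fn n -> {rem(n, 2) == 0, n} end)
    |> Enum.reduce(%{even: [], odd: []}, fn
      {true, n}, acc -> %{acc | even: [n | acc.even]}
      {false, n}, acc -> %{acc | odd: [n | acc.odd]}
    end)
    # Items were prepended, so reverse each list to restore input order.
    |> Map.new(fn {key, values} -> {key, Enum.reverse(values)} end)
  end
end

IO.inspect(SequentialSplit.split(1..6))
# prints %{even: [2, 4, 6], odd: [1, 3, 5]}
```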

Thank you for the prompt explanation! The description of the background mechanics gave a really clear picture of what’s going on.