Flow partition

I am exploring Flow partitions. Basically, I want to group the numbers by their remainder when divided by 3.

    |> Flow.partition(key: fn e -> rem(e, 3) end, stages: 4)
    |> Flow.reduce(fn -> [] end, fn e, acc -> [e | acc] end)
    |> Flow.departition(fn -> [] end, &[&1 | &2], &Enum.sort/1)
    |> Enum.to_list()

If I set more than 3 stages, say 4, I get the following result:
[[[], [7, 4, 1], [8, 5, 2], [9, 6, 3, 0]]]
However, if I set the stages to 3, I get the following:
[[[], [8, 5, 2], [9, 7, 6, 4, 3, 1, 0]]]

Can anyone shed some light on this? Many thanks.

There are 3 possible keys: 0, 1, and 2. Those keys are hashed, and the partitions are calculated from the hashes. (The hash: option, on the other hand, lets you assign partitions directly.)

So of those 3 keys, two are evidently hashing to very similar values, so when the hash value space is divided between the stages, one reducer isn’t getting any values. I’m guessing that it does an equal partition between stages, so with 3 stages each gets a third of the hash space and with 4 stages each gets a fourth. This would make sense, as the stages cannot know the distribution of keys in advance.

This would only be a problem when there are few stages and a narrow, sufficiently similar set of keys to hash on, which is exactly what you have in your example, resulting in one reducer getting zero inputs. As the number of stages increases, the separate hashing of the 3 keys becomes apparent: three reducers get values and the others don’t.
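One way to check this explanation without running Flow at all is to hash the three keys yourself. The sketch below assumes that Flow turns a key into a partition index with :erlang.phash2(key, stages); that is an assumption about Flow's internals, and PartitionCheck is a made-up module name used only for illustration.

```elixir
defmodule PartitionCheck do
  # Hypothetical helper: groups keys by the partition index they would get,
  # ASSUMING the partition is computed as :erlang.phash2(key, stages).
  def spread(keys, stages) do
    Enum.group_by(keys, fn key -> :erlang.phash2(key, stages) end)
  end
end

# Print, for a few stage counts, which of the keys 0, 1, 2 share a partition.
for stages <- [3, 4, 8] do
  IO.inspect(PartitionCheck.spread([0, 1, 2], stages), label: "stages=#{stages}")
end
```

If two keys show up under the same index for a given stage count, their events end up in the same reducer, and any index that does not appear corresponds to a reducer that stays empty.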



The default hash function used in Flow is :erlang.phash2.

So if you modify your code to look like this:

    |> Flow.partition(hash: fn e -> {e, rem(e, 3)} end, stages: num_stages)
    |> Flow.reduce(fn -> [] end, fn e, acc -> [e | acc] end)
    |> Flow.departition(fn -> [] end, &[&1 | &2], &Enum.sort/1)
    |> Enum.to_list()

You’ll get something that is perhaps more like what you were expecting:

[[[7, 4, 1], [8, 5, 2], [9, 6, 3, 0]]]



Yes, precisely! :heart: :key returns a value that will still be hashed. Use :hash if you want your own custom hashing (that value is not changed in any way).
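To make the :hash contract concrete, here is a plain-Elixir sketch that does not use Flow. It assumes a :hash function returns an {event, partition} tuple, as in the snippet above, and that the partition is used as-is. HashSketch and route/2 are hypothetical names, not part of Flow.

```elixir
defmodule HashSketch do
  # Hypothetical helper: applies a :hash-style function to every event and
  # groups the events by the partition index the function returned.
  def route(events, hash_fun) do
    events
    |> Enum.map(hash_fun)
    |> Enum.group_by(
      fn {_event, partition} -> partition end,
      fn {event, _partition} -> event end
    )
  end
end

# The same function that was passed to hash: above.
HashSketch.route(0..9, fn e -> {e, rem(e, 3)} end)
|> IO.inspect()
# prints %{0 => [0, 3, 6, 9], 1 => [1, 4, 7], 2 => [2, 5, 8]}
```

As far as I know, the partition has to be an integer between 0 and stages - 1, so rem(e, 3) would need at least 3 stages and non-negative inputs.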


Thanks for the clarification and nice samples!