Flow group_by() result differs from Enum.group_by

Hi all,
Could someone please explain why I am getting an incorrect result with Flow.group_by()?

Because you have neither told us what your code looks like, nor what incorrect results you get and what makes them incorrect in your opinion.

I tend to say that the result is correct, but your assumption does not match your


Sorry, I thought I would be able to add details after creating the topic.
I have the following data:

  {609628859800,
   %{
     caddr: {192, 168, 129, 26},
     client: [%{len: 6, pos: 2101578, ...}, %{len: 6, ...}],
     cport: 53283,
     proto: 6,
     saddr: {...},
     ...
   }},
  {15020939,
   %{
     caddr: {192, 168, 129, ...},
     client: [%{len: 35, ...}, %{...}],
     cport: 52158,
     proto: 17,
     ...
   }},
   ...

And two methods:

1)
          data
          |> Enum.group_by(fn {_x, y} -> y.proto end)
          |> Enum.map(fn {x, y} -> %{label: x, count: length(y)} end)
          |> Enum.to_list()
          |> IO.inspect()

That gives me the correct result:

        [%{count: 919, label: 6}, %{count: 191, label: 17}]

2)
          data
          |> Flow.from_enumerable()
          |> Flow.group_by(fn {_x, y} -> y.proto end)
          |> Flow.map(fn {x, y} -> %{label: x, count: length(y)} end)
          |> Enum.to_list()
          |> IO.inspect()

That gives me:

        [
          %{count: 90, label: 6},
          %{count: 20, label: 17},
          %{count: 829, label: 6},
          %{count: 171, label: 17}
        ]

I have no way to test right now, but have you tried to partition after the group_by?

This is just a guess though.

Invoking Flow.partition after group_by changes nothing.

Invoking it before group_by gives an even more fragmented result:

[
  %{count: 104, label: 6},
  %{count: 24, label: 17},
  %{count: 98, label: 6},
  %{count: 21, label: 17},
  %{count: 116, label: 6},
  %{count: 22, label: 17},
  %{count: 140, label: 6},
  %{count: 26, label: 17},
  %{count: 116, label: 6},
  %{count: 21, label: 17},
  %{count: 123, label: 6},
  %{count: 24, label: 17},
  %{count: 103, label: 6},
  %{count: 28, label: 17},
  %{count: 119, label: 6},
  %{count: 25, label: 17}
]

Then I have no clue.

I would try including Flow.reduce/3 after a partition:

File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, & &1 + 1)
end)
|> Enum.to_list()

https://hexdocs.pm/flow/Flow.html#partition/2
https://hexdocs.pm/flow/Flow.html#reduce/3
https://hexdocs.pm/flow/Flow.html#module-partitioning


Yep, a custom reduce function works fine.
However, I'm trying to figure out what's wrong with my group_by(), which is also a reduce function.

You need to call partition/2 before group_by/3. The usual purpose of partitioning before a reduce operation is to ensure that events that belong together are handled by the same process. This matters for group_by/3 because each stage only groups the events it receives, so a key that is spread over several stages shows up once per stage. The examples in the Flow docs deal with this in detail.
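
To make the "same process" point concrete, here is a rough sketch that does not use Flow at all. It only imitates two stages with plain Enum calls, and the sample data and the `count_by_proto` helper are made up for illustration. Each simulated stage groups only the events it was handed, which is why the same label can appear more than once in the combined result.

```elixir
# Hypothetical sample data in the same {id, %{proto: ...}} shape as in the question.
data = for id <- 1..10, do: {id, %{proto: if(rem(id, 3) == 0, do: 17, else: 6)}}

# Illustrative helper: group events by protocol and count each group.
count_by_proto = fn events ->
  events
  |> Enum.group_by(fn {_id, conn} -> conn.proto end)
  |> Enum.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
end

# One process sees everything, so there is one entry per protocol.
single = count_by_proto.(data)

# Two simulated stages: each receives an arbitrary half of the events
# and groups only what it sees. The partial results are concatenated.
staged =
  data
  |> Enum.chunk_every(5)
  |> Enum.flat_map(count_by_proto)

IO.inspect(single, label: "single process")
IO.inspect(staged, label: "two stages")
```

In the staged result the counts per label still add up to the single-process totals; they are just split across the stages, much like the 90 + 829 and 20 + 171 in the output above.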

In your case what is probably missing is choosing the right key to partition by. Take a look at the docs of partition/2. You will need something like:

|> Flow.partition(key: &elem(&1, 1).proto)
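
Why the key matters can be sketched without Flow as well. The snippet below is only a simulation under my own assumptions: the `run` and `count_by_proto` helpers, the sample data, and the stage count of 4 are invented, and `:erlang.phash2/2` stands in for whatever hashing the partition step actually applies. It contrasts routing by a hash of the whole event with routing by a hash of the protocol only.

```elixir
# Hypothetical sample data in the same {id, %{proto: ...}} shape as in the question.
data = for id <- 1..10, do: {id, %{proto: if(rem(id, 3) == 0, do: 17, else: 6)}}
stages = 4

# Illustrative helper: group events by protocol and count each group.
count_by_proto = fn events ->
  events
  |> Enum.group_by(fn {_id, conn} -> conn.proto end)
  |> Enum.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
end

# Illustrative helper: send every event to a simulated stage chosen by
# `route`, then let each stage group its own events independently.
run = fn route ->
  data
  |> Enum.group_by(route)
  |> Map.values()
  |> Enum.flat_map(count_by_proto)
end

# Routing on the whole event: events with the same protocol can end up
# on different stages, so a label may be reported by several stages.
by_event = run.(fn event -> :erlang.phash2(event, stages) end)

# Routing on the protocol only: all events with a given protocol reach
# the same stage, so each label is reported exactly once.
by_proto = run.(fn {_id, conn} -> :erlang.phash2(conn.proto, stages) end)

IO.inspect(by_event, label: "routed by event")
IO.inspect(Enum.sort_by(by_proto, & &1.label), label: "routed by proto")
```

With the protocol as the routing key, the result has one entry per label no matter how many stages there are, which is the behaviour the `key:` option is meant to give you.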

Thank you very much, guys!
Got it.