Hi all,
Could someone please explain why I am getting an incorrect result with Flow.group_by()?
Because you have neither told us what your code looks like, nor what incorrect results you get and what makes them incorrect in your opinion.
I tend to say that the result is correct but your assumption does not match your
Sorry, I thought I would be able to add details after creating the topic.
I have the following data:
{609628859800,
%{
caddr: {192, 168, 129, 26},
client: [%{len: 6, pos: 2101578, ...}, %{len: 6, ...}],
cport: 53283,
proto: 6,
saddr: {...},
...
}},
{15020939,
%{
caddr: {192, 168, 129, ...},
client: [%{len: 35, ...}, %{...}],
cport: 52158,
proto: 17,
...
}},
...
And two methods:
1) data
|> Enum.group_by(fn {_id, conn} -> conn.proto end)
|> Enum.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
|> Enum.to_list()
|> IO.inspect()
That gives me the correct result:
[%{count: 919, label: 6}, %{count: 191, label: 17}]
2) data
|> Flow.from_enumerable()
|> Flow.group_by(fn {_id, conn} -> conn.proto end)
|> Flow.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
|> Enum.to_list()
|> IO.inspect()
That gives me:
[
%{count: 90, label: 6},
%{count: 20, label: 17},
%{count: 829, label: 6},
%{count: 171, label: 17}
]
I have no way to test this right now, but have you tried to partition after the group_by?
This is just a guess, though.
Invoking Flow.partition after group_by changes nothing.
Invoking it before gives an even more fragmented result:
[
%{count: 104, label: 6},
%{count: 24, label: 17},
%{count: 98, label: 6},
%{count: 21, label: 17},
%{count: 116, label: 6},
%{count: 22, label: 17},
%{count: 140, label: 6},
%{count: 26, label: 17},
%{count: 116, label: 6},
%{count: 21, label: 17},
%{count: 123, label: 6},
%{count: 24, label: 17},
%{count: 103, label: 6},
%{count: 28, label: 17},
%{count: 119, label: 6},
%{count: 25, label: 17}
]
Then I have no clue.
I would try including Flow.reduce/3 after a partition:
File.stream!("path/to/some/file")
|> Flow.from_enumerable()
|> Flow.flat_map(&String.split(&1, " "))
|> Flow.partition()
|> Flow.reduce(fn -> %{} end, fn word, acc ->
  Map.update(acc, word, 1, &(&1 + 1))
end)
|> Enum.to_list()
https://hexdocs.pm/flow/Flow.html#partition/2
https://hexdocs.pm/flow/Flow.html#reduce/3
https://hexdocs.pm/flow/Flow.html#module-partitioning
Yep, a custom reduce function works fine.
However, I’m trying to figure out what's wrong with my group_by(), which is also a reduce function.
You need to call partition/2 before group_by/3. The usual purpose of partitioning before a reduce operation is to ensure that events that belong together are handled by the same process. This matters for group_by/3 because each process builds its own groups, so a key whose events are spread across several processes shows up once per process. The examples in the Flow docs deal with this in detail.
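To make the "same process" point concrete, here is a rough sketch that imitates the effect with plain Enum calls and no Flow at all. The sample data, the split into two "stages", and the use of :erlang.phash2/2 for routing are my own assumptions for illustration only; Flow's real dispatching may differ in detail.

```elixir
# Hypothetical stand-in for the thread's data: {id, %{proto: ...}} tuples.
data = for id <- 1..10, do: {id, %{proto: if(rem(id, 3) == 0, do: 17, else: 6)}}

# What each "stage" does on its own: group its events and count per proto.
count_by_proto = fn events ->
  events
  |> Enum.group_by(fn {_id, conn} -> conn.proto end)
  |> Enum.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
end

# Events split without regard to proto: each chunk sees both protocols,
# so each chunk reports its own partial count for every label.
unkeyed =
  data
  |> Enum.chunk_every(5)
  |> Enum.flat_map(count_by_proto)

# Events routed by a hash of proto: all events with the same proto land
# in the same bucket, so every label is counted exactly once.
keyed =
  data
  |> Enum.group_by(fn {_id, conn} -> :erlang.phash2(conn.proto, 2) end)
  |> Map.values()
  |> Enum.flat_map(count_by_proto)
  |> Enum.sort_by(& &1.label)

IO.inspect(unkeyed, label: "split without a key")
IO.inspect(keyed, label: "split by proto")
```

The first result has one entry per label per chunk, which is the same shape as the duplicated labels in the output posted above; the second has one entry per label.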
In your case, what is probably missing is the right key to partition by. Take a look at the docs of partition/2. You will need something like:
|> Flow.partition(key: &elem(&1, 1).proto)
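Put together, the whole pipeline might look roughly like the sketch below. It has not been run against the original data: the generated sample data and the proto_of helper are made up for illustration, and the Mix.install line with its version requirement is an assumption that needs network access to fetch Flow. The per-group counting is done with Enum.map after Enum.to_list, so the sketch does not depend on how a given Flow version treats map/2 after a reduce step.

```elixir
# Assumption: Flow is fetched on the fly; the version requirement is a guess.
Mix.install([{:flow, "~> 1.0"}])

# Hypothetical sample data shaped like the tuples in the question.
data = for id <- 1..1_000, do: {id, %{proto: if(rem(id, 5) == 0, do: 17, else: 6)}}

# Hypothetical helper: extracts the grouping key from one event.
proto_of = fn {_id, conn} -> conn.proto end

result =
  data
  |> Flow.from_enumerable()
  # Route every event with the same proto to the same stage.
  |> Flow.partition(key: proto_of)
  # Each stage now groups only the protos that were routed to it.
  |> Flow.group_by(proto_of)
  # Collect the {proto, events} pairs emitted by the stages.
  |> Enum.to_list()
  |> Enum.map(fn {proto, conns} -> %{label: proto, count: length(conns)} end)
  |> Enum.sort_by(& &1.label)

IO.inspect(result)
```

Partitioning and grouping use the same key function here, which is what guarantees that each label appears only once in the result.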
Thank you very much, guys!
Got it.