Today I started studying Flow, and I have some questions.
Please keep in mind that this is example code; I want to use Flow.
So, it does the following:
- Read a text file
- Remove empty lines and newlines
- Count the words
- Generate a comma-separated line with each word and its count
This is what I’ve done so far:
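```elixir
# Module name is illustrative; the original post shows only the function.
defmodule WordCount do
  def count_words(filepath) do
    File.stream!(filepath)
    |> Flow.from_enumerable()
    # Split each line on single spaces.
    |> Flow.flat_map(&String.split(&1, " "))
    # Drop tokens that are just a newline (from empty lines or trailing spaces).
    |> Flow.reject(&(&1 == "\n"))
    # Strip the newline left on the last word of each line.
    |> Flow.map(&String.replace(&1, "\n", ""))
    # Hash-partition so every occurrence of a word reaches the same reducer.
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    # Enumerating the flow yields {word, count} tuples; this map runs in the caller.
    |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
  end
end
```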
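To try the same stages without a file, here is a minimal sketch that feeds them an in-memory list of lines (each ending in `"\n"`, as `File.stream!/1` yields them). It assumes the `flow` package (`~> 1.2`) pulled in with `Mix.install/1`; the module name `InMemoryCount` is hypothetical.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule InMemoryCount do
  # Hypothetical helper: the same stages as count_words/1, fed from a list.
  def count_lines(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.reject(&(&1 == "\n"))
    |> Flow.map(&String.replace(&1, "\n", ""))
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
  end
end

lines = ["hello world\n", "\n", "hello  flow\n"]

# Flow does not preserve order across partitions, so sort before printing.
lines |> InMemoryCount.count_lines() |> Enum.sort() |> IO.inspect()
```

The sorted result is `[",1\n", "flow,1\n", "hello,2\n", "world,1\n"]`: the `",1\n"` entry is the empty string between the two spaces in `"hello  flow"`. `String.split/1` (no separator) splits on any run of whitespace and drops empty pieces, so it would avoid that entry and make the `reject`/`replace` steps unnecessary.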
My questions:
Is it OK to chain multiple Flow operations (`reject`, `map`, …), or should I put everything in the reducer function?
IIUC, the last `Enum.map` is not executed in parallel, but if I try to use `Flow.map` instead, I get this error:
** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
which is quite informative, but I still don't get it.
Cheers!
NobbZ
July 17, 2023, 3:34pm
Yes.
Putting everything in the reducer would create a flow that slightly shuffles your elements and then effectively reduces serially.
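There is also a correctness reason to keep the per-word steps chained before `Flow.partition`. A minimal sketch, assuming the `flow` package (`~> 1.2`) and using `String.split/1` for brevity: if the splitting moves into the reducer, `Flow.partition/1` hashes whole lines instead of words, so a word that appears in lines routed to different partitions gets counted in each of them. The module `ChainVsReducer` and its functions are hypothetical.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule ChainVsReducer do
  # Chained: splitting runs in the stages that consume the input,
  # then Flow.partition/1 hashes each *word*, so one reducer owns each word.
  def chained(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split/1)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
    |> Enum.to_list()
  end

  # Everything in the reducer: Flow.partition/1 hashes whole *lines*,
  # so the same word may be counted in several partitions.
  def all_in_reducer(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn line, acc ->
      line
      |> String.split()
      |> Enum.reduce(acc, fn word, inner -> Map.update(inner, word, 1, &(&1 + 1)) end)
    end)
    |> Enum.to_list()
  end
end

lines = ["a b", "b c", "a c", "c"]

IO.inspect(Enum.sort(ChainVsReducer.chained(lines)))

# The reducer-only output may repeat a word; summing per word repairs it.
merged =
  lines
  |> ChainVsReducer.all_in_reducer()
  |> Enum.reduce(%{}, fn {word, count}, acc -> Map.update(acc, word, count, &(&1 + count)) end)
```

Whether duplicates actually show up for a given input depends on how the lines hash, which is why the sketch only relies on the merged totals.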
carloratm:
IIUC, the last `Enum.map` is not executed in parallel, but if I try to use `Flow.map` instead, I get this error:
** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
which is quite informative, but I still don't get it.
Because there are no events anymore after the `reduce`, you just have a single value that was reduced into.
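Concretely, each partition stage holds its own accumulator, and with the default global window that accumulator is only handed on when the input is exhausted. A minimal sketch, assuming the `flow` package (`~> 1.2`): with two partitions, `Flow.on_trigger/2` emits one event per partition (the size of that partition's map) instead of individual words.

```elixir
Mix.install([{:flow, "~> 1.2"}])

words = ~w(a b c a b a d e)

per_partition =
  words
  |> Flow.from_enumerable()
  # Two reducer stages, so there are two separate accumulators.
  |> Flow.partition(stages: 2)
  |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
  # When the input ends, each partition emits a single event: the size of its own map.
  |> Flow.on_trigger(fn acc -> {[map_size(acc)], acc} end)
  |> Enum.to_list()

IO.inspect(per_partition)
```

Because each distinct word is hashed to exactly one partition, the emitted sizes add up to the number of distinct words (5 here), however the words happen to be split between the two stages.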
Ok, thanks.
I am quite embarrassed by that. For some reason, I was thinking in terms of Enumerables, when Flow functions work on Flows (see the Flow v1.2.4 documentation).
(Sorry for the very silly questions; I'm studying Flow alone and I have no one to ask silly questions to…)
Cheers!
NobbZ
July 18, 2023, 6:08am
As with the `Enum` functions, the `Flow` functions operate on flows, yes, but they don't necessarily return a flow.
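A small sketch of that distinction, assuming the `flow` package (`~> 1.2`): the stage-building calls such as `Flow.reduce/3` only describe the computation and return a `%Flow{}` struct, while `Enum` functions and `Flow.run/1` actually execute it and return ordinary values.

```elixir
Mix.install([{:flow, "~> 1.2"}])

counted =
  ~w(a b a)
  |> Flow.from_enumerable()
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)

# Building stages only describes the computation: the result is still a %Flow{} struct.
%Flow{} = counted

# Enum functions run the flow and return plain values.
list = Enum.to_list(counted)
# [{"a", 2}, {"b", 1}] in some order

# Flow.run/1 runs the flow only for its side effects and returns :ok.
:ok = Flow.run(counted)
```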
Now my question is:
What is the correct way to transform the result of that `reduce` in parallel?
Should I create another Flow from the result of the reduce?
Cheers
`Flow.reduce` returns a flow, so you should be able to chain further `Flow` functions.
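For the other option raised in the question (building another flow from the result of the `reduce`), a minimal sketch, assuming the `flow` package (`~> 1.2`); `TwoFlows.count_then_format/1` is a hypothetical helper. Note that this collects all counts in the calling process between the two flows, and for a step as cheap as string formatting the extra stages probably cost more than they save.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule TwoFlows do
  # Hypothetical helper: count words with one flow, then format the
  # {word, count} pairs with a second flow built from the collected result.
  def count_then_format(words) do
    counts =
      words
      |> Flow.from_enumerable()
      |> Flow.partition()
      |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
      # Materializes every {word, count} pair in the caller.
      |> Enum.to_list()

    counts
    # A fresh flow: Flow.map is allowed again because no reduce precedes it.
    |> Flow.from_enumerable()
    |> Flow.map(fn {word, count} -> "#{word},#{count}\n" end)
    |> Enum.to_list()
  end
end

IO.inspect(Enum.sort(TwoFlows.count_then_format(~w(a b a))))
```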
NobbZ
July 18, 2023, 8:19am
Hmmm… Seems as if I have totally misunderstood the use of `Flow.reduce/3` so far, then…
`Flow.reduce` does indeed return a `Flow`, but I can't call `Flow.map` on it, as I get the error I posted in the original post.
Following the suggestion in the error message, I was able to use `on_trigger` on the flow to map over the result of `Flow.reduce`:
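```elixir
# Module name is illustrative; the original post shows only the function.
defmodule WordCount do
  def count_words(filepath, :flow) do
    File.stream!(filepath)
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split(&1, " "))
    |> Flow.reject(&(&1 == "\n"))
    |> Flow.map(&String.replace(&1, "\n", ""))
    # Replace commas with spaces so a word never contains the CSV separator.
    |> Flow.map(&String.replace(&1, ",", " "))
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    # Runs once per partition when its window triggers (here: end of input);
    # emits the formatted lines as events and keeps the map as the state.
    |> Flow.on_trigger(fn wordcount ->
      {Enum.map(wordcount, fn {w, c} -> "#{w},#{c}\n" end), wordcount}
    end)
    |> Enum.to_list()
  end
end
```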
IIUC, the `Enum.map` inside `on_trigger` runs sequentially over each partition's accumulated map, while the trigger function itself runs in parallel, once per partition.
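One way to check that reading is to tag each emitted batch with the process that produced it. A minimal sketch, assuming the `flow` package (`~> 1.2`) and an in-memory list instead of a file; `TriggerPerPartition` is a hypothetical module, and `String.split/1` replaces the split/reject/replace steps for brevity.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule TriggerPerPartition do
  # Hypothetical helper: the on_trigger pipeline on an in-memory list,
  # tagging each emitted batch with the pid of the stage that built it.
  def count_lines(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split/1)
    |> Flow.partition(stages: 2)
    |> Flow.reduce(fn -> %{} end, fn word, acc -> Map.update(acc, word, 1, &(&1 + 1)) end)
    |> Flow.on_trigger(fn acc ->
      # Runs inside this partition's stage process; Enum.map walks only
      # this partition's map, sequentially.
      formatted = Enum.map(acc, fn {w, c} -> "#{w},#{c}\n" end)
      {[{self(), formatted}], acc}
    end)
    |> Enum.to_list()
  end
end

batches = TriggerPerPartition.count_lines(["hello world\n", "\n", "hello flow\n"])

# At most one {pid, lines} batch per partition, none produced by the caller.
Enum.each(batches, fn {pid, lines} -> IO.inspect({pid, lines}) end)
```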
Thank you