First time trying Flow, some novice questions

Today I started studying Flow, and I have some questions.
Please keep in mind that this is only example code; the point is that I want to use Flow.

So, the task is:

  • Reading a text file
  • Removing empty lines and newlines
  • Counting the words
  • Generating a comma-separated line with each word and its count

This is what I’ve done so far:

def count_words(filepath) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  |> Flow.reject(&(&1 == "\n"))
  |> Flow.map(&String.replace(&1, "\n", ""))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
end

My questions:

  • Is it OK to chain multiple Flow operations (reject, map…), or should I put everything in the reducer function?
  • IIUC the last Enum.map is not executed in parallel, but if I try to use Flow.map instead I get this error:
    ** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
    which is quite informative, but I still don’t get it.

Cheers!

Yes.

Putting everything in the reducer would create a flow that slightly shuffles your elements and then effectively reduces serially.
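
A minimal sketch of that difference, not taken from the thread. It assumes Flow is fetched with Mix.install, uses String.split/1 in place of the split-and-clean steps from the question, and the module name ChainVsReducer is made up for illustration. The `stages: 1` option in the second function is one way to keep the counts correct when lines (rather than words) are what gets partitioned; it also means a single process does all the work.

```elixir
Mix.install([{:flow, "~> 1.2"}])

defmodule ChainVsReducer do
  # Chained version: splitting runs in the stages before the partition,
  # and only the counting runs after words have been routed by hash.
  def chained(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.flat_map(&String.split/1)
    |> Flow.partition()
    |> Flow.reduce(fn -> %{} end, fn word, acc ->
      Map.update(acc, word, 1, &(&1 + 1))
    end)
    |> Enum.to_list()
    |> Map.new()
  end

  # Everything in the reducer: whole lines are partitioned, so the same word
  # could land in different partitions. A single reducer stage keeps the
  # counts correct, but then the split and the count run in one process.
  def all_in_reducer(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.partition(stages: 1)
    |> Flow.reduce(fn -> %{} end, fn line, acc ->
      line
      |> String.split()
      |> Enum.reduce(acc, fn word, counts ->
        Map.update(counts, word, 1, &(&1 + 1))
      end)
    end)
    |> Enum.to_list()
    |> Map.new()
  end
end
```

Both functions should return the same map for the same input; the difference is where the work happens.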

Because after the reduce there are no events anymore; you just have the single value that everything was reduced into.

Ok, thanks.

I am quite embarrassed by that. For some reason, I was thinking in terms of enumerables, while Flow functions work on flows (see the Flow v1.2.4 docs).

(Sorry for the very silly questions, I’m studying Flow alone and I have no one to ask silly questions to…)

Cheers!

As with the Enum functions, the Flow functions operate on flows, yes, but they don’t necessarily return a flow.

Now my question is:

  • What is the correct way to transform the result of that reduce in a parallel way?

Should I create another Flow from the result of the reduce?

Cheers

Flow.reduce returns a flow, so you should be able to chain further Flow functions.

Hmmm… Seems I have totally misunderstood the use of Flow.reduce/3 so far, then…

Flow.reduce does indeed return a Flow, but I can’t call Flow.map on it, as I get the error I posted in the original post.
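
A small sketch that reproduces this observation, assuming Flow is fetched with Mix.install and using String.split/1 for brevity. The variable names are only illustrative. The try block wraps both the Flow.map call and the enumeration, since the sketch does not assume at which of the two points the ArgumentError is raised.

```elixir
Mix.install([{:flow, "~> 1.2"}])

reduced =
  ["a b a\n"]
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)

# The result of Flow.reduce/3 is still a %Flow{} struct.
is_flow = match?(%Flow{}, reduced)

# Mapping over it is rejected with the ArgumentError quoted in the question.
error =
  try do
    reduced
    |> Flow.map(fn {w, c} -> "#{w},#{c}\n" end)
    |> Enum.to_list()

    :no_error
  rescue
    e in ArgumentError -> Exception.message(e)
  end

# Enumerating the reduced flow directly still works.
counts = reduced |> Enum.to_list() |> Enum.sort()
```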

Following the suggestion in the error message, I was able to use on_trigger on the flow, to map over the result of Flow.reduce:

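def count_words(filepath, :flow) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  # Drop tokens that are only a newline (these come from empty lines)
  |> Flow.reject(&(&1 == "\n"))
  # Strip the trailing newline from the last word of each line
  |> Flow.map(&String.replace(&1, "\n", ""))
  # Replace commas in words, presumably so they cannot break the comma-separated output
  |> Flow.map(&String.replace(&1, ",", " "))
  # Route equal words to the same partition before counting
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  # Emit one "word,count" line per entry and keep the state unchanged
  |> Flow.on_trigger(fn wordcount ->
    {Enum.map(wordcount, fn {w, c} -> "#{w},#{c}\n" end), wordcount}
  end)
  |> Enum.into([])
end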

IIUC the Enum.map inside the on_trigger runs sequentially over each partition's result, while the trigger function itself runs in parallel, once per partition.
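
One way to check that reading is to tag each emitted batch with the partition that produced it. This is a sketch under assumptions: Flow is fetched with Mix.install, String.split/1 replaces the cleaning steps, `stages: 2` is an arbitrary choice, and the 2-arity form of the on_trigger callback is used, whose second argument (partition information) is passed through without assuming its exact shape.

```elixir
Mix.install([{:flow, "~> 1.2"}])

result =
  ["a b a\n", "b c\n"]
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split/1)
  |> Flow.partition(stages: 2)
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Flow.on_trigger(fn counts, partition_info ->
    # This Enum.map is an ordinary sequential loop inside one partition.
    lines = Enum.map(counts, fn {w, c} -> "#{w},#{c}\n" end)
    # Emit a single event per trigger: the partition info plus its lines.
    {[{partition_info, lines}], counts}
  end)
  |> Enum.to_list()

# Collect the lines from all partitions; their order across partitions
# is not guaranteed, so sort before comparing.
csv_lines =
  result
  |> Enum.flat_map(fn {_partition_info, lines} -> lines end)
  |> Enum.sort()
```

Inspecting `result` shows which lines came from which partition; each word should appear under exactly one of them.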

Thank you