First time trying Flow, some novice questions

Today I started studying Flow, and I have some questions.
Please keep in mind that this is just example code; the point is that I want to use Flow.

So, the task is:

  • Read a text file
  • Remove empty lines and newlines
  • Count the words
  • Generate a comma-separated line for each word and its count

This is what I’ve done so far:

def count_words(filepath) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  |> Flow.reject(&(&1 == "\n"))
  |> Flow.map(&String.replace(&1, "\n", ""))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
end
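For checking the Flow version against something simple, here is a minimal sequential sketch of the same steps using only Stream and Enum. The module name WordCountBaseline and the choice to take an enumerable of lines (rather than a file path) are assumptions made for illustration, and it strips newlines before rejecting empty strings, which differs slightly from the pipeline above.

```elixir
defmodule WordCountBaseline do
  # Sequential reference: split, strip newlines, drop empties, count, format.
  # `lines` is any enumerable of strings, e.g. File.stream!(filepath).
  def count_words(lines) do
    lines
    |> Stream.flat_map(&String.split(&1, " "))
    |> Stream.map(&String.replace(&1, "\n", ""))
    # Drops what is left of empty lines and of repeated spaces.
    |> Stream.reject(&(&1 == ""))
    |> Enum.frequencies()
    |> Enum.map(fn {word, count} -> "#{word},#{count}\n" end)
  end
end

lines = ["the quick fox\n", "\n", "the lazy dog\n"]

# Sorted so the result is easy to compare with a Flow run,
# where the order across partitions is not guaranteed.
csv = lines |> WordCountBaseline.count_words() |> Enum.sort()
```

Sorting both outputs before comparing them avoids false mismatches caused by partition ordering.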

My questions:

  • Is it OK to chain multiple Flow operations (reject, map…), or should I put everything in the reducer function?
  • IIUC the last Enum.map is not executed in parallel, but if I try to use Flow.map instead I get this error:
    ** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
    which is quite informative, but I still don’t get it.

Cheers!


Yes, chaining multiple Flow operations is fine.

Putting everything in the reducer would create a flow that slightly shuffles your elements and then effectively reduces serially.

Flow.map fails there because there are no events anymore after the reduce; you just have a single value that was reduced into.
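A small sketch of both points, assuming Flow 1.2 is installed through Mix.install and using a made-up in-memory list instead of a file. String.split/1 and String.downcase/1 are stand-ins for the stateless steps in the question, not the original code.

```elixir
Mix.install([{:flow, "~> 1.2"}])

lines = ["the quick fox\n", "\n", "the lazy dog\n"]

reduced =
  lines
  |> Flow.from_enumerable()
  # Stateless steps before the reduce can be chained freely.
  |> Flow.flat_map(&String.split/1)
  |> Flow.map(&String.downcase/1)
  # Route equal words to the same stage before counting.
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)

# Adding a map step on top of the reduce raises while the flow is
# being built, before any data is processed.
error =
  try do
    Flow.map(reduced, fn {word, count} -> "#{word},#{count}\n" end)
  rescue
    e in ArgumentError -> Exception.message(e)
  end

# Enumerating the reduced flow yields the entries of each partition's map.
counts = Enum.sort(reduced)
```

The result is sorted because the order in which partitions emit their entries is not guaranteed.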

Ok, thanks.

I am quite embarrassed by that. For some reason, I was thinking in terms of Enumerables, while Flow functions work on flows (see the Flow v1.2.4 documentation).

(Sorry for the very silly questions, I’m studying Flow alone and I have no one to ask silly questions to…)

Cheers!


As with the Enum functions, the Flow functions operate on flows, yes, but they don’t necessarily return a flow.

Now my question is:

  • What is the correct way to transform the result of that reduce in a parallel way?

Should I create another Flow from the result of the reduce?

Cheers

Flow.reduce returns a flow, so you should be able to chain further Flow functions.

Hmmm… Seems as if I so far totally misunderstood the use of Flow.reduce/3 then…

Flow.reduce does indeed return a Flow, but I can’t call Flow.map on it, as I get the error I posted in the original post.

Following the suggestion in the error message, I was able to use on_trigger on the flow, to map over the result of Flow.reduce:

def count_words(filepath, :flow) do
  File.stream!(filepath)
  |> Flow.from_enumerable()
  |> Flow.flat_map(&String.split(&1, " "))
  |> Flow.reject(&(&1 == "\n"))
  |> Flow.map(&String.replace(&1, "\n", ""))
  # Replace commas inside words so they cannot break the comma-separated output.
  |> Flow.map(&String.replace(&1, ",", " "))
  |> Flow.partition()
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  # The callback returns {events_to_emit, new_accumulator}.
  |> Flow.on_trigger(fn wordcount ->
    {Enum.map(wordcount, fn {w, c} -> "#{w},#{c}\n" end), wordcount}
  end)
  |> Enum.into([])
end

IIUC, the Enum.map inside the on_trigger callback runs sequentially within each batch, while the trigger function itself runs in parallel.
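One way to observe that the callback works on per-partition state is to emit a single number from each invocation. This is only a sketch under assumptions: Flow 1.2 via Mix.install, a made-up word list, and four stages chosen arbitrarily.

```elixir
Mix.install([{:flow, "~> 1.2"}])

words = ~w(a b c d e f a b c a)

per_partition =
  words
  |> Flow.from_enumerable()
  # Four stages chosen for illustration; equal words land in the same stage.
  |> Flow.partition(stages: 4)
  |> Flow.reduce(fn -> %{} end, fn word, acc ->
    Map.update(acc, word, 1, &(&1 + 1))
  end)
  # Each invocation sees only its own partition's map and emits one event:
  # the number of distinct words held by that partition.
  |> Flow.on_trigger(fn acc -> {[map_size(acc)], acc} end)
  |> Enum.to_list()

# Every distinct word lives in exactly one partition, so the
# per-partition sizes add up to the number of distinct words.
total_distinct = Enum.sum(per_partition)
```

With this input there are six distinct words, so the sizes emitted by the partitions should sum to 6 regardless of how the words are distributed.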

Thank you


I’m here because I am working through the Concurrent Data Processing in Elixir book. There’s a point in the bit on Flows where there is an error in the code. On page 102, this is suggested:

|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.group_by(& &1.country)
|> Flow.map(fn {country, data} -> {country, Enum.count(data)} end)
|> Enum.to_list()

The result of this code is a similar error:

iex(4)> Airports.open_airports
** (ArgumentError) map/2 cannot be called after group_by/reduce/emit_and_reduce operation (use on_trigger/2 if you want to further emit events or the accumulated state)
    (flow 1.2.4) lib/flow.ex:1976: Flow.add_mapper/3
    (airports 0.1.0) lib/airports.ex:26: Airports.open_airports/0
    iex:4: (file)

To anyone else who may have arrived by the same path, making this change will allow you to continue unabated:

|> Flow.reject(&(&1.type == "closed"))
|> Flow.partition(key: {:key, :country})
|> Flow.group_by(& &1.country)
|> Flow.on_trigger(fn count ->
  {Enum.map(count, fn {country, data} -> {country, Enum.count(data)} end), count}
end)
|> Enum.to_list()
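To try this without the book’s data file, here is a minimal sketch that runs the same pipeline over a few in-memory maps. The sample rows are hypothetical and only carry the fields the pipeline touches; Flow 1.2 is assumed to be installed through Mix.install.

```elixir
Mix.install([{:flow, "~> 1.2"}])

# Hypothetical rows standing in for the parsed airport data.
airports = [
  %{name: "A", country: "BR", type: "small_airport"},
  %{name: "B", country: "BR", type: "closed"},
  %{name: "C", country: "US", type: "large_airport"},
  %{name: "D", country: "BR", type: "heliport"}
]

open_by_country =
  airports
  |> Flow.from_enumerable()
  |> Flow.reject(&(&1.type == "closed"))
  # Send all rows of a country to the same stage.
  |> Flow.partition(key: {:key, :country})
  |> Flow.group_by(& &1.country)
  # Emit {country, count} pairs and keep the grouped state unchanged.
  |> Flow.on_trigger(fn groups ->
    {Enum.map(groups, fn {country, data} -> {country, length(data)} end), groups}
  end)
  |> Enum.sort()
```

The closed airport is filtered out before grouping, so the sample should yield two open airports for "BR" and one for "US".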