`Stream.chunk_every/4` equivalent for Flow

Context:
While practicing optimizing some exercises, I got stuck this morning trying to use Flow on a piece of code using Stream.

Here is the code :

counter = fn x, acc ->
  Map.update(acc, x, 1, &(&1 + 1))
end

# Create a stream for large files processing
"your_filename.ext"
|> File.stream!()
# Normalize every words
|> Stream.map(&String.downcase/1)
|> Stream.map(&String.replace(&1, ~r"[^a-z0-9]", " "))
# Split with spaces and remove empty words
|> Stream.flat_map(&String.split(&1, " ", trim: true))
# Get every sequences of 3 words in the text
|> Stream.chunk_every(3, 1, :discard)
# Join the sequences
|> Stream.map(&Enum.join(&1, " "))
# Count occurences for 3 words sequences
|> Enum.reduce(%{}, counter)
# Just displays the 10 most used sequences
|> Map.to_list()
|> Enum.sort_by(&elem(&1, 1), :desc)
|> Enum.take(10)
|> IO.inspect()

# Example of output with http://www.gutenberg.org/cache/epub/2009/pg2009.txt :
#    [
#      {"of the same", 320},
#      {"the same species", 130},
#      {"conditions of life", 125},
#      {"in the same", 117},
#      {"of natural selection", 111},
#      {"from each other", 104},
#      {"species of the", 102},
#      {"on the other", 89},
#      {"the other hand", 81},
#      {"the case of", 78}
#    ]

In short, this piece of code retrieves all the 3-word sequences of a text and displays the 10 that appear the most.

Problem:
I can’t find the equivalent of the line Stream.chunk_every(3, 1, :discard) for Flow.

Here is my current code:

Mix.install([{:flow, "~> 1.2"}])

counter = fn x, acc ->
  Map.update(acc, x, 1, &(&1 + 1))
end

"your_filename.ext"
|> File.stream!(read_ahead: 100_000)
|> Flow.from_enumerable()
|> Flow.map(&String.downcase/1)
|> Flow.map(&String.replace(&1, ~r"[^a-z0-9]", " "))
|> Flow.flat_map(&String.split(&1, " ", trim: true))
# |> Stream.chunk_every(3, 1, :discard)      <= ????????
|> Flow.map(&Enum.join(&1, " "))
|> Flow.partition()
|> Flow.reduce(&Map.new/0, counter)
|> Enum.to_list()
|> Enum.sort_by(&elem(&1, 1), :desc)
|> Enum.take(10)
|> IO.inspect()
1 Like