WordCount a Stream

I wanted to play around with streams a bit, so I decided to build a wordcount over a Stream, but it seems as after_funs result is discarded in the following examplecode:

defmodule WordCount do
  def stream_count(stream) do
    stream
    |> Stream.transform(fn -> "" end, &tokenize/2, fn acc -> IO.puts "called #{acc}"; [acc] end)
    |> Enum.into([]) |> IO.inspect
    |> Enum.reduce(%{}, fn (word, acc) -> Map.update(acc, word, 1, &(&1 + 1)) end)
  end

  def count(sentence), do: sentence |> String.graphemes |> stream_count

  def tokenize(<<c>>, acc) when (?a <= c and c <= ?z) or (?A <= c and c <= ?Z), do: {[], acc <> <<c>>}
  def tokenize(<<_>>, acc), do: {[acc], ""}
end

calling WordCount.count("Word") will print “called Word” once when hitting the after fun, but the inspection line dies show an empty list, as well as I get an empty map back.

So how can I get that dangling accumulator to the end of my output stream?

PS: I already tried to return a naked value in the after fun, as well as an :ok and :halt-tuple.

2 Likes

Here is a fixed version:

defmodule WordCount do
  def stream_count(stream) do
    stream
    |> Stream.transform(fn -> "" end, &tokenize/2, fn acc -> IO.puts "called #{acc}"; [acc] end)
    |> Enum.into([]) |> IO.inspect
    |> Enum.reduce(%{}, fn (word, acc) -> Map.update(acc, word, 1, &(&1 + 1)) end)
  end

  def count(sentence) do
    sentence
    |> String.graphemes
    |> Stream.concat([""]) # Ensure the final word is closed.
    |> stream_count
  end

  def tokenize(<<c>>, acc) when (c in ?a..?z) or (c in ?A..?Z), do: {[], acc <> <<c>>}
  def tokenize(<<_>>, acc), do: {[acc], ""}
  def tokenize(<<>>, acc), do: {[acc], ""} # Ensure the empty string is handled as well.
end
WordCount.count("Foo bar baz")

Just like in Stream.transform/3, the reducer function in Stream.transform/4 is supposed to produce a {enumerable_from_reading_this_element, accum} tuple. (But I think this can be documented better.)

In your case, if you have as input "foo bar baz", then tokenize is never called like this: tokenize("", "baz"), because there is no final "" in the result of String.graphemes. The final call to tokenize will be tokenize("z", ba"), resulting in {[], "baz"}. This means that the final word is not going to be part of the list you’re going to build the histogram from.

To fix it, I added an extra element to the end of the output of String.graphemes.

1 Like

Well, isn’t it the purpose of Stream.transform/4, to get the accumulator a last time after the stream has finished to add (or a processed version of it) it to the stream?

If it is enough to just append an :eof to the end, I don’t see a reason for Stream.transform/4 in this scenario at all and can stick to /3.

1 Like

I believe that the after_fun of Stream.transform/4 is there to be used to close external resources like file handles. Its output is not going to be part of the returned enumerable.
The documentation notes that '[it] can be seen as a combination of Stream.resource/3 with
Stream.transform/3'. Stream.resource/3 has an example where they open and close file handles.

I think the documentation of the arity-4 version of Stream.transform could be improved.

2 Likes