Why is Stream.reduce_while missing?

I need to reduce an enumerable collection to a single value. Enum.reduce_while seems to be a perfect fit. However, in my case it’s expensive / non-trivial to load the whole collection into memory; instead I would like to yield one value at a time, for as long as necessary. In other words, I want to create a custom stream. I’ve just realized that Stream.reduce_while is missing.

What’s the reason? What could I use instead? Thanks!

Just because you’re using Enum doesn’t mean the whole collection is loaded into memory. Let’s take a simple example:

iex(1)> Stream.repeatedly(fn -> 1 end) |> Stream.take(1000)
#Stream<[
  enum: #Function<53.48559900/2 in Stream.repeatedly/1>,
  funs: [#Function<58.48559900/1 in Stream.take_after_guards/2>]
]>

This is a relatively silly stream that just emits 1. It’s still a stream though in that, at the moment, it hasn’t emitted any values.

Now, suppose we pass this to |> Enum.count. We know what the answer will be of course (1000), but here is the key thing I want to emphasize: While Enum.count does iterate through all items in the stream to build its value, it doesn’t turn it into a list first, and then count that list. We can see this if we change the numbers to be really big, and then graph the memory.

If we take the stream and make it a list, then count the list, we see a spike in memory:

Stream.repeatedly(fn -> 1 end) |> Stream.take(1_000_000) |> Enum.to_list |> Enum.count

But if we just pass it into |> Enum.count, there is no spike in memory, because it’s only grabbing one item at a time, counting it, then throwing it away.

iex(2)> Stream.repeatedly(fn -> 1 end) |> Stream.take(1_000_000) |> Enum.count            
1000000

Enum.reduce_while

Now, addressing the function you actually asked about: Enum.reduce_while, and really all of the Enum functions, work the way Enum.count does. They pull one item at a time from the enumerable, and they only use more memory if the output of the function needs to hold onto the whole collection. So if your reduce function holds on to the whole collection in the accumulator, it’ll use a lot of memory, and if you don’t, it won’t. Even if a Stream.reduce_while existed, it would have the same memory characteristics, because only you control the reduce function.
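To make that concrete, here is a minimal sketch that runs Enum.reduce_while over an infinite stream. The stream of natural numbers and the limit of 100 are made up for illustration. It can only terminate because elements are pulled one at a time and the reduction halts early:

```elixir
# Infinite lazy stream 1, 2, 3, ... Nothing is computed at this point.
naturals = Stream.iterate(1, &(&1 + 1))

# Keep adding elements until the running total would exceed 100
# (an arbitrary limit chosen for this example).
sum =
  Enum.reduce_while(naturals, 0, fn n, acc ->
    if acc + n > 100 do
      {:halt, acc}
    else
      {:cont, acc + n}
    end
  end)

IO.inspect(sum)
# prints 91 (1 + 2 + ... + 13)
```

The accumulator here is a single integer, so memory use stays flat no matter how many elements the stream produces.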


thanks! It all makes sense!
Stupid me, a stream is always an enumerable, so I can use Enum.reduce_while with it.
I’m an Elixir newbie :wink:

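It’s a little longer, but you could get a similar effect to reduce_while with a pipeline (the result is still a lazy stream holding the last accumulator that passed the check, so it needs an Enum call to force it):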

some_input
|> Stream.scan(initial_acc, fn el, acc -> ... end)
|> Stream.take_while(fn acc -> ... end)
|> Stream.take(-1)
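
As a rough, concrete sketch of that pipeline (the input stream, the sum accumulator and the limit of 100 are assumptions picked for illustration), next to the Enum.reduce_while version it approximates:

```elixir
# Illustrative input: the infinite stream 1, 2, 3, ...
input = Stream.iterate(1, &(&1 + 1))

# Pipeline version: emit running sums, keep them while they are <= 100,
# then keep only the last one. Enum.to_list/1 forces the lazy pipeline.
via_pipeline =
  input
  |> Stream.scan(0, fn el, acc -> acc + el end)
  |> Stream.take_while(fn acc -> acc <= 100 end)
  |> Stream.take(-1)
  |> Enum.to_list()

# Enum.reduce_while version of the same computation.
via_reduce_while =
  Enum.reduce_while(input, 0, fn el, acc ->
    if acc + el <= 100, do: {:cont, acc + el}, else: {:halt, acc}
  end)

IO.inspect(via_pipeline)      # prints [91]
IO.inspect(via_reduce_while)  # prints 91
```

The two are not fully interchangeable: the pipeline yields a one-element list (or an empty list if no accumulator passes the check), while reduce_while returns whatever value you put in the {:halt, acc} tuple.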

Very nice explanation! What would you say is the difference between Enum.take_while/2 and Stream.take_while/2 then? Would they only behave differently if they were used in the middle of a chain of transformations? (i.e. the Stream version would not produce an “actual” end result list), while they would actually behave the same if they were used as the final transformation in a chain? Or maybe this understanding is not correct?

When you use Enum.take_while/2, it runs immediately and returns a list (it is eager). It consumes the enumerable only up to the first element for which the function returns a falsy value.

Stream.take_while/2 by itself doesn’t actually compute anything. Even if it is the final step in a chain of transformations, the stream still needs to be explicitly evaluated to get the actual list. Stream functions are lazy: the work only happens when you force the stream to be evaluated using Stream.run/1 or an Enum function.
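
One way to see the difference is to count how often the predicate runs. The sketch below uses Erlang’s :counters module for the bookkeeping; the `check` function and the 1..10 range are hypothetical, chosen only for this example:

```elixir
# A counter so we can observe when the predicate is actually invoked.
calls = :counters.new(1, [])

check = fn x ->
  :counters.add(calls, 1, 1)
  x < 3
end

# Eager: runs right away and returns a list.
eager = Enum.take_while(1..10, check)
calls_after_eager = :counters.get(calls, 1)

# Lazy: only builds a stream; the predicate is not called here.
lazy = Stream.take_while(1..10, check)
calls_after_building = :counters.get(calls, 1)

# Forcing the stream with an Enum function does the actual work.
forced = Enum.to_list(lazy)
calls_after_forcing = :counters.get(calls, 1)

IO.inspect({eager, calls_after_eager})  # prints {[1, 2], 3}
IO.inspect(calls_after_building)        # prints 3
IO.inspect({forced, calls_after_forcing})  # prints {[1, 2], 6}
```

Both versions check only the elements 1, 2 and 3; the difference is when that happens, not how much of the input is read.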

The thing that really helped me understand this a lot better was looking at the difference between how List implements the Enumerable protocol’s reduce/3 function and how Stream does.
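
For anyone curious, Enumerable.reduce/3 can also be called directly. This is only a small sketch with a made-up reducer, but it shows that a list and a stream answer the same {:cont, acc} / {:halt, acc} protocol that Enum functions build on:

```elixir
# Illustrative reducer: sum elements, halting before the total exceeds 10.
reducer = fn x, acc ->
  if acc + x > 10, do: {:halt, acc}, else: {:cont, acc + x}
end

# A list: the elements already exist in memory.
from_list = Enumerable.reduce([1, 2, 3, 4, 5, 6], {:cont, 0}, reducer)

# A stream: each element is produced on demand as the reduction asks for it.
from_stream = Enumerable.reduce(Stream.map(1..6, & &1), {:cont, 0}, reducer)

IO.inspect(from_list)    # prints {:halted, 10}
IO.inspect(from_stream)  # prints {:halted, 10}
```

In everyday code you would reach for Enum.reduce_while/3 instead of calling the protocol yourself.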
