Questions about Streams

Hi everyone, hope y’all are doing well. I am quite new to Elixir and functional programming in general, and I’ve never really used streams in another language before, so I have some questions about Streams.

file = File.stream!("elixir_is_funny.txt")
file |> Enum.map(&foo1/1) |> Enum.map(&foo2/1) |> Enum.map(&foo3/1) |> Enum.take(1)

  • Will each Enum.map execute just one time? If yes, how? How does it know that I will take only one?

  • I’ve read that for Enum to work, you have to implement an iterable in the module. How does that work with streams?

  • I’ve made some tests, and in some cases it stops being a stream. How can I work with streams without transforming them?

  • I’ve heard the term lazy loading. What does that mean? Are Streams lazily loaded?

Well, that’s it. Sorry for my English, it is not my first language :smiley:


Before you can truly understand streams in Elixir, you need to understand enumerables.

Enumerable is a protocol for collections of data, which can be iterated over using reduce at its core. An enumerable can be e.g. a list, which already exists completely in memory, ready to be iterated over. It can however also be “lazy”, in that the actual values the collection consists of are only computed or pulled into memory once the enumerable is iterated / reduced over.
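To make “reduce at its core” concrete, here is a small sketch that calls the protocol’s Enumerable.reduce/3 directly (normally Enum or Stream does this for you). The reducer returns {:cont, acc} to keep going or {:halt, acc} to stop early, which is how a function like Enum.take/2 can stop without touching the rest of a collection.

```elixir
# Sum a list: the reducer asks for every element with {:cont, acc}.
Enumerable.reduce([1, 2, 3], {:cont, 0}, fn x, acc -> {:cont, x + acc} end)
# => {:done, 6}

# Stop early: returning {:halt, acc} ends the iteration at 3,
# so the range never produces the values 4..1_000_000.
Enumerable.reduce(1..1_000_000, {:cont, []}, fn x, acc ->
  if x == 3, do: {:halt, [x | acc]}, else: {:cont, [x | acc]}
end)
# => {:halted, [3, 2, 1]}
```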

A simple case of a lazy enumerable in Elixir is Range. It models a sequence of integer values, but the struct only stores the lower and upper boundaries of the sequence: 2..15 only stores a struct containing 2 and 15. Only once you start iterating over the range are the values between those two numbers computed, so 3, 4, 5, ….
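A quick way to see this in IEx (since Elixir 1.12 the struct also carries a step field, 1 by default, which the pattern below simply ignores):

```elixir
range = 2..15

# Only the boundaries live in the struct; no list of 14 integers exists yet.
%Range{first: 2, last: 15} = range

# Values are produced while iterating, so even a huge range is cheap to take from.
Enum.take(1..1_000_000_000, 3)
# => [1, 2, 3]
```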

So that’s Enumerable. Now there’s also Enum and Stream.

Enum provides APIs to work with Enumerable data. It is eager in the sense that each call to an Enum function will iterate over as much of the enumerable collection as the operation would require. E.g. Enum.map will iterate over the complete input and return a list of mapped values for each item in the input collection. Enum.take(input, 1) will take out the first item from the input and be done.

The return value of Enum APIs might be another Enumerable, which is why you often see chained calls to its APIs, but it’ll never be another lazy enumerable.

Now the difference with Stream is that its APIs always return lazy enumerables. The input is wrapped with some lazily computable operation, which gives you a lazy enumerable. Some functions don’t even require an enumerable as input but build a lazy enumerable from other input; others wrap existing enumerables, whether those are lazy or not.
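A short sketch of both kinds of Stream functions: Stream.iterate/2 builds a lazy enumerable from just a start value and a function, while Stream.map/2 wraps an existing enumerable, eager or lazy. Nothing is computed until Enum.take/2 or Enum.to_list/1 asks for values.

```elixir
# Builds a lazy (and infinite) enumerable from a start value and a function.
powers_of_two = Stream.iterate(1, &(&1 * 2))
Enum.take(powers_of_two, 5)
# => [1, 2, 4, 8, 16]

# Wraps an existing eager enumerable (a list)...
Stream.map([1, 2, 3], &(&1 + 1)) |> Enum.to_list()
# => [2, 3, 4]

# ...or an existing lazy one (the infinite stream above).
powers_of_two |> Stream.map(&(&1 - 1)) |> Enum.take(4)
# => [0, 1, 3, 7]
```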

Therefore the answers to your questions:

Yes, each Enum.map will execute once, but not for just a single element of your initial input. Each of them maps over all lines of your file, and only the last step throws away the work for everything except the first item. Earlier steps have no idea you’ll discard stuff at the end.
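The difference can be made visible by counting calls. This sketch assumes an in-memory list of four lines instead of elixir_is_funny.txt, so it runs without the file, and uses a hypothetical count function that tallies its calls in the process dictionary (fine for a throwaway demo, not a pattern for real code).

```elixir
# Hypothetical stand-in for the lines of elixir_is_funny.txt.
lines = ["a\n", "b\n", "c\n", "d\n"]

# Hypothetical helper: returns its input unchanged and bumps a call counter.
count = fn line ->
  Process.put(:calls, Process.get(:calls, 0) + 1)
  line
end

# Eager: each Enum.map walks all 4 lines before Enum.take/2 runs.
Process.put(:calls, 0)
eager = lines |> Enum.map(count) |> Enum.map(count) |> Enum.map(count) |> Enum.take(1)
eager_calls = Process.get(:calls)
# eager == ["a\n"], eager_calls == 12 (3 functions x 4 lines)

# Lazy: Enum.take/2 pulls one line through all three functions, then stops.
Process.put(:calls, 0)
lazy = lines |> Stream.map(count) |> Stream.map(count) |> Stream.map(count) |> Enum.take(1)
lazy_calls = Process.get(:calls)
# lazy == ["a\n"], lazy_calls == 3 (3 functions x 1 line)
```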

See above. The protocol is Enumerable and the way it works does allow for both lazy and “not so lazy” enumerables.
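For a module of your own, “implementing the iterable” means implementing the Enumerable protocol. Below is a minimal sketch with a hypothetical Countdown struct: only reduce/3 does real work, and returning {:error, __MODULE__} from the other callbacks tells Enum to fall back to reduce-based defaults. Because Stream functions are built on the same protocol, they work on it unchanged.

```elixir
defmodule Countdown do
  # Hypothetical struct: counts down from `from` to 1.
  # Like a Range, it stores only where to start; values are produced on demand.
  defstruct [:from]
end

defimpl Enumerable, for: Countdown do
  # The consumer asked to stop early.
  def reduce(_countdown, {:halt, acc}, _fun), do: {:halted, acc}

  # The consumer asked to pause; hand back a continuation.
  def reduce(countdown, {:suspend, acc}, fun),
    do: {:suspended, acc, &reduce(countdown, &1, fun)}

  # Nothing left to emit.
  def reduce(%Countdown{from: n}, {:cont, acc}, _fun) when n <= 0, do: {:done, acc}

  # Emit n, then continue with n - 1 using whatever the reducer returned.
  def reduce(%Countdown{from: n}, {:cont, acc}, fun),
    do: reduce(%Countdown{from: n - 1}, fun.(n, acc), fun)

  # {:error, module} means "no shortcut here, use reduce/3 instead".
  def count(_countdown), do: {:error, __MODULE__}
  def member?(_countdown, _value), do: {:error, __MODULE__}
  def slice(_countdown), do: {:error, __MODULE__}
end

Enum.to_list(%Countdown{from: 3})
# => [3, 2, 1]

# Stream.map/2 wraps it lazily; only two values are ever produced.
%Countdown{from: 1_000_000}
|> Stream.map(&(&1 * 10))
|> Enum.take(2)
# => [10000000, 9999990]
```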

There’s no single “stream” datatype, and values on the BEAM are immutable, so nothing changes unless you explicitly produce a new value from it. I hope I explained well enough above that streams don’t just stop being streams. If you want to retain a lazy enumerable’s laziness, add further operations using Stream APIs. As soon as you use Enum, operations become eager.
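To see when a pipeline is still lazy, it helps to look at what each step returns. (File.stream!/1 itself returns a %File.Stream{} struct, which is also a lazy enumerable.) This sketch uses a Range as the source so it runs without a file; match?/2 just tests the shape of the value.

```elixir
# Only Stream functions: the result is still a lazy %Stream{}.
lazy =
  1..3
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(&1 > 2))

match?(%Stream{}, lazy)  # => true
Enum.to_list(lazy)       # => [4, 6] (the work happens here)

# Ending the pipe with an Enum function: the result is a plain list.
eager =
  1..3
  |> Stream.map(&(&1 * 2))
  |> Enum.filter(&(&1 > 2))

is_list(eager)  # => true
eager           # => [4, 6]
```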

Lazy really only refers to the fact that the items of an enumerable collection somehow become available “on demand”. That could mean being loaded on demand, computed on demand, …
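As an example of “computed on demand”, this sketch describes the infinite Fibonacci sequence with Stream.unfold/2. Each number is computed only when a consumer asks for it, so taking the first eight is cheap. (Reading a file with File.stream!/1 is the “loaded on demand” flavor.)

```elixir
# The accumulator {a, b} holds the next two Fibonacci numbers.
# Each step emits `a` and shifts the window forward by one.
fib = Stream.unfold({0, 1}, fn {a, b} -> {a, {b, a + b}} end)

Enum.take(fib, 8)
# => [0, 1, 1, 2, 3, 5, 8, 13]
```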


Let’s do an experiment.

  1. Create a text file with the name nums.txt and the following content:
    1
    2
    3
    
  2. Open iex
  3. Try File.stream!("nums.txt") |> Enum.map(&IO.inspect/1) |> Enum.map(&IO.inspect/1) |> Enum.take(1), what do you see?
  4. Change Enum.map to Stream.map, what do you see?
  5. Change Enum.take(1) to Enum.take(2), what do you see?
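If you’d rather run the whole experiment at once, the steps can be sketched as a script (for example saved under a hypothetical name like experiment.exs and run with elixir experiment.exs). It writes nums.txt into the current directory first; the printed output is left for you to discover.

```elixir
# Step 1: create nums.txt with three lines.
File.write!("nums.txt", "1\n2\n3\n")

# Step 3: eager Enum.map calls.
IO.puts("--- step 3 ---")
step3 = File.stream!("nums.txt") |> Enum.map(&IO.inspect/1) |> Enum.map(&IO.inspect/1) |> Enum.take(1)

# Step 4: Enum.map swapped for Stream.map.
IO.puts("--- step 4 ---")
step4 = File.stream!("nums.txt") |> Stream.map(&IO.inspect/1) |> Stream.map(&IO.inspect/1) |> Enum.take(1)

# Step 5: same as step 4, but taking two items.
IO.puts("--- step 5 ---")
step5 = File.stream!("nums.txt") |> Stream.map(&IO.inspect/1) |> Stream.map(&IO.inspect/1) |> Enum.take(2)
```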

Now it makes sense to me. I was thinking that using Enum functions on a lazy input somehow continued to output a stream. In fact, it’s way simpler than I thought.

Thank you so much @LostKobrakai, really appreciated your explanation.


Wow, that’s cool: by using Stream.map it only runs each IO.inspect once.

Thank you very much for this practical example, really simple and didactic.


In the last step, you can also see that the code prints 1 twice first, then prints 2 twice. This also means something.


I didn’t notice that. So does this mean it evaluates one element at a time and applies all the functions to it before moving on?

Also, I logged the result of the stream without the Enum.take, and there is a field called funs with exactly 2 functions in it. These are the functions that will run once a value is requested, right?

Long story short, calling a Stream function does not actually do anything until you attempt to take values from the stream, which mostly happens via functions in the Enum module.
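To connect this with the funs field observed above, here is a small sketch. Note that funs is an internal implementation detail of the %Stream{} struct, not a public API, so it may change between Elixir versions.

```elixir
# Building the pipeline does no work yet: the two functions are only stored.
stream =
  1..1_000_000
  |> Stream.map(&(&1 * 2))
  |> Stream.map(&(&1 + 1))

# Implementation detail (may change between versions):
# the pending functions sit in the struct's `funs` field.
%Stream{funs: funs} = stream
length(funs)
# => 2

# Values are computed only when a consumer such as Enum.take/2 asks for them.
Enum.take(stream, 3)
# => [3, 5, 7]
```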

Taking the file of @Aetherus as a playground:

"nums.txt"
|> File.stream!()
|> Stream.map(fn item -> String.duplicate(item, 3) end)
|> Stream.map(fn item -> "*" <> item <> "*" end)
|> Enum.each(&IO.inspect(&1, label: "triple repeated number"))

This will process all elements because we have not filtered the stream and because you are eventually calling Enum.each, which goes through the entire collection it’s given (again, an unfiltered one). Thus you’ll see three numbers printed (since the file has 3 lines).

Notice something else: we use Stream functions in the entire pipe EXCEPT the last step. This saves you from memory pressure and memory spikes. Imagine the file was 1 million lines long. If you instead used Enum.map twice, the pipe would build two full lists of 1 million elements (one per Enum.map), and the first one would become garbage as soon as the second was built, all before Enum.each even starts.

But if you use Stream.map, no intermediate list is built at all: each line is read, transformed twice and printed before the next line is read, so only about one line at a time needs to be in memory.

Again, this reduces pressure on the garbage collector and makes the code perform predictably. (This isn’t strictly true in 100% of cases, since Stream carries a slight CPU overhead, but it’s safe to say that anything that’s not trivially small, say 5000 elements or more, should be processed via Stream if you care about memory usage; otherwise you’ll get memory spikes.)

Back to your original question, let’s modify the code a little:

"nums.txt"
|> File.stream!()
|> Stream.map(fn item -> String.duplicate(item, 3) end)
|> Stream.map(fn item -> "*" <> item <> "*" end)
# 👇 new code line 👇
|> Stream.take(1)
|> Enum.each(&IO.inspect(&1, label: "triple repeated number"))

And now you’ll see only one number inspected.

Just so I don’t sound too dogmatic: another way of forcing all Stream operations to evaluate and start doing work is via Stream.run. Let’s rewrite the above example for a demonstration:

"nums.txt"
|> File.stream!()
|> Stream.map(fn item -> String.duplicate(item, 3) end)
|> Stream.map(fn item -> "*" <> item <> "*" end)
|> Stream.take(1)
|> Stream.each(&IO.inspect(&1, label: "triple repeated number"))
|> Stream.run()

Note: this returns :ok (since Stream.run returns :ok), i.e. the same as above, but only because Enum.each also returns :ok.


This is how I always think of Stream and Enum.

something_enumerable  # <-- most upstream
|> Stream.do_something(...)
|> Enum.do_something(...)
|> Enum.do_something(...)  # <-- most downstream

Enum

When you call Enum.do_something, it always tells its upstream “give me everything you have, NOW!”, then it iterates through all the things it gets and does some work to them, then returns something solid (like a list or a map).

Stream

When you call Stream.do_something, it tells its upstream “I’ll ask you for a value whenever I’m asked for a value. Other times I just sit there doing nothing.” So if there’s no need for a value, it does nothing. The return value of Stream.do_something is not that solid: it’s a lazy enumerable describing work that hasn’t happened yet.

Common functions

If some job can be done in a per-value way, like transforming a value to another value based on that value only, then such functions can be found in both Stream and Enum, like map and filter.
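For example, filter exists in both modules and gives the same values; the only difference is when the work happens. A minimal sketch:

```elixir
is_even = fn x -> rem(x, 2) == 0 end

# Enum.filter/2 builds the result list immediately.
Enum.filter(1..10, is_even)
# => [2, 4, 6, 8, 10]

# Stream.filter/2 returns a lazy stream; Enum.to_list/1 runs it.
1..10 |> Stream.filter(is_even) |> Enum.to_list()
# => [2, 4, 6, 8, 10]
```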

Enum-only functions

If something can only be done by considering the whole Enumerable, then such functions can only be found in Enum, like sort, group_by and reduce.
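A small sketch of what that means in practice: you can feed a stream into these Enum-only functions, but they consume the whole stream before returning, so they would never finish on an infinite one. A stream built over a list can be consumed more than once; each Enum call below re-runs the mapping.

```elixir
words = ["cherry", "apple", "blueberry", "avocado", "banana"]

# Still lazy: nothing has been upcased yet.
upcased = Stream.map(words, &String.upcase/1)

# Sorting needs every element before it can return anything.
Enum.sort(upcased)
# => ["APPLE", "AVOCADO", "BANANA", "BLUEBERRY", "CHERRY"]

# Grouping also needs the whole collection (original order is kept within groups).
Enum.group_by(upcased, &String.first/1)
# => %{"A" => ["APPLE", "AVOCADO"], "B" => ["BLUEBERRY", "BANANA"], "C" => ["CHERRY"]}
```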

Stream-only functions

Because Stream is lazy, it has the potential of generating an infinite number of values. For example, Stream.repeatedly(&:rand.uniform/0) can generate an infinite number of random floats, not all at once, but whenever you need one. Such “value generating functions” are only found in Stream.
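A quick sketch of two such generators; Enum.take/2 decides how many values actually get produced. (Stream.iterate/2, Stream.unfold/2 and Stream.resource/3 are other generators in the same family.)

```elixir
# Three random floats between 0 and 1; a new one is generated per request.
Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)

# Repeat a list forever and take as many values as needed.
Stream.cycle([:red, :green]) |> Enum.take(5)
# => [:red, :green, :red, :green, :red]
```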
