Stream.peek and Stream.pop
I find the Stream API limited, and I often have to introduce workarounds for things like taking values from an enumerable one by one without traversing the whole enumerable in one go.
To achieve this I do some functional magic which is completely unreadable and which I honestly don’t fully understand. It is also possible to do this with reduce/transform, but that introduces a huge runtime overhead, making enumeration around 4-5 times slower than the closure-based solutions according to my homemade benchmarks.
Example
It works like this:
one_to_five = 1..5
Stream.peek(one_to_five, fn first, enumerable ->
  IO.inspect first, label: :first
  IO.inspect Enum.to_list(enumerable), label: :all
end)
# Prints
# first: 1
# all: [1, 2, 3, 4, 5]
Stream.pop(one_to_five, fn first, rest ->
  IO.inspect first, label: :first
  IO.inspect Enum.to_list(rest), label: :rest
end)
# Prints
# first: 1
# rest: [2, 3, 4, 5]
Please note that we pass a fn instead of doing something like {first, stream} = Stream.peek(stream) to ensure that the stream is automatically closed when the closure returns. I personally find this too defensive, but that’s how Stream and Enum work today.
Details
- Each value in the enumerable is traversed only once; enumeration starts once and ends once.
- When the closure is called, enumeration has been started, and the rest of the enumerable is a continuation wrapped in an Enumerable-compatible closure.
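The continuation mechanism described above can be sketched roughly as follows. This is only an illustration under assumptions: the module name PopSketch and its functions are hypothetical and not part of Elixir. It relies on the :suspend command of Enumerable.reduce/3 to obtain a continuation, and on Stream.resource/3 to wrap that continuation as a lazy stream. One known limitation of this sketch: if the closure never enumerates rest, the source is not halted, so a real implementation would need extra bookkeeping. The rest stream should also be enumerated at most once.

```elixir
defmodule PopSketch do
  # Hypothetical helper, not part of Elixir's Stream module.
  # Calls fun.(first, rest), where rest lazily yields the remaining elements.
  def pop(enumerable, fun) when is_function(fun, 2) do
    # A reducer that suspends after every element, handing back the element
    # together with a continuation for the remaining ones.
    suspend_each = fn item, _acc -> {:suspend, item} end

    case Enumerable.reduce(enumerable, {:cont, nil}, suspend_each) do
      {:suspended, first, continuation} -> fun.(first, rest(continuation))
      {:done, _} -> raise Enum.EmptyError
    end
  end

  # Wraps a suspended continuation in a lazy, single-use stream.
  defp rest(continuation) do
    Stream.resource(
      # The state is the current continuation, or :done once exhausted.
      fn -> continuation end,
      fn cont ->
        # Resume the source for exactly one more element.
        case cont.({:cont, nil}) do
          {:suspended, item, next} -> {[item], next}
          {status, _} when status in [:done, :halted] -> {:halt, :done}
        end
      end,
      fn
        # The source finished on its own, nothing left to close.
        :done -> :ok
        # The consumer stopped early: halt the source so it can clean up.
        cont -> cont.({:halt, nil})
      end
    )
  end
end

PopSketch.pop(1..5, fn first, rest -> {first, Enum.to_list(rest)} end)
# returns {1, [2, 3, 4, 5]}
```

A peek variant could be built the same way by prepending first to the rest stream before calling the closure.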
Use-cases
- Parsing a CSV stream. Sometimes I need to get the header line and all the rows from a CSV stream. NimbleCSV only returns a single stream. There are three solutions to this problem:
  - Functional magic (the best in terms of performance, but completely unreadable)
  - Doing [headers] = Enum.take(stream, 1); Enum.reduce(stream), but it opens and closes the descriptor two times, which is a fairly expensive operation
  - Doing a single Enum.reduce which tracks in its state whether we’re at the first element or not. Hard to implement, since it breaks the pipelining of Enumerable across functions
- I wrote a very scary function to merge two ordered streams into one ordered stream, using functional magic. I think that the functions proposed here would make the code much more readable.
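To give an idea of what the merge use-case involves today, here is a rough sketch of merging two ordered enumerables by hand. It is not my original function: the module MergeSketch and its helpers are hypothetical names made up for this illustration, and it assumes both inputs are already sorted in ascending term order. Each side is kept as either :done or a {head, continuation} pair obtained through the :suspend command of Enumerable.reduce/3.

```elixir
defmodule MergeSketch do
  # Hypothetical helper: lazily merges two ascending enumerables into one.
  def merge(left, right) do
    Stream.resource(
      # Start both sources and pull the first element of each.
      fn -> {next(start(left)), next(start(right))} end,
      fn
        {:done, :done} = state -> {:halt, state}
        {{l, lk}, :done} -> {[l], {next(lk), :done}}
        {:done, {r, rk}} -> {[r], {:done, next(rk)}}
        # Emit the smaller head and advance only the side it came from.
        {{l, lk}, {r, _} = right_side} when l <= r -> {[l], {next(lk), right_side}}
        {left_side, {r, rk}} -> {[r], {left_side, next(rk)}}
      end,
      # Halt whichever sources are still suspended so they can clean up.
      fn {l, r} -> for {_head, k} <- [l, r], do: k.({:halt, nil}) end
    )
  end

  # Turns an enumerable into a function that accepts a reduce command.
  defp start(enumerable) do
    fn command ->
      Enumerable.reduce(enumerable, command, fn item, _acc -> {:suspend, item} end)
    end
  end

  # Advances a source by one element.
  defp next(continuation) do
    case continuation.({:cont, nil}) do
      {:suspended, item, k} -> {item, k}
      _finished -> :done
    end
  end
end

MergeSketch.merge([1, 4, 6], [2, 3, 7, 8]) |> Enum.to_list()
# returns [1, 2, 3, 4, 6, 7, 8]
```

Most of the code above is continuation plumbing rather than merge logic, which is the part I would expect Stream.pop to hide.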






















