Stream.peek, Stream.pop

I find the Stream API limited, and I often have to introduce workarounds for things like taking values from an enumerable one by one without traversing the whole enumerable in one go.

To achieve this I use some functional magic which is completely unreadable and which, honestly, I don’t fully understand. It is also possible to use reduce/transform, but that introduces a huge runtime overhead, making enumeration around 4-5 times slower than closure-based solutions according to my homemade benchmarks.

Example

It would work like this:

one_to_five = 1..5
Stream.peek(one_to_five, fn first, enumerable ->
  IO.inspect first, label: :first
  IO.inspect Enum.to_list(enumerable), label: :all
end)

# Prints
# first: 1
# all: [1, 2, 3, 4, 5]

Stream.pop(one_to_five, fn first, rest ->
  IO.inspect first, label: :first
  IO.inspect Enum.to_list(rest), label: :rest
end)

# Prints
# first: 1
# rest: [2, 3, 4, 5]

Please note that we pass a fn instead of doing something like {first, stream} = Stream.peek(stream) to ensure that the stream is automatically closed when the closure ends. I personally find this too defensive, but that’s how Stream and Enum work today.

Details

  • Each value in the enumerable is traversed only once; enumeration starts once and ends once.
  • When the closure is called, enumeration will be started, and the rest of the enumerable will be a continuation wrapped in an Enumerable-compatible closure.
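The two points above can be approximated today with a suspended reduce. The following is a minimal sketch, not the proposed implementation: StreamSketch and its helper functions are hypothetical names, and raising Enum.EmptyError on an empty enumerable is an assumption, since the proposal does not say what should happen in that case. Note one limitation: the underlying enumeration is only closed if rest is actually enumerated (fully or partially), and enumerating rest more than once would replay the same continuation.

```elixir
defmodule StreamSketch do
  # Hypothetical module; Stream.pop/2 and Stream.peek/2 do not exist in Elixir.

  # Calls fun.(first, rest), where rest lazily yields the remaining elements.
  def pop(enumerable, fun) when is_function(fun, 2) do
    # Reducer that suspends after every element, keeping it in the accumulator.
    step = fn x, acc -> {:suspend, [x | acc]} end
    start = &Enumerable.reduce(enumerable, &1, step)

    case next(start) do
      {first, cont} -> fun.(first, rest(cont))
      # Assumption: an empty enumerable raises.
      :done -> raise Enum.EmptyError
    end
  end

  # Same as pop/2, but the stream handed to fun still starts with the first element.
  def peek(enumerable, fun) when is_function(fun, 2) do
    pop(enumerable, fn first, rest -> fun.(first, Stream.concat([first], rest)) end)
  end

  # Pulls one element out of a continuation.
  defp next(:finished), do: :done

  defp next(cont) do
    case cont.({:cont, []}) do
      {:suspended, [x], next_cont} -> {x, next_cont}
      # The source finished (done or halted) while delivering a last element.
      {_reason, [x]} -> {x, :finished}
      {_reason, []} -> :done
    end
  end

  # Wraps the continuation in an Enumerable-compatible stream.
  defp rest(cont) do
    Stream.resource(
      fn -> cont end,
      fn c ->
        case next(c) do
          {x, next_cont} -> {[x], next_cont}
          :done -> {:halt, :finished}
        end
      end,
      fn
        :finished -> :ok
        # The consumer stopped early: halt the source so it can clean up.
        c -> c.({:halt, []})
      end
    )
  end
end

StreamSketch.pop(1..5, fn first, rest ->
  IO.inspect(first, label: :first)
  IO.inspect(Enum.to_list(rest), label: :rest)
end)
```

Keeping the pulled element in a list accumulator (rather than using the element itself as the accumulator) matters because some streams, such as the result of Stream.take, report their last element together with the end of the enumeration.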

Use-cases

  • Parsing a CSV stream. Sometimes I need to get the header line and all the rows from a CSV stream. NimbleCSV only returns a single stream. There are three solutions to this problem:

    1. Functional magic (the best in terms of performance, but completely unreadable)
    2. Doing [headers] = Enum.take(stream, 1); Enum.reduce(stream), but this opens and closes the descriptor twice, which is a fairly expensive operation
    3. Doing a single Enum.reduce which tracks whether we’re at the first element or not. This is hard to implement, since it breaks the pipelining of Enumerable across functions
  • I wrote a very scary function to merge two ordered streams into one ordered stream, using functional magic. I think the functions proposed here would make the code much more readable
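To give an idea of what the merge use-case involves without the proposed functions, here is a rough sketch of a lazy merge of two ascending streams built directly on suspended reduces. This is not the function mentioned above, only an illustration of the general technique; MergeSketch, pull and halt are hypothetical names, and both inputs are assumed to be already sorted in ascending order.

```elixir
defmodule MergeSketch do
  # Hypothetical helper: lazily merges two enumerables that are assumed to be
  # sorted in ascending order into one ascending stream.
  def merge(left, right) do
    # Reducer that suspends after every element.
    step = fn x, acc -> {:suspend, [x | acc]} end

    Stream.resource(
      # State: for each side, either :done or {head_element, continuation}.
      fn ->
        {pull(&Enumerable.reduce(left, &1, step)),
         pull(&Enumerable.reduce(right, &1, step))}
      end,
      fn
        {:done, :done} = state -> {:halt, state}
        {{l, lc}, :done} -> {[l], {pull(lc), :done}}
        {:done, {r, rc}} -> {[r], {:done, pull(rc)}}
        # Ties go to the left side.
        {{l, lc}, {r, _} = right_head} when l <= r -> {[l], {pull(lc), right_head}}
        {left_head, {r, rc}} -> {[r], {left_head, pull(rc)}}
      end,
      # Halt whatever is still open when the consumer stops.
      fn {left_state, right_state} ->
        halt(left_state)
        halt(right_state)
      end
    )
  end

  # Pulls the next element from a continuation.
  defp pull(:finished), do: :done

  defp pull(cont) do
    case cont.({:cont, []}) do
      {:suspended, [x], next_cont} -> {x, next_cont}
      # The source ended while delivering a last element.
      {_reason, [x]} -> {x, :finished}
      {_reason, []} -> :done
    end
  end

  defp halt({_head, cont}) when is_function(cont), do: cont.({:halt, []})
  defp halt(_), do: :ok
end

MergeSketch.merge([1, 4, 6], [2, 3, 7]) |> Enum.to_list()
# returns [1, 2, 3, 4, 6, 7]
```

Most of the code is bookkeeping around continuations, which is the part a peek/pop-style API would hide.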

Does Enum.reduce have poor performance? That’s how we traverse the stream in all cases, so I don’t see how Stream.peek or any other operation would be faster than that?

Stream.transform does have a cost though.

Yeah, I was not quite clear on the last point. Enum.reduce is not slow; it just requires moving everything into the reduce.

For example, suppose I have a function which accepts an Enumerable.t(%{String.t() => term()}), lazily evaluates it, and writes it to a database or a log or whatever. If I wanted to transform a CSV stream from a stream of lists into a stream of maps, I’d have to do something weird and strange.
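One way to express this with today's API is Stream.transform, which keeps the pipeline lazy at the runtime cost mentioned earlier in the thread. A small sketch, assuming the first row of the stream is the header row; RowsToMaps and to_maps are hypothetical names, and the rows are an in-memory list standing in for a parsed CSV stream.

```elixir
defmodule RowsToMaps do
  # Hypothetical helper: turns a stream of rows (lists), whose first row is
  # assumed to be the header, into a lazy stream of maps.
  def to_maps(rows) do
    Stream.transform(rows, nil, fn
      # First row: remember it as the header and emit nothing.
      headers, nil -> {[], headers}
      # Every other row: pair each header with its value.
      row, headers -> {[Map.new(Enum.zip(headers, row))], headers}
    end)
  end
end

rows = [["name", "age"], ["alice", "30"], ["bob", "25"]]

rows
|> RowsToMaps.to_maps()
|> Enum.to_list()
# returns [%{"age" => "30", "name" => "alice"}, %{"age" => "25", "name" => "bob"}]
```

The result is still an Enumerable, so it can be passed straight to a function expecting Enumerable.t(%{String.t() => term()}).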
