There is one additional reason why streams can’t be consumed one-by-one and instead need to be folded (with Enumerable.reduce) at once: the stream generator may have side effects (for example, consuming File.stream! advances the file position), and therefore each step must be invoked at most once.
Try this:
{e1, cont1} = MyStream.next(File.stream!("test.txt", [], 1))
{e2, cont2} = MyStream.next(cont1)
{e2a, cont2a} = MyStream.next(cont1)
IO.puts("#{e1} #{e2} #{e2a}")
Assuming test.txt contains 12345, one would expect to see 122, but instead you get 123.
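The same effect can be reproduced without a file or the MyStream module. The following is only a sketch: LeakDemo, step_twice/1 and counting_stream/0 are names made up for illustration, and the Agent-backed counter is a hypothetical stand-in for the file position. It resumes one and the same continuation twice, first over a pure list and then over a stream whose steps have a side effect.

```elixir
defmodule LeakDemo do
  # Suspends after every element and hands that element back as the accumulator.
  defp suspend_each(x, _acc), do: {:suspend, x}

  # Takes one step, then resumes the *same* continuation (cont1) twice.
  def step_twice(enum) do
    {:suspended, e1, cont1} = Enumerable.reduce(enum, {:cont, nil}, &suspend_each/2)
    {:suspended, e2, _cont2} = cont1.({:cont, nil})
    {:suspended, e2a, _cont2a} = cont1.({:cont, nil})
    {e1, e2, e2a}
  end

  # Hypothetical stand-in for File.stream!: every step bumps a shared counter,
  # much like reading advances the file position.
  def counting_stream do
    {:ok, counter} = Agent.start_link(fn -> 0 end)

    Stream.resource(
      fn -> counter end,
      fn c -> {[Agent.get_and_update(c, fn n -> {n + 1, n + 1} end)], c} end,
      fn _c -> :ok end
    )
  end
end

# Pure data: resuming cont1 twice yields the same element twice.
IO.inspect(LeakDemo.step_twice([1, 2, 3]))
# Shared state: the second resume of cont1 sees the already advanced counter.
IO.inspect(LeakDemo.step_twice(LeakDemo.counting_stream()))
```

With a list the result is {1, 2, 2}, which is what one would naively expect from a lazy list. With the counting stream it is {1, 2, 3}, because the continuation only captures the reference to the resource, not its state.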
The StreamSplit library suffers from the same problem:
iex(1)> {head, tail} = StreamSplit.take_and_drop(File.stream!("test.txt", [], 1), 1)
{["1"], #Function<55.126435914/2 in Stream.resource/3>}
iex(2)> Enum.take(tail, 2)
["2", "3"]
iex(3)> Enum.take(tail, 2)
["4", "5"]
The documentation for Enumerable repeatedly mentions that “In case a reducer/0 function returns the :suspend accumulator, the :suspended tuple must be explicitly handled by the caller and never leak.” Your code does exactly what this warns against: the continuation leaks and can then be called multiple times, messing up the side effects.
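For contrast, here is a minimal sketch of what handling the :suspended tuple inside the caller can look like. FirstElement and first/1 are hypothetical names, not part of any library. In real code a reducer returning {:halt, x} would be the simpler way to get the first element; the point here is only that the continuation is consumed where it was produced and never returned.

```elixir
defmodule FirstElement do
  # Returns {:ok, first_element} or :empty.
  # The continuation never leaves this function.
  def first(enum) do
    case Enumerable.reduce(enum, {:cont, nil}, fn x, _acc -> {:suspend, x} end) do
      {:suspended, x, cont} ->
        # Halt the continuation so the enumerable can run its cleanup
        # (for Stream.resource/3 this invokes the after_fun).
        _ = cont.({:halt, nil})
        {:ok, x}

      {_done_or_halted, nil} ->
        # Nothing was emitted, so the accumulator is still nil.
        :empty
    end
  end
end

IO.inspect(FirstElement.first([1, 2, 3]))
IO.inspect(FirstElement.first([]))
```

Because the continuation is halted before first/1 returns, nobody outside can resume it a second time.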
If one comes from a Haskell background (like me), this really is somewhat surprising. Stream resembles a lazily evaluated linked list, but the BEAM VM doesn’t have lazy evaluation and thunks, so a side-effectful Stream must be treated with care.
Here’s my stab at a safe-ish stream iterator:
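defmodule StreamStepper do
  # Returns [] for an empty stream, or [head | tail] where tail is a
  # zero-arity fun that yields the next step and may be called only once.
  def stream_stepper(stream) do
    stream
    |> Enumerable.reduce({:cont, nil}, &stream_stepper_suspender/2)
    |> stream_stepper_continuer()
  end

  # Suspend after every element; the 1-tuple tells "got an element" apart from nil.
  defp stream_stepper_suspender(head, nil) do
    {:suspend, {head}}
  end

  # The stream ended without emitting another element.
  defp stream_stepper_continuer({done_halted, nil}) when done_halted in [:done, :halted] do
    []
  end

  # The stream ended right after emitting an element.
  defp stream_stepper_continuer({done_halted, {head}}) when done_halted in [:done, :halted] do
    tail = fn -> [] end
    [head | tail]
  end

  # The stream suspended: wrap the continuation so it can be resumed at most once.
  defp stream_stepper_continuer({:suspended, {head}, tail_cont}) do
    once = callable_once()
    tail = fn -> once.(fn -> tail_cont.({:cont, nil}) |> stream_stepper_continuer() end) end
    [head | tail]
  end

  # Returns a fun that runs the given fun on the first call and raises afterwards.
  defp callable_once do
    seen = :atomics.new(1, [])

    fn fun ->
      case :atomics.compare_exchange(seen, 1, 0, 1) do
        :ok -> fun.()
        _ -> raise "protected fun evaluated twice!"
      end
    end
  end
end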
It’s just a hack: the continuation still kind of leaks, but the leak is guarded by callable_once, which raises whenever the possibly side-effecting fun is invoked more than once.
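To see the guard in isolation, here is a small sketch that copies the callable_once helper into a public function. OnceDemo is a hypothetical module name used only so the behaviour can be tried out without the rest of the stepper.

```elixir
defmodule OnceDemo do
  # Same idea as the private callable_once/0 helper above,
  # exposed publicly for experimentation.
  def callable_once do
    # A single atomic slot, initially 0 ("not yet called").
    seen = :atomics.new(1, [])

    fn fun ->
      # Flip 0 -> 1 atomically; only the first caller gets :ok.
      case :atomics.compare_exchange(seen, 1, 0, 1) do
        :ok -> fun.()
        _ -> raise "protected fun evaluated twice!"
      end
    end
  end
end

once = OnceDemo.callable_once()
IO.inspect(once.(fn -> :first_call end))

# A second call through the same guard raises instead of running the fun.
result =
  try do
    once.(fn -> :second_call end)
  rescue
    e in RuntimeError -> {:raised, e.message}
  end

IO.inspect(result)
```

The first call returns :first_call and the second one is turned into {:raised, "protected fun evaluated twice!"}. Using :atomics rather than, say, the process dictionary should keep the guard effective even if the tail fun is handed to another process on the same node, since the atomics reference is shared rather than copied.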