Best way to iterate a Stream

Background: I am writing a multiplayer mini-game with math puzzles. I need the puzzles to be random but reproducible given a seed, so I decided to model them as an infinite Stream that the players ask for new puzzles. I did something similar, but for a single player, in Elm a long time ago.

http://tomekowal.github.io/elm-multiplication-game/

Now, the real question is: how do I get elements from the Stream one by one? I’d love an API similar to String.next_grapheme/1

String.next_grapheme("asdf")
{"a", "sdf"}
String.next_grapheme("")
nil

{1, stream} = Stream.next([1, 2, 3])
{2, stream} = Stream.next(stream)
{3, stream} = Stream.next(stream)
nil = Stream.next(stream)

Unfortunately, there is no such API in either the Stream or the Enum module. It seems that the question is fairly popular:



And there are a couple of solutions to it:


None of them is perfect. E.g. StreamSplit works on infinite Streams but breaks on lists.
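
For reference, the most naive workaround that uses only the standard library combines Enum.take/2 with Stream.drop/2. This is a minimal sketch, and the module name NaiveNext is made up for illustration. It works on both lists and infinite streams, but every call re-runs the enumerable from the beginning, so the cost grows with each step and it is only suitable for streams without side effects.

```elixir
defmodule NaiveNext do
  # Returns {element, rest} or nil when the enumerable is empty.
  # `rest` is a lazy Stream.drop/2 wrapper, so each call re-evaluates
  # the enumerable from its start (assumes a pure, repeatable source).
  def next(enumerable) do
    case Enum.take(enumerable, 1) do
      [] -> nil
      [head] -> {head, Stream.drop(enumerable, 1)}
    end
  end
end
```

Usage follows the wished-for shape: `{1, rest} = NaiveNext.next([1, 2, 3])`, then `NaiveNext.next(rest)` and so on until it returns nil.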

Did you encounter such a problem? How do you code around it?
When I started, I was pretty sure something like that was in the standard library, but I was wrong.
Is there a reason it is not there? Maybe it doesn’t play well with Stream.interval?

2 Likes

I think you can find an example in gen_stage code.

4 Likes

Exactly, that is another example of the same problem. The protocol isn’t very complicated, but wrong implementations can have subtle bugs like the one in StreamSplit (or maybe it is not a bug, and it is just not intended to be used with finite enumerables).

The pattern is very common. Do you think it makes sense to make it part of the standard library? I am hesitant to open a proposal because every new feature needs to be maintained. But I feel that without it, the Stream API is not feature complete.

Imagine GenStage not knowing about the details of the Enumerable protocol. Instead, it would use a hypothetical Stream.next(enumerable, number_of_elements) -> {stream, values}, where stream is nil if it ended.

defmodule GenStage.Streamer do
  @moduledoc false
  use GenStage

  def start_link({_, opts} = pair) do
    GenStage.start_link(__MODULE__, pair, opts)
  end

  def init({stream, opts}) do
    {:producer, stream, Keyword.take(opts, [:dispatcher, :demand])}
  end

  def handle_demand(_demand, nil) do
    {:noreply, [], nil}
  end

  def handle_demand(demand, stream) when demand > 0 do
    {stream, values} = Stream.next(stream, demand)

    if stream == nil do
      GenStage.async_info(self(), :stop)
    end

    {:noreply, values, stream}
  end

  def handle_info(:stop, state) do
    {:stop, :normal, state}
  end
end

Apart from Stream.next/2 I’d go with Stream.next/1 which returns {stream, value} or nil.

But before discussing the API or implementation details, my question is: do you think such an “iterator” should be part of the standard library? Do you see any cons?

1 Like

IMO, the ability to pull items from the stream one at a time definitely looks useful. I guess your questions should be addressed at @josevalim and the rest of the core team :slight_smile:

1 Like

Consuming a collection (both finite and infinite ones) one-by-one is definitely useful. I have had conversations with @josevalim about this before. The reason Enum (and, by extension, Stream) do not support this by default is twofold:

  1. They expose their iteration process as a fold (which is the academic term for Enumerable.reduce), which always consumes the complete collection. For certain kinds of Enumerables, it is impossible or impractical to take elements out one by one. File streams are an example: iterating them one-by-one means that you have to keep the file open for the whole time you are taking elements out.
  2. Basing Enumerable on top of a ‘take one at a time and keep the collection intact’ interface would be much slower (if I recall correctly, this was tried and benchmarked in some pre-stable version of Elixir. Off the top of my head, I believe it was 4-5x slower, which is very significant since so much time in Elixir is spent working with/on collections).

Nevertheless, there are cases like in your use-case where this is very useful. You mentioned the iter repository in your post. That is an old attempt at fleshing out what later became the Extractable library/protocol.
Extractable only comes with out-of-the-box implementations for Lists, Maps and MapSets.
I am fairly certain that you could create a custom struct for your puzzle generating logic that contains a function which returns a {puzzle, new_struct_with_function_for_next_puzzle}.

5 Likes

Thank you! That is a handy insight!

ad1) File streams are a perfect point for not putting those functions in the standard API. It could encourage users to get lines one by one and do operations on them instead of composing the entire stream and then executing.

On the other hand, it might be useful for something like tail -f where we know we don’t want to close the file.
I thought a little about Stream.interval and figured that a potential Stream.next should block until it can get all the elements it tries to consume. That would make sense for both interval and tail -f: Stream.next would wait for one tick of the interval, or for a new line to be appended to the file in the tail -f case.

I think with good examples and documentation, we should be able to show where to use and where not to use Stream.next

ad2) I wouldn’t base Enumerable on top of taking one element at a time. I wouldn’t touch Enumerable at all. It supports continuations and suspends, so I don’t think anything would change in the existing code.

WRT Extractable library: did you think about basing it on top of the Enumerable? It could use the same trick as the one used in the gen_stage code that @sasajuric linked above. This way, it could automagically work for all kinds of Enumerables.

WRT puzzle generator. You are right. I could go with a custom generator, but I would miss all the potential that the Stream module gives me. Maybe I could enable an easy mode where someone needs to only solve every second puzzle with Stream.take_every. Alternatively, instead of generating puzzles one from another, I could create a stream of seeds and build puzzles from that:

start_seed
|> Stream.iterate(&next_seed)
|> Stream.map(&seed_to_puzzle)
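
To make the seed pipeline above concrete, here is a small runnable sketch. The module name SeedPuzzles, the linear congruential next_seed/1 and the multiplication-puzzle shape returned by seed_to_puzzle/1 are all assumptions made up for illustration, not part of any real game code.

```elixir
defmodule SeedPuzzles do
  # Hypothetical seed step: a simple linear congruential generator.
  def next_seed(seed), do: rem(seed * 1_103_515_245 + 12_345, 2_147_483_648)

  # Hypothetical puzzle: a multiplication of two numbers in 1..9 derived from the seed.
  def seed_to_puzzle(seed) do
    a = rem(seed, 9) + 1
    b = rem(div(seed, 9), 9) + 1
    %{question: "#{a} x #{b}", answer: a * b}
  end

  # The same start seed always produces the same infinite stream of puzzles.
  def stream(start_seed) do
    start_seed
    |> Stream.iterate(&next_seed/1)
    |> Stream.map(&seed_to_puzzle/1)
  end
end
```

Because the result is an ordinary Stream, the easy mode mentioned above would be `SeedPuzzles.stream(seed) |> Stream.take_every(2)`.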

I’d instead implement the next function in my project with Enumerable like in the GenStage example to preserve all the power Stream gives me.

I’d like to know of other downsides you can think of. I am more and more inclined to create a proposal, and this kind of feedback is helpful.

Thank you; very interesting thoughts :smiley:.

Correct, but in a situation like a tail -f, what we are actually modeling is much closer to e.g. GenStage, where we have multiple producers/consumers in parallel.

This is very different from streams like your PuzzleStream, which are (potentially) infinite, enumerated one-at-a-time and not dependent on the production of a different process.

The GenStage.Streamer code that @sasajuric linked to turns a (potentially infinite) local stream into a process-wrapped GenStage producer. This is an interesting ‘solution’, but it does mean that we have the extra overhead of a process when we want to work with such a stream. I think that there are many instances where a pure approach that keeps everything inside the same process makes more sense.

Related; I am reluctant to offer a seemingly data-only API which behind the scenes spins up processes.
But that does mean that building an abstraction that would happily work for arbitrary existing Enumerables (and which does not spin up processes) is difficult and probably impossible to implement.

WRT the PuzzleGenerator: I think that we could easily (and it would be an interesting exercise!) create a bunch of functions that allow a similar interface to what Stream provides. Maybe it is even possible to create a variant that is fully pipe-able. (Extracting single elements, of course, is something probably more commonly done in a with-statement).

1 Like

OK, I started testing it and it is much harder than I expected :smiley:

a) I can’t treat suspended stream a.k.a. continuation as another stream. If I start iterating I need to finish and can’t apply any other transformations so {value, stream} = Stream.next(stream) doesn’t work. It can be {value, continuation} = MyStream.next(stream_or_continuation)
b) the continuations were not designed to be called one by one :smiley: For example, the list [1,2,3] results in four steps:

  • for [1,2,3] returning {:suspended, 1, continuation}
  • for [2, 3] returning {:suspended, 2, continuation}
  • for [3] returning {:suspended, 3, continuation}
  • for [] returning {:done, whatever we think should indicate no value}

but [1,2,3] |> Stream.take(3) results in three steps:

  • for [1,2,3] returning {:suspended, 1, continuation}
  • for [2, 3] returning {:suspended, 2, continuation}
  • for [3] returning {:halted, 3}
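
The two traces above can be reproduced with a small helper that suspends after every element and records the tag and accumulator of each step. This is only a sketch: the module name ReduceTrace is made up, and it must only be given finite enumerables because it walks to the end.

```elixir
defmodule ReduceTrace do
  # Drives Enumerable.reduce/3 one suspension at a time and collects
  # {tag, accumulator} for every step until the enumerable finishes.
  def trace(enumerable) do
    suspend = fn element, _acc -> {:suspend, element} end
    step(Enumerable.reduce(enumerable, {:cont, nil}, suspend), [])
  end

  # Still suspended: remember the element and resume with an empty accumulator.
  defp step({:suspended, acc, continuation}, seen) do
    step(continuation.({:cont, nil}), [{:suspended, acc} | seen])
  end

  # Finished, either normally (:done) or because something halted (:halted).
  defp step({tag, acc}, seen) when tag in [:done, :halted] do
    Enum.reverse([{tag, acc} | seen])
  end
end
```

Calling `ReduceTrace.trace([1, 2, 3])` and `ReduceTrace.trace(Stream.take([1, 2, 3], 3))` should show the extra {:done, nil} step for the plain list and the final {:halted, 3} for the Stream.take version.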

So I need to make sure the code handles it. Here is what I came up with:

defmodule MyStream do
  def next(continuation) when is_function(continuation, 1) do
    case continuation.({:cont, nil}) do
      {:suspended, element, continuation} ->
        {element, continuation}

      {:halted, element} ->
        {element, fn _ -> {:done, nil} end}

      {:done, _element} ->
        nil
    end
  end

  def next(stream) do
    continuation = &Enumerable.reduce(stream, &1, fn x, _acc -> {:suspend, x} end)
    next(continuation)
  end
end

Oh, and sometimes halting returns an element, while sometimes it doesn’t and only copies the value from the given accumulator (e.g. File.stream! does that), so I came up with something crazy like this:

defmodule MyStream do
  def next(continuation) when is_function(continuation, 1) do
    case continuation.({:cont, :ignored_accumulator}) do
      # extract element and return continuation
      {:suspended, {:element, element}, continuation} ->
        {element, continuation}

      # halted with value
      {:halted, {:element, element}} ->
        {element, &fake_continuation/1}

      # halted without value
      {:halted, _} ->
        nil

      # done *probably* doesn't return values
      {:done, _} ->
        nil
    end
  end

  # Entry point
  def next(stream) do
    # Tag the value with :element to differentiate it later from empty values
    # in the unlikely case that someone had `:ignored_accumulator` as an element of the stream
    continuation = &Enumerable.reduce(stream, &1, fn x, _acc -> {:suspend, {:element, x}} end)
    next(continuation)
  end

  # The stream already finished but the API is nicer if MyStream.next returns nil in the next call
  defp fake_continuation(_), do: {:done, nil}
end

I am going to sleep on it :smiley:

There is one additional reason why streams can’t be consumed one-by-one and instead need to be folded (Enumerable.reduced) at once: the stream generator may have side-effects (example: consuming File.stream! advances the file position) and therefore must be invoked at most once.

Try this:

{e1, cont1} = MyStream.next(File.stream!("test.txt", [], 1))
{e2, cont2} = MyStream.next(cont1)
{e2a, cont2a} = MyStream.next(cont1)
IO.puts("#{e1} #{e2} #{e2a}")

Assuming test.txt contains 12345, one would expect to see 122 but instead you get 123.

The StreamSplit library suffers from the same problem:

iex(1)> {head, tail} = StreamSplit.take_and_drop(File.stream!("test.txt", [], 1), 1)
{["1"], #Function<55.126435914/2 in Stream.resource/3>}
iex(2)> Enum.take(tail, 2)
["2", "3"]
iex(3)> Enum.take(tail, 2)
["4", "5"]

The documentation for Enumerable repeatedly mentions that “In case a reducer/0 function returns the :suspend accumulator, the :suspended tuple must be explicitly handled by the caller and never leak.” And that’s exactly what your code does — the continuation leaks and can then be called multiple times, messing up the side effects.
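
The same effect can be shown without a file. The sketch below resumes one leaked continuation twice, once for a pure stream and once for a stream whose generator bumps a shared counter. The module LeakDemo and its function names are invented for this illustration; the counter uses Erlang's :counters module.

```elixir
defmodule LeakDemo do
  # Suspends after every element, so each step yields exactly one element.
  defp suspend(element, _acc), do: {:suspend, element}

  # Takes the first element, then resumes the SAME first continuation twice.
  def resume_twice(stream) do
    {:suspended, e1, cont1} = Enumerable.reduce(stream, {:cont, nil}, &suspend/2)
    {:suspended, e2, _cont2} = cont1.({:cont, nil})
    {:suspended, e2a, _cont2a} = cont1.({:cont, nil})
    {e1, e2, e2a}
  end

  # A stream with a side effect: every element increments a shared counter.
  def counting_stream do
    counter = :counters.new(1, [])

    Stream.repeatedly(fn ->
      :counters.add(counter, 1, 1)
      :counters.get(counter, 1)
    end)
  end
end
```

With a pure source such as `Stream.iterate(1, &(&1 + 1))` both resumptions see the same second element, while with `LeakDemo.counting_stream()` the second resumption observes the counter already advanced.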

If one comes from a Haskell background (like me), it really is somewhat surprising. Stream resembles a lazily evaluated linked list, but the BEAM VM doesn’t have lazy evaluation and thunks, so a side-effectful Stream must be treated with care. :slight_smile:

Here’s my stab at a safe-ish stream iterator:

defmodule StreamStepper do
  def stream_stepper(stream) do
    stream
    |> Enumerable.reduce({:cont, nil}, &stream_stepper_suspender/2)
    |> stream_stepper_continuer()
  end

  defp stream_stepper_suspender(head, nil) do
    {:suspend, {head}}
  end

  defp stream_stepper_continuer({done_halted, nil}) when done_halted in [:done, :halted] do
    []
  end

  defp stream_stepper_continuer({done_halted, {head}}) when done_halted in [:done, :halted] do
    tail = fn -> [] end
    [head | tail]
  end

  defp stream_stepper_continuer({:suspended, {head}, tail_cont}) do
    once = callable_once()
    tail = fn -> once.(fn -> tail_cont.({:cont, nil}) |> stream_stepper_continuer() end) end
    [head | tail]
  end

  defp callable_once do
    seen = :atomics.new(1, [])

    fn fun ->
      case :atomics.compare_exchange(seen, 1, 0, 1) do
        :ok -> fun.()
        _ -> raise "protected fun evaluated twice!"
      end
    end
  end
end

It’s just a hack, the continuation kind of leaks as well, but this leak is protected by callable_once which raises whenever the possibly side-effecting fun is invoked more than once.

5 Likes

Wow! This callable_once is brilliant! I haven’t heard about :atomics before.

And now I understand better why passing a continuation around might result in shooting myself in the foot :smiley:

I find this funny: my initial idea for the stream of puzzles was precisely about two processes iterating over the same stream at different speeds :smiley:

In my case, I could create two identical streams, and that should work.

For infinite streams without side effects like the stream of puzzles or Stream.cycle([1,2,3]) setting atomics references seems wasteful.

BUT, if we would like to introduce Stream.next or Stream.step to the standard API, we would have to let the Enumerable decide whether it uses side effects and needs protection, or whether it is safe to call the same continuation multiple times.

That would require adding protections like callable_once to all side-effectful streams in the stdlib, and changes in the API of generators like Stream.resource or Stream.iterate to enable such protections. Well, even Stream.map could have side effects that we might want to control.

Suddenly the amount of work to implement it becomes more significant. Next question arises: is it worth it?

The number of libraries, and even the GenStage example, suggest there is a need for something like that, while it is very easy to get wrong.
On the other hand, that would complicate the Stream API and add another thing to think about when implementing the Enumerable protocol.
From those two, I think complicating the Stream API is a bigger issue. It is elegant and clean, so adding opts like protect: true seems like a bad idea to me. And of course, allowing continuations to leak is even worse.

Alternatively, we could keep protections for all streams as in your solution, and never allow calling the same continuation twice even when it is safe. That would require documenting that stepping might be less efficient than regular processing. Those :atomics instructions should be fast though.

Well, at least I now know why the stepping API is hard to implement :slight_smile:

I still don’t want to give up on the idea of my PuzzleStream being a proper Stream, but maybe instead of stepping over it, I’ll implement the game logic inside Stream.transform/3

Thank you all for the discussion! It was enlightening!

2 Likes

It’s possible to use a less generic version of callable_once which instead of creating new atomics at every step uses a single atomics reference for the entire stream and increments it. I think the performance impact of such a thing is negligible. But I’m no expert on low-level erlang/elixir performance issues. Just wanted to mention this for completeness (I did write the code for this and then later simplified it).
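
A rough sketch of what such a single-reference variant could look like follows. It is a reconstruction under assumptions rather than the code mentioned above: the module name CountingStepper is made up, and the idea is that every tail remembers its own step index and may only run while the shared :atomics cell still holds that index.

```elixir
defmodule CountingStepper do
  # Returns [] or [head | tail_fun], where tail_fun.() yields the next cell.
  def stepper(stream) do
    # One atomics cell for the whole stream; it starts at 0.
    guard = :atomics.new(1, [])

    stream
    |> Enumerable.reduce({:cont, nil}, fn head, nil -> {:suspend, {head}} end)
    |> continue(guard, 0)
  end

  # Finished without a pending element.
  defp continue({tag, nil}, _guard, _step) when tag in [:done, :halted], do: []

  # Finished, but a last element was delivered together with the halt.
  defp continue({tag, {head}}, _guard, _step) when tag in [:done, :halted] do
    [head | fn -> [] end]
  end

  defp continue({:suspended, {head}, cont}, guard, step) do
    tail = fn ->
      # Only the tail whose index matches the cell may advance the stream.
      case :atomics.compare_exchange(guard, 1, step, step + 1) do
        :ok -> cont.({:cont, nil}) |> continue(guard, step + 1)
        _ -> raise "protected fun evaluated twice!"
      end
    end

    [head | tail]
  end
end
```

Calling any tail a second time, or calling an older tail after a newer one has run, raises instead of silently repeating the side effect.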

1 Like

Personally, I would write a small abstraction around puzzle sequence, with the following operations:

  • PuzzleSequence.new(seed)
  • PuzzleSequence.next_puzzle(sequence)

From there, I’d write the rest of the system and see how it unfolds. If I notice that there is some significant overlap between what I wrote and Stream/Enum functions, I’d try to see if I can somehow reuse the existing enum logic, e.g. by doing some trickery proposed here. But before that, I’d just stick with this simple interface.
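
As an illustration of how small that interface can be, here is a minimal sketch. Only the two function names come from the list above; the struct field, the use of Erlang's :rand with the :exsss algorithm, and the puzzle shape are assumptions for this example.

```elixir
defmodule PuzzleSequence do
  # Sketch only: holds an explicit :rand state so nothing depends on process state.
  defstruct [:rand_state]

  # The same integer seed always yields the same sequence of puzzles.
  def new(seed) when is_integer(seed) do
    %__MODULE__{rand_state: :rand.seed_s(:exsss, seed)}
  end

  # Returns {puzzle, next_sequence}; the input sequence is left untouched,
  # so several players can step through equal sequences at their own pace.
  def next_puzzle(%__MODULE__{rand_state: state} = sequence) do
    {a, state} = :rand.uniform_s(9, state)
    {b, state} = :rand.uniform_s(9, state)
    {%{question: "#{a} x #{b}", answer: a * b}, %{sequence | rand_state: state}}
  end
end
```

Since the sequence is plain immutable data, stepping it twice from the same value gives the same puzzle, which avoids the leaked-continuation problem discussed earlier in the thread.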

4 Likes

Yep, if I were doing it for work, I wouldn’t think twice about it and would go with PuzzleSequence.new(seed) and PuzzleSequence.next_puzzle(sequence). But I am doing it mostly for fun and learning. I am totally aware I am over-engineering, but just from those discussions I understood Streams and Enumerables better and learned about :atomics :smiley: I feel that was worth it :smiley:

3 Likes

Probably for simplicity, the developers defined Stream as a function:
Stream :: function(_,_)

It is used together with fold (which iterates until the end).

When I try to do lazy generations in erlang, I use the following construction:
Stream :: [term() | function()]

Where the tail is the function that will generate the next iteration.
Here an example (in elixir):

  def lazy_seq(cycle) do
    lazy_queue(:queue.from_list(cycle))
  end

  defp lazy_queue(queue) do
    {{:value, h}, queue} = :queue.out(queue)
    [h | fn -> lazy_queue(:queue.in(h, queue)) end]
  end

To use it you have to trigger the function at the tail:

iex(1)> [_|lz] = Test.lazy_seq([1,2,3,4])
[1 | #Function<0.133222699/0 in Test.lazy_queue/1>]
iex(2)> [_|lz] = lz.()
[2 | #Function<0.133222699/0 in Test.lazy_queue/1>]
iex(3)> [_|lz] = lz.()
[3 | #Function<0.133222699/0 in Test.lazy_queue/1>]
iex(4)> [_|lz] = lz.()
[4 | #Function<0.133222699/0 in Test.lazy_queue/1>]
iex(5)> [_|lz] = lz.()
[1 | #Function<0.133222699/0 in Test.lazy_queue/1>]
iex(6)> [_|lz] = lz.()
[2 | #Function<0.133222699/0 in Test.lazy_queue/1>]
...
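
Forcing the tail by hand gets tedious, so a small helper can pull the first n elements out of such a `[head | fun]` list. This is a sketch with made-up names (LazyList, from/1, take/2), and it assumes every tail is either [] or a zero-arity function that returns the next cell.

```elixir
defmodule LazyList do
  # An infinite lazy sequence n, n + 1, n + 2, ... in the [head | fun] shape.
  def from(n), do: [n | fn -> from(n + 1) end]

  # Takes up to n elements, forcing only as many tails as needed.
  def take(_lazy, 0), do: []
  def take([], _n), do: []
  def take([head | _tail], 1), do: [head]

  def take([head | tail], n) when n > 1 and is_function(tail, 0) do
    [head | take(tail.(), n - 1)]
  end
end
```

For example, `LazyList.take(LazyList.from(5), 3)` walks three cells and stops without forcing a fourth.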
1 Like

Enjoy the updated version… forgot to add clause for {:halted, item}

defmodule StreamIterator.Suspension do
  defstruct continuation: nil

  def start(stream) do
    reduce_fun = fn item, _acc -> {:suspend, item} end
    {:suspended, nil, continuation} = Enumerable.reduce(stream, {:suspend, nil}, reduce_fun)
    %__MODULE__{continuation: continuation}
  end

  def next(state)
  
  def next(%{continuation: nil} = state) do
    {:eof, state}
  end

  def next(state) do
    case state.continuation.({:cont, nil}) do
      {:suspended, item, continuation} -> {:next, item, %{state | continuation: continuation}}
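      # A halt that carries a last item uses the same {:next, item, state} shape as a regular step
      {:halted, item} -> {:next, item, %{state | continuation: nil}}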
      {:done, nil} -> {:eof, %{state | continuation: nil}}
    end
  end
end
1 Like