`Enum.split/2` which does not force the tail of a `Stream`

I need a reliable source of numbers, so I thought I would create a Stream that simply counts upwards and pass it around as a kind of state, but then a problem occurs.

How do I take even a single value out of the stream without “losing” the new stream state?

I thought I could just use Enum.split/2, but this forces the “tail” of the stream, so it never returns:

iex(1)> s = Stream.unfold(4, fn n -> {n, n + 1} end)
#Function<59.87278901/2 in Stream.unfold/2>
iex(2)> s |> Enum.split(2)
# after about 30 seconds of waiting
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
       (v)ersion (k)ill (D)b-tables (d)istribution
^C

So I am asking whether something already exists that would behave roughly as follows:

iex(1)> s = Stream.unfold(4, fn n -> {n, n + 1} end)
#Function<59.87278901/2 in Stream.unfold/2>
iex(2)> s |> Enum.split(2)
{[4, 5], #Function<59.87278901/2 in Stream.unfold/2>}

This way I could pattern match and pass the “tail” of the stream around.

Since there may be multiple processes that also need a reliable stream of numbers, I can’t really use a central source, because each process’ list of numbers must be continuous at the end of the day.

I would argue that Enum.split/2 should actually use the suspend feature of Enumerable to return a suspended enumerable as the second element of the tuple. The regular specializations for things like lists or ranges could of course be smarter.
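To make the suspend idea concrete, here is a minimal sketch of what such a split could look like on top of `Enumerable.reduce/3`. The module and function names (`SuspendSplit`, `split/2`, `resume/2`) are hypothetical and not part of Elixir; the "tail" it returns is a raw continuation function rather than a proper Stream, so treat it as an illustration only.

```elixir
defmodule SuspendSplit do
  # Hypothetical helper, not part of the standard library.
  # Takes `count` items and returns {items, continuation_or_nil}.
  def split(enum, count) when is_integer(count) and count > 0 do
    resume(&Enumerable.reduce(enum, &1, fn item, acc -> step(item, acc) end), count)
  end

  # Takes `count` more items from a continuation returned by split/2 or resume/2.
  def resume(continuation, count)
      when is_function(continuation, 1) and is_integer(count) and count > 0 do
    case continuation.({:cont, {[], count}}) do
      # Enough items collected: keep the suspended enumeration as the "tail".
      {:suspended, {acc, _}, next} -> {Enum.reverse(acc), next}
      # Source finished (:done) or was halted before `count` items were seen.
      {_status, {acc, _}} -> {Enum.reverse(acc), nil}
    end
  end

  # Suspend right after the last wanted item so nothing extra is consumed.
  defp step(item, {acc, 1}), do: {:suspend, {[item | acc], 0}}
  defp step(item, {acc, n}), do: {:cont, {[item | acc], n - 1}}
end

s = Stream.unfold(4, fn n -> {n, n + 1} end)
{first, tail} = SuspendSplit.split(s, 2)      # first == [4, 5]
{more, _tail} = SuspendSplit.resume(tail, 3)  # more == [6, 7, 8]
```

The continuation is an ordinary immutable value, so calling `resume/2` twice on the same `tail` yields the same items for a pure source such as this `Stream.unfold/2`.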

I think you are mistaken about what a Stream actually is. It is just a composition of functions. If you like, it is an Enumerable defined by a function rather than by a data item.

There’s no straightforward way to “pause” a stream unless you dive into the Enumerable protocol. Streams don’t have “state”. The most straightforward way to pass the “state” of a Stream around is to turn it into an Agent.

def split_stream(s, count) do
  {Stream.take(s, count), Stream.drop(s, count)}
end

would work for simple integer uses of Enum.split.
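For the Agent route mentioned above, a rough sketch could look like the following. The `Counter` module is a hypothetical name made up for illustration; it assumes each process that needs its own gap-free sequence starts its own Agent.

```elixir
defmodule Counter do
  # Hypothetical module; the Agent's state is simply the next number to hand out.
  def start_link(initial) when is_integer(initial) do
    Agent.start_link(fn -> initial end)
  end

  # Returns the next `count` numbers and advances the state in one atomic step.
  def take(counter, count) when is_integer(count) and count > 0 do
    Agent.get_and_update(counter, fn next ->
      {Enum.to_list(next..(next + count - 1)), next + count}
    end)
  end
end

{:ok, counter} = Counter.start_link(4)
Counter.take(counter, 2) # [4, 5]
Counter.take(counter, 3) # [6, 7, 8]
```

The trade-off is that the state now lives in a process, so it is no longer a plain value you can pattern match on and pass along.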

Seems like a slippery slope, the suspend feature is largely hidden and I’m not sure exposing it will not cause more confusion. If you want a stream with the first N items swallowed, Stream.transform will accomplish that.
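As a small sketch of the `Stream.transform/3` idea, the accumulator can simply count how many items are still to be swallowed. `drop_first` is a hypothetical name; `Stream.drop/2` already does the same job, this only shows the mechanism.

```elixir
# Hypothetical helper: swallow the first `count` items, pass the rest through.
drop_first = fn enum, count ->
  Stream.transform(enum, count, fn
    # Still swallowing: emit nothing and count down.
    _item, remaining when remaining > 0 -> {[], remaining - 1}
    # Done swallowing: emit every item unchanged.
    item, 0 -> {[item], 0}
  end)
end

s = Stream.unfold(4, fn n -> {n, n + 1} end)
s |> drop_first.(2) |> Enum.take(3) # [6, 7, 8]
```

Note that the result is still a description of work to be done: the source is enumerated from the start each time the new stream is consumed.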

Exposing the suspend might be appropriate, but I think it should be outside of Enum.

I’ve come around in my way of thinking about Streams as “lazy”: they aren’t really lazy as functions, they are lazy as “data”. And if you want to preserve the immutability of data, exposing the suspension of an Enumerable breaks that concept.

A Stream.split would be a reasonable compromise, i.e. you use Stream.transform underneath to split the initial Stream into two independent Streams based on the initial Stream.

Yes, this is not the responsibility of Enum.split or the Enum module. What you want is to be able to take some items of a stream and return a suspended stream. We had an issue to implement those features but it never gathered enough interest to implement it.

This is also a bit bad because a stream, e.g. one reading a file, could have side effects. So for this code it would be better to iterate through the stream elements, copy them to a list, and then return the “tail” of the stream along with that list.
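To illustrate the side-effect concern, here is a small sketch in which the producer sends a message for every value it generates. The `Produced` helper is hypothetical and only there to collect those messages; the point is that a `Stream.take/2` plus `Stream.drop/2` pair enumerates the source twice.

```elixir
defmodule Produced do
  # Hypothetical helper: drains all {:produced, n} messages from the mailbox.
  def flush(acc \\ []) do
    receive do
      {:produced, n} -> flush([n | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

parent = self()

# The send/2 call stands in for a real side effect such as reading from a file.
source =
  Stream.unfold(1, fn n ->
    send(parent, {:produced, n})
    {n, n + 1}
  end)

head = Stream.take(source, 2)
tail = Stream.drop(source, 2)

Enum.to_list(head) # [1, 2]
Enum.take(tail, 1) # [3]

# The first element was generated once per enumeration, so twice in total.
times_first_produced = Produced.flush() |> Enum.count(&(&1 == 1))
```

With a pure counter this only costs repeated work, but with a file or socket the second pass may see different data or none at all.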

Ok, I feel a little silly posting this, but I wondered if I could do it without going to a full process (i.e. an Agent) for the state. This just reminds me of Rx with observables, and I think it was a good exercise to play with.

# Silly names
defmodule Streamy.Structy do
  defstruct [current: nil, func: nil]
end

defmodule Streamy do
  alias Streamy.Structy

  def iterate(current, func) do
    %Structy{current: current, func: func}
  end

  def peek(streamy) do
    s = Stream.iterate(streamy.current, streamy.func)
    i = Enum.take(s, 1)
    IO.puts inspect i
    IO.puts List.last(i)
    %Structy{current: List.last(i), func: streamy.func}
  end

  def take(streamy, count) do
    s = Stream.iterate(streamy.current, streamy.func)
    [h|rest] = Enum.take(s, count + 1)
    IO.puts "h: #{inspect h}"
    IO.puts "rest: #{inspect rest}"
    %Structy{current: List.last(rest), func: streamy.func}
  end
end

And then it’s used like this:

Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> import Streamy
Streamy
iex(2)> streamy = iterate(0, fn(item) -> item + 1 end)
%Streamy.Structy{current: 0, func: #Function<6.52032458/1 in :erl_eval.expr/5>}
iex(3)> streamy = streamy |> take(1)
h: 0
rest: [1]
%Streamy.Structy{current: 1, func: #Function<6.52032458/1 in :erl_eval.expr/5>}
iex(4)> streamy = streamy |> peek
[1]
1
%Streamy.Structy{current: 1, func: #Function<6.52032458/1 in :erl_eval.expr/5>}
iex(5)> streamy = streamy |> take(5)
h: 1
rest: [2, 3, 4, 5, 6]
%Streamy.Structy{current: 6, func: #Function<6.52032458/1 in :erl_eval.expr/5>}
iex(6)> streamy |> peek
[6]
6
%Streamy.Structy{current: 6, func: #Function<6.52032458/1 in :erl_eval.expr/5>}
iex(7)> streamy.current
6

I’m not sure if the peek is even necessary since everything is immutable. You would just not reassign the variable.

Anyway, like I said…it’s a little ridiculous, but just throwing it out there.

EDIT: Pushed to GitHub

This one is interesting: http://stackoverflow.com/questions/29136874/enumerable-stream-with-look-ahead

Module.lookahead/2 is what you are looking for…

After drawing some inspiration from ‘Warren’ on stackoverflow, I created this hex package:

Thanks! I just ran into this problem. I’ve made a note to study your library.

Thanks, this is very important.
I’ll give a very reasonable use case, in case people missed it:

I want to read a CSV from S3 while processing it. If my process crashes, ideally I restart where I was.

This is totally achievable, but the Stream API (unless you use stream_split) doesn’t allow it. The S3 API (and ex_aws_s3) can stream the file (you can get any number of bytes from any offset of the file), and CSV reading can be done the same way (read any number of bytes from any offset in the file), so the two combined can be suspended and resumed.

I cannot iterate over (and store) the whole thing in memory though. The CSV file is just too big (GBs), and I need to stop between each iteration to do some work and be able to resume my work if everything crashes (a lot of data!).
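A rough sketch of the "resume from an offset" idea, using a local file as a stand-in for S3. `ResumableReader` and `read_chunk/3` are hypothetical names, and nothing here uses ex_aws_s3; the assumption is only that the caller persists the returned offset somewhere durable after each chunk of work.

```elixir
defmodule ResumableReader do
  # Hypothetical stand-in for a ranged S3 GET: reads up to `size` bytes at `offset`.
  # Returns {:ok, data, next_offset} so the caller can store next_offset and
  # continue from there after a crash.
  def read_chunk(path, offset, size) do
    File.open!(path, [:read, :binary], fn io ->
      case :file.pread(io, offset, size) do
        {:ok, data} -> {:ok, data, offset + byte_size(data)}
        :eof -> :eof
      end
    end)
  end
end

path = Path.join(System.tmp_dir!(), "resumable_reader_example.csv")
File.write!(path, "a,b\n1,2\n3,4\n")

{:ok, chunk, offset} = ResumableReader.read_chunk(path, 0, 4)
# chunk == "a,b\n", offset == 4; a restarted process would continue from offset 4.
{:ok, next_chunk, _next_offset} = ResumableReader.read_chunk(path, offset, 4)
```

Real CSV rows will not line up with fixed-size chunks, so an actual implementation would also have to carry over the partial last line between reads.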
