I do need some reliable source of numbers, so I thought I'd create a Stream that simply counts upwards and pass it around as a kind of state, but then the problem occurs:
How do I take even a single value out of the stream without "losing" the new stream state?
I thought I could just use Enum.split/2, but it forces the "tail" of the stream, so the call never returns:
iex(1)> s = Stream.unfold(4, fn n -> {n, n + 1} end)
#Function<59.87278901/2 in Stream.unfold/2>
iex(2)> s |> Enum.split(2)
# after about 30 seconds of waiting
BREAK: (a)bort (c)ontinue (p)roc info (i)nfo (l)oaded
(v)ersion (k)ill (D)b-tables (d)istribution
^C
So I am asking whether something already exists that would behave roughly as follows:
iex(1)> s = Stream.unfold(4, fn n -> {n, n + 1} end)
#Function<59.87278901/2 in Stream.unfold/2>
iex(2)> s |> Enum.split(2)
{[4, 5], #Function<59.87278901/2 in Stream.unfold/2>}
This way I could pattern match and pass the “tail” of the stream around.
Since there may be multiple processes that also need a reliable stream of numbers, I can't really use a central source: each process's list of numbers must be contiguous at the end of the day.
I would argue that Enum.split/2 should actually use the suspend feature of enumerable to return a suspended enumerable as the second argument. Obviously the regular specializations for things like lists or ranges can make it smarter.
I think you are mistaken about what a Stream actually is. It is just a composition of functions. If you like, it is an Enumerable defined by a function rather than by a data item.
There's no straightforward way to "pause" a stream unless you dive into the Enumerable protocol. Streams don't have "state". The most straightforward way to pass the "state" of a Stream around is to turn it into an Agent.
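As a rough sketch of that idea (module and function names are mine, not from any library): give each consumer its own Agent holding the next unconsumed number, so every batch it takes stays contiguous.

```elixir
defmodule Counter do
  # One Agent per consumer: the Agent holds the next unconsumed number.
  def start_link(start) do
    Agent.start_link(fn -> start end)
  end

  # Atomically reserve `count` consecutive numbers and advance the state.
  def take(pid, count) do
    Agent.get_and_update(pid, fn n ->
      {Enum.to_list(n..(n + count - 1)), n + count}
    end)
  end
end
```

With `{:ok, pid} = Counter.start_link(4)`, a `Counter.take(pid, 2)` hands out `[4, 5]` and a later `Counter.take(pid, 3)` continues with `[6, 7, 8]` — the "tail" lives in the process, not in a stream value.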
def split_stream(s, count) do
  {Stream.take(s, count), Stream.drop(s, count)}
end
Seems like a slippery slope: the suspend feature is largely hidden, and I'm not sure exposing it wouldn't cause more confusion. If you want a stream with the first N items swallowed, Stream.transform will accomplish that.
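For reference, a minimal sketch of that Stream.transform approach (the module and function names are mine): count down an accumulator, emitting nothing until the first n items have been swallowed.

```elixir
defmodule SkipN do
  # Emit nothing while the counter is positive, then pass items through.
  def drop(stream, n) do
    Stream.transform(stream, n, fn
      _item, remaining when remaining > 0 -> {[], remaining - 1}
      item, 0 -> {[item], 0}
    end)
  end
end
```

So `Stream.unfold(4, fn n -> {n, n + 1} end) |> SkipN.drop(2) |> Enum.take(3)` yields `[6, 7, 8]` lazily. (Stream.drop/2 already does this, of course; the point is that transform keeps it lazy without touching suspension.)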
Exposing the suspend might be appropriate, but I think it should be outside of Enum.
I've come around in my way of thinking about Streams as "lazy": they aren't lazy as functions, they are lazy as "data". And if you want to preserve the immutability of data, exposing the suspension of an Enumerable breaks that concept.
A Stream.split would be a reasonable compromise, i.e. you use Stream.transform underneath to split the initial Stream into two independent Streams based on the initial Stream.
Yes, this is not the responsibility of Enum.split or the Enum module. What you want is to be able to take some items of a stream and return a suspended stream. We had an issue to implement those features but it never gathered enough interest to implement it.
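For the curious, here is a rough sketch of what taking some items and getting back a suspended enumerable could look like, using the Enumerable protocol's suspension mechanism directly (module and function names are mine; error handling omitted):

```elixir
defmodule SuspendSplit do
  # Pull `count` items from any enumerable, returning them together with
  # the Enumerable continuation so the rest can be resumed later.
  def split(enum, count) when count > 0 do
    enum
    |> Enumerable.reduce({:cont, {[], count}}, &step/2)
    |> unwrap()
  end

  # Resume a previously suspended enumerable for another batch.
  def resume(cont, count) when count > 0 do
    cont.({:cont, {[], count}}) |> unwrap()
  end

  # Signal :suspend once the last wanted item has been collected.
  defp step(x, {acc, 1}), do: {:suspend, {[x | acc], 0}}
  defp step(x, {acc, n}), do: {:cont, {[x | acc], n - 1}}

  defp unwrap({:suspended, {acc, 0}, cont}), do: {Enum.reverse(acc), cont}
  defp unwrap({:done, {acc, _}}), do: {Enum.reverse(acc), :done}
  defp unwrap({:halted, {acc, _}}), do: {Enum.reverse(acc), :done}
end
```

With the counter from the original question, `SuspendSplit.split(Stream.unfold(4, fn n -> {n, n + 1} end), 2)` gives `{[4, 5], cont}`, and `SuspendSplit.resume(cont, 3)` continues with `{[6, 7, 8], cont2}` — nothing beyond the requested items is ever forced.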
This is also a bit problematic because a stream (e.g. one reading a file) can have side effects. So for this code it would be better to iterate through the stream's elements, copying them to a list, and then return the "tail" of the stream along with that list.
Ok, I feel a little silly posting this, but I wondered if I could do it without reaching for a full process (i.e. an Agent) for the state. It reminds me of Rx with observables, and I think it was a good exercise to play with.
# Silly names
defmodule Streamy.Structy do
  defstruct current: nil, func: nil
end
defmodule Streamy do
  alias Streamy.Structy

  def iterate(current, func) do
    %Structy{current: current, func: func}
  end

  # Look at the next value without advancing the "stream".
  def peek(streamy) do
    streamy.current
  end

  # Take `count` values; return them together with the advanced "stream".
  def take(streamy, count) do
    s = Stream.iterate(streamy.current, streamy.func)
    {taken, [next]} = s |> Enum.take(count + 1) |> Enum.split(count)
    {taken, %Structy{current: next, func: streamy.func}}
  end
end
Thanks, this is very important.
I'll give a very reasonable use case, in case people missed it:
I want to read a CSV from S3 while processing it. If my process crashes, ideally I restart where I left off.
This is totally achievable, but the Stream API (unless you use stream_split) doesn't allow it. The S3 API (and ex_aws_s3) can stream the file (you can get any number of bytes from any offset of the file), and CSV reading can be done in the same way (read any number of bytes from any offset in the file), so the two things combined can be suspended and resumed.
I cannot iterate over (and store) the whole thing in memory, though. The CSV file is just too big (GBs), and I need to stop between each iteration to do some work and be able to resume my work if everything crashes (lots of data!).
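To illustrate the resume-by-offset idea (this is my own sketch, not ex_aws_s3's API; a local file stands in for S3): persist a byte offset as the checkpoint, and after a crash rebuild the stream from that offset with Stream.resource.

```elixir
defmodule OffsetStream do
  # Stream chunks of a file starting at `offset` bytes. After a crash,
  # restart with offset = number of bytes already processed.
  def from(path, offset, chunk_size \\ 1024) do
    Stream.resource(
      fn ->
        {:ok, io} = :file.open(path, [:read, :binary])
        {:ok, _pos} = :file.position(io, offset)
        io
      end,
      fn io ->
        case :file.read(io, chunk_size) do
          {:ok, data} -> {[data], io}
          :eof -> {:halt, io}
        end
      end,
      fn io -> :file.close(io) end
    )
  end
end
```

The same shape works for ranged S3 GETs: swap the :file calls for range requests and the checkpoint survives process restarts, without ever holding the whole file in memory.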