Is there a 'next item' call for Stream?

I want to read a file line by line, but I want to pull lines on demand.
I have a GenServer that tokenizes a file. It is driven by a call that says ‘give me the next token’. It works, but at the moment it reads the whole file into memory at once, which doesn’t work for big files.

I would like to keep the current line in the GenServer state and, when I finish it, read the next line into state. But I can’t see the Stream call needed.

There’s nothing like a ‘next item’ call on Stream, because to Elixir it’s all Enumerable data. Elixir just knows that it can reduce over those. There’s no stopping in the middle and continuing later in that model – at least none that is exposed publicly, rather than being an implementation detail. Imagine there were such an API:

{item_a, rest} = Stream.next(file_stream)
{item_b, _} = Stream.next(rest)
{item_c, _} = Stream.next(rest)

Given how file streams work – using a file cursor – item_b and item_c would be different lines, even though Stream.next is passed the same value. That’s not great for a functional-ish API.
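For the curious, the “implementation detail” is the Enumerable suspension mechanism: a reducer can return {:suspend, acc}, and Enumerable.reduce/3 then hands back a continuation instead of finishing. This is roughly how functions like Stream.zip pull one element at a time internally. A minimal sketch (take_one is a made-up helper, not a standard function):

```elixir
# Pull a single element from an enumerable (or a previously returned
# continuation) by suspending the reduction after the first element.
take_one = fn enum_or_cont ->
  # Suspend immediately, carrying the element as the accumulator.
  reducer = fn x, _acc -> {:suspend, x} end

  result =
    case enum_or_cont do
      # A continuation from a previous suspension is an arity-1 fun.
      cont when is_function(cont, 1) -> cont.({:cont, nil})
      enum -> Enumerable.reduce(enum, {:cont, nil}, reducer)
    end

  case result do
    {:suspended, x, cont} -> {x, cont}
    {:done, _} -> :done
    {:halted, _} -> :done
  end
end

{a, cont} = take_one.(Stream.map(1..3, &(&1 * 10)))  # a == 10
{b, _cont} = take_one.(cont)                         # b == 20
```

Note that each call returns a fresh continuation, so there is no value you could accidentally reuse, which is exactly the problem the hypothetical Stream.next above would have.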

Instead, for your use case, you could use the underlying pieces that File.stream! is built on directly: File.open/2, IO.read/2 and File.close/1.
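A small sketch of that approach (the file name and contents here are just for illustration):

```elixir
# Demo file so the example is self-contained.
path = "demo_lines.txt"
File.write!(path, "alpha\nbeta\ngamma\n")

{:ok, device} = File.open(path, [:read, :utf8])

# IO.read(device, :line) returns the next line including the trailing
# newline, :eof at end of file, or {:error, reason}.
first = IO.read(device, :line)
second = IO.read(device, :line)

File.close(device)
File.rm!(path)
```

Because the lines are pulled one IO.read/2 call at a time, only the current line needs to be in memory, which is exactly the on-demand behaviour you want.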


TY v much

You don’t need to use a stream. Regularly opened files are IO devices, themselves processes, and there is an Erlang function which does exactly what you want: :file.read_line/1. So open the file using File.open or File.open! and keep the resulting device in your process’s state. Then just call that Erlang function each time you need a line.

If you want to learn more on how IO devices work, check out The Erlang I/O Protocol.
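Put together, the GenServer could look roughly like this (LineReader and its function names are illustrative, not from any library):

```elixir
defmodule LineReader do
  use GenServer

  def start_link(path), do: GenServer.start_link(__MODULE__, path)
  def next_line(pid), do: GenServer.call(pid, :next_line)

  @impl true
  def init(path) do
    # :raw and :read_ahead make :file.read_line/1 efficient; File.open
    # returns {:ok, device}, which doubles as the init result. The file
    # is closed automatically when this process exits.
    File.open(path, [:read, :raw, :read_ahead])
  end

  @impl true
  def handle_call(:next_line, _from, device) do
    # :file.read_line/1 returns {:ok, line}, :eof, or {:error, reason}.
    {:reply, :file.read_line(device), device}
  end
end
```

With that in place, each `LineReader.next_line(pid)` call pulls exactly one line from disk, so memory use stays flat regardless of file size.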


Relatedly, you could take a look at iterex from the Ash project (see the iterex v0.1.2 documentation).

Iterators provide the flexibility of Enum with the laziness of Stream and the ability to pause and resume iteration.

Can’t make a statement about the correctness, stability etc. of the library myself, had no real use case so far.


Very big tangent, but with Iter.next/1 one can implement a very cool pattern for lazy pagination in LiveView, or elsewhere. Given a resource stream, one can do source = stream |> Iter.from() |> Iter.chunk_every(@page_size), and whenever a new page of items is needed, call {:ok, items, source_rest} = Iter.next(source).
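Spelled out, that pattern might look like the following sketch. This assumes iterex is a dependency and relies on the Iter.from/1, Iter.chunk_every/2 and Iter.next/1 calls exactly as described above; the module, @page_size and function names are made up for illustration:

```elixir
defmodule Paginated do
  @page_size 20

  # Wrap any stream/Enumerable into a lazily chunked iterator once, up front.
  def new(stream) do
    stream
    |> Iter.from()
    |> Iter.chunk_every(@page_size)
  end

  # Pull the next page on demand. The returned iterator must replace the
  # old one in your state (e.g. a LiveView assign), since each Iter.next/1
  # consumes one chunk.
  def next_page(source) do
    case Iter.next(source) do
      {:ok, items, source_rest} -> {items, source_rest}
      :done -> {[], :done}
    end
  end
end
```

I can’t vouch for the exact return shapes beyond what the post above describes, so treat this as a starting point rather than a drop-in implementation.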


TY very much for the pointer

Iter seems to start two additional processes for the Enumerable implementation though, using the process mailbox as temporary storage: iterex/lib/iter/iterable/enumerable.ex at main · ash-project/iterex · GitHub

That’s quite a blunt approach for dealing with that complexity.


Yeah, it’s been a while since I looked at that code, but IIRC it takes an optimized path for lists etc. – can’t say for sure. But I think the idea was to find ways to improve the generic implementation over time, or realistically that someone would write their own iterator.

It was built for a pretty specific purpose: supporting map steps in Reactor.