How would you stream the last item twice?

Let's say there is a large file that I do not want to load into memory. I can use File.stream! to get a stream of lines. How can I transform this stream so that the last line is emitted twice? I think I would first transform the stream to emit pairs of items (a sliding window) and then emit pair[1] if it is not nil, otherwise pair[0], but I am not sure how to do that.

In Python it could look like this:

def stream_last_twice(path):
    last_line = None
    with open(path) as f:
        for line in f:
            last_line = line
            yield line
    yield last_line

"""
line 1
line 2
line 3
"""
|> String.split("\n")
|> Stream.chunk_every(2, 1, [])
|> Stream.flat_map(fn
  [a, _b] -> [a]
  [a] -> [a, a]
end)
|> Enum.to_list()
# ["line 1", "line 2", "line 3", "", ""]

Not the exact approach, but similar.
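
For the original File.stream! case the same pipeline should work unchanged. Here is a minimal sketch, assuming a small throwaway file (the path and contents are made up for the demo). Since File.stream! keeps the trailing newline on each line, there is no empty string at the end, and the last real line is the one that gets repeated.

```elixir
# Assumption: a temporary demo file stands in for the "large file" from the question.
path = Path.join(System.tmp_dir!(), "last_twice_demo.txt")
File.write!(path, "line 1\nline 2\nline 3\n")

result =
  path
  # Lazily read the file line by line (each line keeps its "\n")
  |> File.stream!()
  # Sliding window of 2; the final window holds only the last line
  |> Stream.chunk_every(2, 1, [])
  |> Stream.flat_map(fn
    # Full window: emit the first item only
    [a, _b] -> [a]
    # Short final window: emit the last item twice
    [a] -> [a, a]
  end)
  |> Enum.to_list()

# Clean up the demo file
File.rm!(path)

# result is ["line 1\n", "line 2\n", "line 3\n", "line 3\n"]
```

An empty file produces an empty stream, so nothing is emitted in that case.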

Turns out the Stream.transform version of this isn't that unreadable either (as it often can be). Judging from a quick benchmark, it also seems to be a bit faster and more memory efficient.

"""
line 1
line 2
line 3
"""
|> String.split("\n")
|> Stream.transform(
  # Build acc for first element
  fn -> :start end,
  # Emit each line once and retain it as acc
  fn line, _acc -> {[line], line} end,
  # Emit the last line again after all lines were handled
  fn acc -> {[acc], :done} end,
  # Nothing to do after streaming was completed
  fn _ -> :ok end
)
|> Enum.to_list()
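
One caveat with the code above: for an empty input it would emit the :start placeholder. Here is a sketch of a reusable variant that avoids this by tagging the accumulator. It assumes Elixir 1.14 or later (where the five-argument Stream.transform is available), and the module name LastTwice is made up for illustration.

```elixir
defmodule LastTwice do
  # Hypothetical helper: wraps any enumerable so its last item is emitted twice.
  def stream(enum) do
    Stream.transform(
      enum,
      # No item seen yet
      fn -> :none end,
      # Emit each item once and remember it as the most recent one
      fn item, _acc -> {[item], {:last, item}} end,
      # When the input is done, emit the remembered item again (if there was one)
      fn
        {:last, item} -> {[item], :done}
        :none -> {[], :done}
      end,
      # Nothing to clean up
      fn _acc -> :ok end
    )
  end
end

["a", "b", "c"] |> LastTwice.stream() |> Enum.to_list()
# => ["a", "b", "c", "c"]
```

With a file this would be used as File.stream!(path) |> LastTwice.stream(), which keeps the whole thing lazy.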

I really like the Stream.flat_map solution. Stream.transform looks complicated - I think I understand what is happening, but I feel it won't fit into my muscle memory :)

The transform code is probably the best. If you don’t mind iterating the list twice (and the input stream is not an exhaustible resource, such as a file read), then this is the shortest I can make it:

list = ["a", "b", "c"]
list
|> Stream.concat(Stream.take(list, -1))
|> Enum.to_list()
# => ["a", "b", "c", "c"]