How can I get the last value of a stream?

I need to read very large data and write it to a CSV file.
I want to get the last datum of the data after writing.

file = File.open!(file_path, [:write, :utf8])

Stream.unfold(start_id, fn
  from_id ->
    logs = Log.list(%{from_id: from_id, size: @size})

    last_id =
      case logs |> List.last() do
        nil -> nil
        last_log -> last_log |> Map.get(:id)
      end

    {logs, last_id}
end)
|> Stream.flat_map(fn logs ->
  logs |> to_list()
end)
|> CSV.encode()
|> some_func() # <- This is what I want


Using `Enum.to_list() |> List.last()` takes too much memory.

A simpler version of the question, for example:

[1, 2, 3]
|> Stream.map(fn x -> x * 2 end)
|> some_func()


Use Stream.take/2 with -1 as the count, as long as the stream is not infinite.
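A minimal sketch of that suggestion, reusing the simplified example from above (assuming `some_func` just needs the final value):

```elixir
# Stream.take/2 with a negative count keeps only the trailing elements.
# The stream must be finite, since it is fully enumerated.
last =
  [1, 2, 3]
  |> Stream.map(fn x -> x * 2 end)
  |> Stream.take(-1)
  |> Enum.to_list()

# last == [6] — note the result is a one-element list, not the bare value
```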


I doubt it’s what was requested. According to the docs on Stream.take/2:

If a negative count is given, the last count values will be taken. For such, the collection is fully enumerated keeping up to 2 * count elements in memory.

I would use Stream.transform/4 instead of Stream.unfold/2.


You are indeed correct: Stream.transform/4 with an agent would do the job.

enum = [1, 2, 3, 4, 5]
{:ok, agent_pid} = Agent.start_link(fn -> nil end)
start_fun = fn -> nil end
reducer_fun = fn v, _ -> {[], v} end
after_fun = fn v -> Agent.update(agent_pid, fn _ -> v end) end
stream = Stream.transform(enum, start_fun, reducer_fun, after_fun)
:ok = Stream.run(stream)
value = Agent.get(agent_pid, & &1)
:ok = Agent.stop(agent_pid)

FYI Elixir's Stream.transform doesn't emit its accumulator at the end of the input. What to do? · GitHub


FWIW, 4 years ago I even proposed a PR to the core to make transform/4 preserve an accumulator, but it was ruled out in favour of the chunk_* family for dealing with this.

That said, we might abuse Stream.chunk_while/4 to do what we want.

[1, 2, 3]
|> Stream.chunk_while(
  nil,
  fn e, _ -> {:cont, e} end, # do whatever here, return `e`
  fn acc -> {:cont, acc, acc} end
)
|> Enum.to_list()
#⇒ [3]

The last element of an enumerable can be fetched with Enum.at(enumerable, -1). However this won’t help you here. Conceptually the problem can be described as:

# at the input we have unencoded table data (enumerable of lists)
unencoded_rows
|> CSV.encode()
# at the output we have a stream of encoded rows

So whatever additional info you collect (like e.g. id of the last row), it has to be discarded before CSV.encode, and at the output you get a stream of encoded rows.

For this particular case I’d try something like this (untested):

unencoded_rows =
  # using Stream.resource to avoid `Stream.unfold` + `Stream.flat_map`
  Stream.resource(
    fn -> start_id end,
    fn from_id ->
      case Log.list(%{from_id: from_id, size: @size}) do
        [] -> {:halt, nil}
        logs -> {logs, logs |> List.last() |> Map.get(:id)}
      end
    end,
    fn _ -> :ok end
  )

encoded_ids =
  # using `Stream.transform` to ensure the file is open only while the stream is being consumed
  Stream.transform(
    unencoded_rows,
    fn -> File.open!(file_path, [:write, :utf8]) end,
    fn log, file ->
      # encode to csv and store as a side-effect
      csv_row = [log] |> CSV.encode() |> Enum.to_list()
      IO.write(file, csv_row)

      # return the id of the encoded entry
      {[Map.get(log, :id)], file}
    end,
    fn file -> File.close(file) end
  )

# force the entire stream to run and get the last entry
last_encoded_id = Enum.at(encoded_ids, -1)

In other words, we encode each row, store it to file, and keep track of the id of the stored rows. You could replace Stream.transform + Enum.at with Enum.reduce, but then you need to open the file before Enum.reduce, and use try/after to ensure the file is closed immediately after.
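The Enum.reduce + try/after variant could look like the sketch below. The rows and the encoder here are hypothetical stand-ins for Log.list/CSV.encode from the thread, so the example is self-contained:

```elixir
# Hypothetical stand-ins for the real data source and CSV encoder:
rows = [%{id: 1, msg: "a"}, %{id: 2, msg: "b"}, %{id: 3, msg: "c"}]
encode_row = fn row -> "#{}," <> row.msg <> "\n" end

path = Path.join(System.tmp_dir!(), "stream_last_example.csv")
file = File.open!(path, [:write, :utf8])

last_id =
  try do
    # write each row as a side effect; keep only its id as the accumulator
    Enum.reduce(rows, nil, fn row, _last_id ->
      IO.write(file, encode_row.(row))
    end)
  after
    # the file is closed even if encoding or writing raises
    File.close(file)
  end

# last_id == 3
```

The trade-off versus the Stream.transform version above is that the file is opened eagerly, before the reduction starts, rather than lazily when the stream is first consumed.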