How can I get the last value of a stream?

I need to read very large data and write it to a CSV file.
I want to get the last datum of the data after writing.

file = File.open!(file_path, [:write, :utf8])

Stream.unfold(start_id, fn
  from_id ->
    logs = Log.list(%{from_id: from_id, size: @size})

    last_id =
      case logs |> List.last() do
        nil -> nil
        last_log -> last_log |> Map.get(:id)
      end

    {logs, last_id}
end)
|> Stream.flat_map(fn logs ->
  logs |> to_list()
end)
|> CSV.encode()
|> some_func() # <- This is what I want


Using `Enum.to_list() |> List.last()` takes too much memory.

A simpler version of the question, for example:

[1, 2, 3]
|> Stream.map(fn x -> x * 2 end)
|> some_func()


Use Stream.take/2 with -1 as the count, as long as the stream is not infinite.
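A minimal sketch of that suggestion, reusing the simplified example from above (assuming `some_func` just needs the final value):

```elixir
# Stream.take/2 with a negative count keeps only the trailing elements.
# The stream must be finite, since it is fully enumerated.
last =
  [1, 2, 3]
  |> Stream.map(fn x -> x * 2 end)
  |> Stream.take(-1)
  |> Enum.to_list()

# last == [6] — note the result is a one-element list, not the bare value
```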


I doubt it’s what was requested. According to the docs on Stream.take/2:

If a negative count is given, the last count values will be taken. For such, the collection is fully enumerated keeping up to 2 * count elements in memory.

I would use Stream.transform/4 instead of Stream.unfold/2.


You are indeed correct: Stream.transform/4 with an agent would do the job.

enum = [1, 2, 3, 4, 5]
{:ok, agent_pid} = Agent.start_link(fn -> nil end)
start_fun = fn -> nil end
reducer_fun = fn v, _ -> {[], v} end
after_fun = fn v -> Agent.update(agent_pid, fn _ -> v end) end
stream = Stream.transform(enum, start_fun, reducer_fun, after_fun)
:ok = Stream.run(stream)
value = Agent.get(agent_pid, & &1)
:ok = Agent.stop(agent_pid)

FYI Elixir's Stream.transform doesn't emit its accumulator at the end of the input. What to do? · GitHub


FWIW, 4 years ago I even proposed a PR to the core to make transform/4 preserve an accumulator, but it was ruled out in favour of the chunk_* family for dealing with this.

That said, we might abuse Stream.chunk_while/4 to do what we want.

[1, 2, 3]
|> Stream.chunk_while(
  nil,
  fn e, _ -> {:cont, e} end, # do whatever here, return `e`
  fn acc -> {:cont, acc, acc} end
)
|> Enum.to_list()
#⇒ [3]

The last element of an enumerable can be fetched with Enum.at(enumerable, -1). However this won’t help you here. Conceptually the problem can be described as:

# at the input we have unencoded table data (enumerable of lists)
unencoded_rows
|> CSV.encode()
# at the output we have a stream of encoded rows

So whatever additional info you collect (like e.g. id of the last row), it has to be discarded before CSV.encode, and at the output you get a stream of encoded rows.

For this particular case I’d try something like this (untested):

unencoded_rows =
  # using Stream.resource to avoid `Stream.unfold` + `Stream.flat_map`
  Stream.resource(
    fn -> start_id end,
    fn from_id ->
      case Log.list(%{from_id: from_id, size: @size}) do
        [] -> {:halt, nil}
        logs -> {logs, logs |> List.last() |> Map.get(:id)}
      end
    end,
    fn _ -> :ok end
  )

encoded_ids =
  # using `Stream.transform` to ensure the file is open only while the stream is being consumed
  Stream.transform(
    unencoded_rows,
    fn -> File.open!(file_path, [:write, :utf8]) end,
    fn log, file ->
      # encode to csv and store as a side-effect
      csv_row = [log] |> CSV.encode() |> Enum.to_list()
      IO.write(file, csv_row)

      # return the id of the encoded entry
      {[Map.get(log, :id)], file}
    end,
    fn file -> File.close(file) end
  )

# force the entire stream to run and get the last entry
last_encoded_id = Enum.at(encoded_ids, -1)

In other words, we encode each row, store it to file, and keep track of the id of the stored rows. You could replace Stream.transform + Enum.at with Enum.reduce, but then you need to open the file before Enum.reduce, and use try/after to ensure the file is closed immediately after.
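The Enum.reduce + try/after variant could look like the sketch below. The rows and the encoder here are hypothetical stand-ins for Log.list/CSV.encode from the thread, so the example is self-contained:

```elixir
# Hypothetical stand-ins for the real data source and CSV encoder:
rows = [%{id: 1, msg: "a"}, %{id: 2, msg: "b"}, %{id: 3, msg: "c"}]
encode_row = fn row -> "#{}," <> row.msg <> "\n" end

path = Path.join(System.tmp_dir!(), "stream_last_example.csv")
file = File.open!(path, [:write, :utf8])

last_id =
  try do
    # write each row as a side effect; keep only its id as the accumulator
    Enum.reduce(rows, nil, fn row, _last_id ->
      IO.write(file, encode_row.(row))
    end)
  after
    # the file is closed even if encoding or writing raises
    File.close(file)
  end

# last_id == 3
```

The trade-off versus the Stream.transform version above is that the file is opened eagerly, before the reduction starts, rather than lazily when the stream is first consumed.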