I doubt that’s what was requested. According to the docs for `Stream.take/2`:

> If a negative `count` is given, the last `count` values will be taken. For such, the collection is fully enumerated keeping up to `2 * count` elements in memory.
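For example, taking the last three elements of a range forces the whole range to be walked before anything is emitted:

```elixir
iex> 1..1_000_000 |> Stream.take(-3) |> Enum.to_list()
[999998, 999999, 1000000]
```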
FWIW, 4 years ago I even proposed a PR to the core to make `transform/4` preserve the accumulator, but it was ruled out in favour of the `chunk_*/*` family to deal with it.
The last element of an enumerable can be fetched with `Enum.at(enumerable, -1)`. However, this won’t help you here. Conceptually, the problem can be described as:
```elixir
# at the input we have unencoded table data (an enumerable of lists)
|> CSV.encode()
# at the output we have a stream of encoded rows
```
So whatever additional info you collect (e.g. the id of the last row) has to be discarded before `CSV.encode`, and at the output you get a stream of encoded rows.
For this particular case I’d try something like this (untested):
```elixir
unencoded_rows =
  # using Stream.resource to avoid `Stream.unfold` + `Stream.flat_map`
  Stream.resource(
    fn -> start_id end,
    fn from_id ->
      case Log.list(%{from_id: from_id, size: @size}) do
        [] -> {:halt, nil}
        logs -> {logs, logs |> List.last() |> Map.get(:id)}
      end
    end,
    fn _ -> :ok end
  )
```
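Each call to the second function fetches one page of up to `@size` logs, emits them, and passes the id of the last fetched log forward as the cursor for the next query; an empty page halts the stream. The final `fn _ -> :ok end` is a no-op cleanup, since there is nothing to release on this side.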
```elixir
encoded_ids =
  # using `Stream.transform/4` to ensure the file is open
  # only while the stream is being consumed
  Stream.transform(
    unencoded_rows,
    fn -> File.open!(file_path, [:write, :utf8]) end,
    fn log, file ->
      # encode to csv and store as a side-effect
      csv_row = [log] |> CSV.encode() |> Enum.to_list()
      IO.write(file, csv_row)
      # return the id of the encoded entry
      {[Map.get(log, :id)], file}
    end,
    &File.close/1
  )

# force the entire stream to run and get the last entry
last_encoded_id = Enum.at(encoded_ids, -1)
```
In other words, we encode each row, store it in the file, and keep track of the ids of the stored rows. You could replace `Stream.transform` + `Enum.at` with `Enum.reduce`, but then you’d need to open the file before `Enum.reduce`, and use `try/after` to ensure the file is closed immediately afterwards.
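For completeness, here is a minimal sketch of that `Enum.reduce` variant (untested as well, with the same assumptions about `Log.list/1`, `CSV.encode/1` and `file_path` as above):

```elixir
file = File.open!(file_path, [:write, :utf8])

last_encoded_id =
  try do
    Enum.reduce(unencoded_rows, nil, fn log, _last_id ->
      # encode the single row and write it out as a side-effect
      csv_row = [log] |> CSV.encode() |> Enum.to_list()
      IO.write(file, csv_row)
      # the accumulator carries the id of the last stored row
      Map.get(log, :id)
    end)
  after
    # runs even if encoding or writing raised
    File.close(file)
  end
```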