Issue with streaming into a csv

I am trying to stream into a csv file using nimble_csv(1.2.0) but running into issues.

This is the code I am using to create a csv file from the stream.

defmodule Utils do
  alias NimbleCSV.RFC4180, as: CSV

  def write_to_file(content_stream) do
    path = Path.join(System.tmp_dir(), "#{UUID.uuid4(:hex)}.csv")

    content_stream
    |> CSV.to_line_stream()
    |> CSV.parse_stream([skip_headers: false])
    |> Stream.into(File.stream!(path, [:write, :utf8]))
    |> Stream.run()
  end
end

This is my test data

contents =
  [
    "First Name,Last Name,Email\n",
    "David,Byrne,david@test.com"
  ]
  |> Stream.map(&String.trim_leading/1)
   
Utils.write_to_file(contents)

The file is created; however, its contents are missing the comma separators and the newline characters:

First NameLast NameEmailDavidByrnedavid@test.com

If, however, I update Utils.write_to_file/1 by removing parse_stream/2:

def write_to_file(content_stream) do
  path = Path.join(System.tmp_dir(), "#{UUID.uuid4(:hex)}.csv")

  content_stream
  |> CSV.to_line_stream()
  |> Stream.into(File.stream!(path, [:write, :utf8]))
  |> Stream.run()
end

I get the csv contents in the correct format.

First Name,Last Name,Email
David,Byrne,david@test.com

I am not sure whether I am missing an option for parse_stream/2, and whether that is what causes the contents to be malformed.

Any help would be appreciated.

That's what you're trying to do, but your code is not streaming into a CSV. It's streaming data out of a CSV into Elixir data: parse_stream parses a stream of CSV-formatted data into Elixir data. The reverse would be dump_to_stream, which turns Elixir data into a stream of CSV-formatted data.
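A minimal sketch of the two directions, assuming nimble_csv can be fetched with Mix.install/1 in a standalone script (the variable names are only illustrative). It also suggests why the file ended up without commas or line breaks: each parsed row is a list of binaries, which is valid iodata, so writing it as-is simply concatenates the cells.

```elixir
# Assumption: run as a standalone script; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:nimble_csv, "~> 1.2"}])

alias NimbleCSV.RFC4180, as: CSV

lines = ["First Name,Last Name,Email\n", "David,Byrne,david@test.com\n"]

# Parsing: CSV-formatted binaries in, rows (lists of cell contents) out.
rows = lines |> CSV.parse_stream(skip_headers: false) |> Enum.to_list()

# Writing rows directly treats each row as iodata, so the cells are
# glued together with no separator and no line break.
concatenated = IO.iodata_to_binary(rows)

# Dumping: rows in, CSV-formatted iodata out (separators and line endings restored).
csv = rows |> CSV.dump_to_stream() |> Enum.to_list() |> IO.iodata_to_binary()

IO.inspect(rows, label: "rows")
IO.inspect(concatenated, label: "rows written as-is")
IO.inspect(csv, label: "dumped csv")
```

Here `concatenated` is the same malformed string that showed up in the file, while `csv` contains the comma-separated lines again.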

Okay, thanks. So if I already have a stream, do I just write it straight into the file without using dump_to_stream/1?

content_stream
|> Stream.into(File.stream!(path, [:write, :utf8]))
|> Stream.run()

Also, the NimbleCSV v1.2.0 documentation describes parse_stream/2 as "Lazily parses CSV from a stream and returns a stream of rows", so I was assuming parse_stream/2 could be passed a stream and would output a stream.

Streams are lazy enumerables. Being a stream doesn't tell you anything about what data the stream deals with. parse_stream maps from a stream of CSV-formatted binaries to a stream of rows, each row being a list of cell contents. It's the lazy version of parse_enumerable, which immediately returns [[binary()]].
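To illustrate the lazy and eager variants side by side, here is a small sketch under the same assumption that nimble_csv is installed via Mix.install/1; the sample data is made up for the example.

```elixir
# Assumption: run as a standalone script; Mix.install/1 needs Elixir 1.12+.
Mix.install([{:nimble_csv, "~> 1.2"}])

alias NimbleCSV.RFC4180, as: CSV

# Hypothetical sample data, one CSV line per element.
lines = ["a,b\n", "1,2\n", "3,4\n"]

# Lazy: nothing is parsed until the result is enumerated.
lazy_rows = CSV.parse_stream(lines, skip_headers: false)

# Only as many lines as needed are parsed to produce two rows.
first_two = Enum.take(lazy_rows, 2)

# Eager: the whole input is parsed immediately into a list of rows.
eager_rows = CSV.parse_enumerable(lines, skip_headers: false)

IO.inspect(first_two, label: "first two rows (lazy)")
IO.inspect(eager_rows, label: "all rows (eager)")
```

In both cases the elements are rows (lists of cells), not CSV text, which is why piping them straight into a file does not give back a CSV.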

In your case I'm really confused, though, because if you already have CSV-formatted data and you just want to write it to a CSV file, then there's no need for NimbleCSV in the first place.

Enum.into(content_stream, file_collectable) would be all you need.

What makes streams even more confusing here is that a File.Stream struct implements both Enumerable.t (a collection to enumerate, in this case lazily) and Collectable.t (a collection that items can be pushed into), and the two work independently. You're using the latter functionality here.
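A rough sketch of both roles using only the standard library; the file name is a hypothetical one chosen for the example, and the lines are assumed to already be CSV-formatted.

```elixir
# Hypothetical temp file name, made unique to avoid clobbering anything.
path =
  Path.join(
    System.tmp_dir!(),
    "file_stream_demo_#{System.unique_integer([:positive])}.csv"
  )

lines = ["First Name,Last Name,Email\n", "David,Byrne,david@test.com\n"]

file = File.stream!(path)

# Collectable side: items are pushed into the file, no CSV library involved.
Enum.into(lines, file)

# Enumerable side: the same struct lazily reads the file back line by line.
read_back = Enum.to_list(file)

IO.inspect(read_back, label: "read back")

# Clean up the demo file.
File.rm!(path)
```

With a lazy source, `Stream.into/2` followed by `Stream.run/1` would do the same job as `Enum.into/2` here.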

Thanks for the detailed info @LostKobrakai. Appreciate it :+1: