Stream CSV file from a remote zip on S3

I have a zip file, containing CSVs, on a remote S3-compatible storage (Cellar).

Using Unzip, I can get a stream of the CSV file that I would like to decode, like so:

    aws_s3_config =
      ExAws.Config.new(:s3,
        access_key_id: ["xxx", :instance_role],
        secret_access_key: ["xxx", :instance_role]
      )

    file = new(zip_name, bucket_name, aws_s3_config)
    {:ok, unzip} = Unzip.new(file)
    stream = Unzip.file_stream!(unzip, file_name)

as explained in the doc.

Now I would like to consume that stream by reading it with CSV. So I try:

    stream |> CSV.decode() |> Enum.take(1)

and get an error:

    ** (FunctionClauseError) no function clause matching in CSV.Decoding.Preprocessing.Lines.starts_sequence?/5

If I write the content of my CSV to disk and then read it back, it works fine:

    # write the file on disk
    stream |> Stream.into(File.stream!("stops.txt")) |> Stream.run()
    # then read and decode it
    File.stream!("stops.txt") |> CSV.decode() |> Enum.take(1)

I get the desired result, the first row of the CSV file:

    [ok: ["\uFEFFstop_id", "stop_name", "stop_lat", "stop_lon", "location_type"]]

The difference I see is that Unzip.file_stream! and File.stream!("stops.txt") do not stream the file the same way: Unzip seems to emit chunks of about 65 kB, while File.stream! streams line by line.

How can I solve this, without writing the file to disk as an intermediary step?
Thanks!


Hello,

I don’t know if it will help, as I’m not using Unzip but StreamGzip, in combination with NimbleCSV, for this purpose. But maybe it will give you some hints?

I have the following function that returns a stream for an object downloaded from S3:

  defp get_object_stream(object) do
    {:ok, io_pid} = StringIO.open(object)

    io_pid
    |> IO.binstream(4096)
    |> StreamGzip.gunzip()
    |> NimbleCSV.RFC4180.to_line_stream()
  end

In my case, the trick was to use to_line_stream.
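To illustrate why: to_line_stream re-chunks arbitrary binary fragments into one element per line, which is what parse_stream expects. Here is a rough sketch of that re-chunking in plain Elixir (not NimbleCSV’s actual implementation; it ignores quoted fields spanning lines, and drops a trailing line that lacks a final newline):

```elixir
defmodule LineChunker do
  # Re-chunk a stream of arbitrary binary fragments into a stream of
  # newline-terminated lines. Only a sketch: NimbleCSV.RFC4180.to_line_stream/1
  # does this properly (including quoted fields that span lines).
  def to_lines(stream) do
    Stream.transform(stream, "", fn chunk, acc ->
      # Prepend whatever was left over from the previous chunk,
      # then split on newlines; the last part may be an incomplete line.
      parts = String.split(acc <> chunk, "\n")
      {complete, [rest]} = Enum.split(parts, -1)
      {Enum.map(complete, &(&1 <> "\n")), rest}
    end)
  end
end

["stop_id,stop_na", "me\n1,Central\n2,No", "rth\n"]
|> LineChunker.to_lines()
|> Enum.to_list()
# => ["stop_id,stop_name\n", "1,Central\n", "2,North\n"]
```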

I can then use this stream like this:

    object
    |> get_object_stream()
    |> NimbleCSV.RFC4180.parse_stream()

As you can see, I’m not streaming directly from S3, since I first download the object into memory. But if you already have something able to stream from S3, you would just replace the part that constructs the stream from the in-memory string with your S3 stream.


Echoing what @ahamez has already mentioned, the issue seems to be that CSV.decode expects a stream of lines, but Unzip.file_stream! returns a stream of binary chunks. You can convert the stream of chunks to a stream of lines yourself, or you can use NimbleCSV as already mentioned:

    Unzip.file_stream!(unzip, file_name)
    |> NimbleCSV.RFC4180.to_line_stream()
    |> NimbleCSV.RFC4180.parse_stream()

Thanks for your replies.

Unfortunately, applying to_line_stream and parse_stream yields an error:

    stream
    |> NimbleCSV.RFC4180.to_line_stream()
    |> NimbleCSV.RFC4180.parse_stream()
    |> Stream.run()

    ** (ArgumentError) errors were found at the given arguments:

      * 1st argument: not a bitstring
        :erlang.bit_size(["\uFEFFservice_id,monday,tuesday,wednesday,thursday,friday,saturday,sunday,start_date,end_date\r\nE1-5-1-127,1,1,1,1,1,1,1,20220115,20220115\r\nH2-0-1-1,1,0,0,0,0,0,0,20220103,20220415\r\nH2-0 (...)
        (nimble_csv 1.2.0) lib/nimble_csv.ex:393: NimbleCSV.RFC4180.to_line_stream_chunk_fun/3
        (elixir 1.12.2) lib/stream.ex:264: anonymous fn/4 in Stream.chunk_while_fun/2
        (elixir 1.12.2) lib/enum.ex:4280: Enumerable.List.reduce/3
        (elixir 1.12.2) lib/stream.ex:931: Stream.do_list_transform/7
        (elixir 1.12.2) lib/stream.ex:1719: Enumerable.Stream.do_each/4
        (elixir 1.12.2) lib/stream.ex:880: Stream.do_transform/5
        (elixir 1.12.2) lib/stream.ex:649: Stream.run/1

What I don’t understand is the return structure of Unzip.file_stream!. I would expect the stream to yield data chunk by chunk: with Unzip.file_stream! |> Enum.to_list(), I thought I would get something like ["some binary data", "some other binary data", "..."]. Instead I get a nested list of data that looks like this:

    [
      [
        [
          ["some data"],
          "some other data"
        ],
        "data again"
      ]
    ]

That’s why to_line_stream fails: it expects an enumerable of binaries.
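As an aside, joining such a nested structure back into a single binary is straightforward (a minimal illustration with made-up data):

```elixir
# Flatten the nested lists, then join the fragments into one binary.
chunk = [[["some data, "], "some other data, "], "data again"]
chunk |> List.flatten() |> Enum.join("")
# => "some data, some other data, data again"
```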

OK thanks, I needed to adapt the stream coming from Unzip like so:

    Unzip.file_stream!(unzip, file_name)
    |> Stream.map(fn c -> List.flatten(c) |> Enum.join("") end)
    |> NimbleCSV.RFC4180.to_line_stream()
    |> NimbleCSV.RFC4180.parse_stream()
    |> Enum.to_list()

Thank you all for your kind assistance and for giving me good tips!


List.flatten(c) |> Enum.join("") would probably be better replaced with IO.iodata_to_binary/1.
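Putting that suggestion together with the pipeline above would give something like this (a sketch, assuming the unzip and file_name variables from the original post, and Unzip plus NimbleCSV as dependencies):

```elixir
Unzip.file_stream!(unzip, file_name)
# Collapse each iodata chunk (nested lists of binaries) into one binary
|> Stream.map(&IO.iodata_to_binary/1)
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream()
|> Enum.to_list()
```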


Nice, thanks @LostKobrakai :+1:

I wrote a blog post on the subject, if that can be useful to someone.