fchabouis
Stream CSV file from a remote zip on S3
I have a zip file containing CSVs on a remote S3 cellar.
Using Unzip, I can get a stream of the CSV file that I would like to decode, like so:
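```elixir
aws_s3_config =
  ExAws.Config.new(:s3,
    access_key_id: ["xxx", :instance_role],
    secret_access_key: ["xxx", :instance_role]
  )

# `new/3` presumably builds a struct implementing the Unzip.FileAccess
# protocol for S3, like the S3 example in the Unzip documentation
file = new(zip_name, bucket_name, aws_s3_config)
{:ok, unzip} = Unzip.new(file)
stream = Unzip.file_stream!(unzip, file_name)
```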
This is how the Unzip documentation explains it.
Now I would like to consume that stream by decoding it with CSV.
So I try `stream |> CSV.decode() |> Enum.take(1)` and get an error: `** (FunctionClauseError) no function clause matching in CSV.Decoding.Preprocessing.Lines.starts_sequence?/5`
If I write the CSV content to disk and then read it back, it works fine:
```elixir
# write the file to disk
stream |> Stream.into(File.stream!("stops.txt")) |> Stream.run()

# then read and decode it (File.stream!/1 yields one line at a time by default)
File.stream!("stops.txt") |> CSV.decode() |> Enum.take(1)
```
I get the desired result, the first row of the CSV file: `[ok: ["\uFEFFstop_id", "stop_name", "stop_lat", "stop_lon", "location_type"]]`
The difference I see is that `Unzip.file_stream!` and `File.stream!("stops.txt")` do not stream the file the same way: Unzip seems to emit chunks of 65k, while `File.stream!` streams line by line.
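To make that difference concrete, here is a small stdlib-only sketch. `ChunkDemo.chunk_stream/2` is a hypothetical helper that cuts a binary into fixed-size pieces, standing in for a chunked source like `Unzip.file_stream!` (with a tiny chunk size so the effect is visible), and the sample CSV is made up. Rows end up split across chunk boundaries, whereas a line-mode `File.stream!` would yield one complete row per element.

```elixir
defmodule ChunkDemo do
  # Hypothetical helper: cuts a binary into fixed-size pieces, standing in
  # for a chunked source such as Unzip.file_stream! (which uses far larger chunks).
  def chunk_stream(binary, chunk_size) do
    Stream.unfold(binary, fn
      "" ->
        nil

      bin when byte_size(bin) <= chunk_size ->
        {bin, ""}

      bin ->
        {binary_part(bin, 0, chunk_size),
         binary_part(bin, chunk_size, byte_size(bin) - chunk_size)}
    end)
  end
end

# Made-up sample data
csv = "stop_id,stop_name\n1,Gare\n2,Port\n"

# Chunk boundaries ignore line boundaries, so rows get split across chunks
ChunkDemo.chunk_stream(csv, 8) |> Enum.to_list()
# => ["stop_id,", "stop_nam", "e\n1,Gare", "\n2,Port\n"]
```

A parser that assumes each element is a whole line (or at least a clean binary boundary) can trip over pieces like `"e\n1,Gare"`.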
How can I solve this without writing the file to disk as an intermediate step?
Thanks!
ahamez
Hello,
I’m not sure this will help, as I’m not using Unzip but StreamGzip, combined with NimbleCSV, for this purpose. Maybe it will give you a hint, though.
I have the following function that returns a stream for an object downloaded from S3:
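```elixir
defp get_object_stream(object) do
  # Wrap the in-memory binary in an IO device so it can be read as a stream
  {:ok, io_pid} = StringIO.open(object)

  io_pid
  # Read it in 4096-byte chunks
  |> IO.binstream(4096)
  # Decompress the gzip data chunk by chunk
  |> StreamGzip.gunzip()
  # Re-split the decompressed chunks into lines
  |> NimbleCSV.RFC4180.to_line_stream()
end
```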
In my case, the trick was to use `to_line_stream`, which turns a stream of arbitrarily chunked binaries into a line-oriented one.
I can then use this stream like this:
```elixir
object
|> get_object_stream()
|> NimbleCSV.RFC4180.parse_stream()
```
As you can see, I’m not streaming directly from S3, since I first download the object into memory. But if you already have something that can stream from S3, you would just have to replace the part that builds the stream from the in-memory string with your S3 stream.
LostKobrakai
`List.flatten(c) |> Enum.join("")` would probably be better replaced with `IO.iodata_to_binary/1`.
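A quick illustration of that suggestion, using a made-up iodata chunk: `IO.iodata_to_binary/1` flattens nested iodata into one binary in a single call (and returns a plain binary unchanged), while the flatten/join pipeline goes through `String.Chars`, so a byte given as an integer is rendered as its decimal text. Whether Unzip's chunks ever contain such integers isn't stated in this thread; the point is only that `IO.iodata_to_binary/1` handles every valid iodata shape.

```elixir
# iodata may mix binaries, bytes (integers 0..255) and nested lists
chunk = ["stop_id,", ["stop_name", ?\n]]

IO.iodata_to_binary(chunk)
# => "stop_id,stop_name\n"

# Enum.join/2 converts each element with String.Chars, so the byte ?\n
# (the integer 10) becomes the two characters "10"
chunk |> List.flatten() |> Enum.join("")
# => "stop_id,stop_name10"

# On a whole stream of chunks this would be:
# stream |> Stream.map(&IO.iodata_to_binary/1)
```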
akash-akya
Echoing what @ahamez has already mentioned, the issue seems to be that `CSV.decode` expects a stream of lines, but `Unzip.file_stream!` returns a stream of binary chunks. You can convert the stream of chunks into a stream of lines yourself, or use NimbleCSV as already mentioned:
```elixir
Unzip.file_stream!(unzip, file_name)
# Re-split the binary chunks into lines, then parse them as CSV rows
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream()
```
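For the "convert it yourself" route, here is a minimal stdlib-only sketch. `LineStream.from_chunks/1` is a hypothetical helper, not part of Unzip, CSV or NimbleCSV: it buffers the partial line at the end of each chunk, emits every complete line with its trailing `\n`, and flushes whatever is left when the input ends. It runs each chunk through `IO.iodata_to_binary/1`, so it accepts plain binaries as well as iodata chunks. Like a line-mode `File.stream!`, it splits on every `\n`, including ones inside quoted fields.

```elixir
defmodule LineStream do
  # Hypothetical helper (not part of Unzip or CSV): turns a stream of
  # arbitrarily sized binary or iodata chunks into a stream of lines,
  # each line keeping its trailing "\n".
  def from_chunks(chunks) do
    chunks
    # Append a sentinel so the last partial line can be flushed
    |> Stream.concat([:eof])
    |> Stream.transform("", fn
      :eof, "" ->
        {[], ""}

      :eof, leftover ->
        # Input ended without a final "\n": emit what is left
        {[leftover], ""}

      chunk, buffer ->
        # Glue the leftover from the previous chunk to this one, then split
        parts = String.split(buffer <> IO.iodata_to_binary(chunk), "\n")
        # All parts but the last are complete lines; the last one is an
        # incomplete line carried over to the next chunk
        {complete, [partial]} = Enum.split(parts, -1)
        {Enum.map(complete, &(&1 <> "\n")), partial}
    end)
  end
end

# Hand-made chunks that cut rows in the middle
["stop_id,", "stop_nam", "e\n1,Gare", "\n2,Port\n"]
|> LineStream.from_chunks()
|> Enum.to_list()
# => ["stop_id,stop_name\n", "1,Gare\n", "2,Port\n"]
```

Assuming the CSV library accepts this line stream the same way it accepts `File.stream!`, usage would look like `Unzip.file_stream!(unzip, file_name) |> LineStream.from_chunks() |> CSV.decode() |> Enum.take(1)`. The buffer is rebuilt with `<>` on every chunk, which is fine for ordinary line lengths but would get slow for a single line spanning many chunks.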