My objective is to read data from multiple large CSV files in the most performant way, make minimal transformations, and eventually write the results to output files. I want to ensure I am leveraging all the compute on my machine.
I am using the following deps:
{:flow, "~> 1.2.3"},
{:nimble_csv, "~> 1.2.0"},
{:parallel_stream, "~> 1.1.0"}
Let me know your thoughts on whether this code is performant.
defmodule Sample do
  alias NimbleCSV.RFC4180, as: CSV

  def process_data(datafile) do
    datafile
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      [row] = CSV.parse_string(row, skip_headers: false)

      %{
        id: :binary.copy(Enum.at(row, 0)),
        name: :binary.copy(Enum.at(row, 2)),
        place: :binary.copy(Enum.at(row, 4))
      }
    end)
    # |> Enum.to_list()
    |> Flow.run()
  end

  def read_files() do
    Path.wildcard("data/*.csv")
  end

  def init() do
    read_files()
    |> ParallelStream.map(fn file -> process_data(file) end)
    |> Enum.into([])
  end
end
Sample.init() kicks off processing of the files.
Thanks,
Raza
One note unrelated to performance: File.stream! returns lines by default, but a CSV row may span multiple lines if it contains an embedded \n. For instance, this is a valid CSV with one row and three columns:

foo,"bar
baz",wat

parse_stream has additional machinery to handle escapes (like the " before bar on line 1 above) that cross line boundaries, but parse_string assumes the given string contains the whole CSV file.
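To illustrate, here is a small sketch (using the same RFC4180 parser) of parse_stream reassembling a row whose quoted field is split across a line boundary; the input literals are made up for the example:

```elixir
alias NimbleCSV.RFC4180, as: CSV

# Two "lines" as File.stream!/1 would deliver them; the quoted
# field "bar\nbaz" is split across the line boundary.
lines = ["foo,\"bar\n", "baz\",wat\n"]

rows =
  lines
  |> CSV.parse_stream(skip_headers: false)
  |> Enum.to_list()

# => [["foo", "bar\nbaz", "wat"]]
```

Feeding the same individual lines to parse_string would fail on the unterminated quote, which is exactly the risk with the Flow.map in the original code.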
I would recommend you benchmark your code with Benchee (see the Benchee README).
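A minimal Benchee harness for this could look like the following sketch; it assumes a {:benchee, "~> 1.1"} dep and a hypothetical data/sample.csv file, neither of which is in the original post:

```elixir
Benchee.run(
  %{
    "flow pipeline" => fn -> Sample.process_data("data/sample.csv") end
  },
  time: 10,       # seconds of measurement per job
  memory_time: 2  # also measure memory usage
)
```

Adding a second entry to the map (e.g. a plain Stream version) lets Benchee compare the variants directly.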
And I’m curious as to why you use :binary.copy like that?
raza_ep:
My objective is to read data (in the most performant approach) from multiple large CSV files, make minimal transformations and eventually write to output files. …
Looks like File.read is faster than File.stream: see the thread “Surprising behavior of File.stream vs File.read”.
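As a sketch of that approach (the path and data below are made up for illustration): read the whole file into one binary with File.read!/1, then hand it to parse_string, which expects the complete input anyway:

```elixir
alias NimbleCSV.RFC4180, as: CSV

# Write a throwaway file so the sketch runs on its own.
path = Path.join(System.tmp_dir!(), "sample.csv")
File.write!(path, "1,a,Alice,x,Paris\n2,b,Bob,y,Lagos\n")

# One read, one parse pass; no per-line streaming overhead.
rows =
  path
  |> File.read!()
  |> CSV.parse_string(skip_headers: false)

# => [["1", "a", "Alice", "x", "Paris"], ["2", "b", "Bob", "y", "Lagos"]]
```

The trade-off is that the whole file must fit in memory, so this is worth benchmarking against the streaming version on your actual file sizes.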
Thanks for pointing this out. However, in my case, I am certain that the rows will not spill over to multiple lines.
Any reason to use ParallelStream.map here when Task.async_stream works just fine? I’m likely missing something here.
But I would only swap that out and then benchmark.
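For reference, a self-contained sketch of the Task.async_stream variant; the String.upcase/1 stand-in for Sample.process_data/1 and the option values are assumptions for illustration, not part of the original code:

```elixir
# Stand-in for Sample.process_data/1 so the sketch runs on its own.
process = fn file -> String.upcase(file) end

results =
  ["a.csv", "b.csv"]
  |> Task.async_stream(process,
    max_concurrency: System.schedulers_online(),
    ordered: false,
    timeout: :infinity
  )
  |> Enum.map(fn {:ok, res} -> res end)
  |> Enum.sort()

# => ["A.CSV", "B.CSV"]
```

Task.async_stream ships with Elixir, so this also drops the extra parallel_stream dependency.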
What are your reservations towards your code? Has it proven to be slower than you wanted it to be?