Most performant way to read data from multiple large CSV files?

My objective is to read data as performantly as possible from multiple large CSV files, make minimal transformations, and eventually write the results to output files. I want to ensure I am leveraging all the compute on my machine.

I am using the following deps:

      {:flow, "~> 1.2.3"},
      {:nimble_csv, "~> 1.2.0"},
      {:parallel_stream, "~> 1.1.0"}

Let me know your thoughts on whether this code is performant.

defmodule Sample do
  alias NimbleCSV.RFC4180, as: CSV


  def process_data(datafile) do
    datafile
    |> File.stream!()
    |> Flow.from_enumerable()
    |> Flow.map(fn row ->
      [row] = CSV.parse_string(row, skip_headers: false)
      %{
        id: :binary.copy(Enum.at(row, 0)),
        name: :binary.copy(Enum.at(row, 2)),
        place: :binary.copy(Enum.at(row, 4))
      }
    end)
    # |> Enum.to_list()
    |> Flow.run()
  end

  def read_files() do
    Path.wildcard("data/*.csv")
  end

  def init() do
    read_files()
    |> ParallelStream.map(fn file ->
      process_data(file)
    end)
    |> Enum.into([])
  end

end

Sample.init() kicks off processing of the files.

Thanks,
Raza

One note unrelated to performance: File.stream! returns lines by default, but a CSV row may span multiple lines if it contains an embedded \n. For instance, this is a valid CSV file with one row and three columns:

foo,"bar
baz",wat

parse_stream has additional machinery to handle escapes (like the " before bar on line 1 above) that cross line boundaries, but parse_string assumes the given string contains the whole CSV file, so calling it on one line at a time will break on rows like this.
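
To illustrate, a minimal sketch of the parse_stream approach (the file path is hypothetical):

alias NimbleCSV.RFC4180, as: CSV

# parse_stream reassembles rows whose quoted fields span lines,
# which per-line parse_string cannot do.
"data/sample.csv"
|> File.stream!()
|> CSV.parse_stream(skip_headers: false)
|> Enum.to_list()
# => [["foo", "bar\nbaz", "wat"]] for the file above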

I would recommend you benchmark your code: Readme — Benchee v1.1.0.
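
For example, a minimal Benchee sketch (the file path and the plain-Stream baseline are assumptions for illustration, not part of your code):

alias NimbleCSV.RFC4180, as: CSV

Benchee.run(%{
  "flow pipeline" => fn -> Sample.process_data("data/sample.csv") end,
  "plain stream" => fn ->
    "data/sample.csv"
    |> File.stream!()
    |> Stream.map(&CSV.parse_string(&1, skip_headers: false))
    |> Stream.run()
  end
})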

And I’m curious as to why you use :binary.copy like that?

Looks like File.read is faster than File.stream: Surprising behavior of File.stream vs File.read
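
In case it helps, a sketch of the whole-file alternative (assumes each file fits comfortably in memory; the path is hypothetical):

alias NimbleCSV.RFC4180, as: CSV

# Read the entire file into memory at once, then parse it in one pass.
"data/sample.csv"
|> File.read!()
|> CSV.parse_string(skip_headers: false)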

The intent is to use the data in later transformation processes. Without :binary.copy, each extracted field would hold a reference to the original parsed binary, as described here:

https://hexdocs.pm/nimble_csv/NimbleCSV.html#module-binary-references
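
To sketch what those docs describe (the values here are made up for illustration):

# A field extracted from a large line is a sub-binary that keeps the
# whole source binary alive. :binary.copy/1 makes a standalone copy,
# so the large source line can be garbage collected once the copy is
# all that remains.
line = String.duplicate("x", 1_000) <> ",Jane,London"
[_, name, _] = String.split(line, ",")
name_ref  = name                # sub-binary: still references line
name_copy = :binary.copy(name)  # standalone copy: line can be GC'd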

Thanks for pointing this out. However, in my case, I am certain that the rows will not span multiple lines.

Any reason to use ParallelStream.map here when Task.async_stream works just fine? :thinking: I’m likely missing something here.
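
Something like this sketch of init/0 (the timeout option is illustrative):

def init() do
  read_files()
  |> Task.async_stream(&process_data/1, timeout: :infinity)
  |> Enum.map(fn {:ok, result} -> result end)
end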

But I would only swap that out and then benchmark.

What are your reservations about your code? Has it proven to be slower than you wanted?