NimbleCSV error trying to parse a large CSV partitioned into many files

Hi there! I have a large CSV file (2GB), so I decided to partition it into many files, and I’m trying to use Streams, Flow and NimbleCSV to parse and filter it, but I’m getting an error that I can’t figure out. In the code below it reaches the reduce section just fine, but then it raises the error described further down:

CSV parsing code:

streams
    |> Flow.from_enumerables()
    |> Flow.flat_map(&String.split(&1, "\n"))
    |> Flow.stream()
    |> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
    |> Stream.transform([], fn r, acc ->
      IO.inspect(acc, label: "Reaches here and builds the new %{}")
      if acc == [] do
        # first row contains the column names, we put them in the accumulator
        {%{}, r}
      else
        # other rows contain the values, we zip them with the column names
        {[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
      end
    end)
    # skip the header row
    |> Stream.drop(1)
    |> Flow.from_enumerable()

The error:

[error] GenServer #PID<0.714.0> terminating
** (NimbleCSV.ParseError) expected escape character " but reached the end of file
    (nimble_csv 1.2.0) lib/nimble_csv.ex:433: NimbleCSV.RFC4180.finalize_parser/1
    (elixir 1.14.4) lib/stream.ex:993: Stream.do_transform_user/6
    (elixir 1.14.4) lib/stream.ex:942: Stream.do_transform/5
    (elixir 1.14.4) lib/stream.ex:1813: Enumerable.Stream.do_each/4
    (gen_stage 1.2.1) lib/gen_stage/streamer.ex:52: GenStage.Streamer.handle_demand/2
    (gen_stage 1.2.1) lib/gen_stage.ex:2223: GenStage.noreply_callback/3
    (stdlib 4.3) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.3) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.3) proc_lib.erl:240: :proc_lib.init_p_do_apply/3

The CSV fields are separated by “,”.

Any ideas on how to solve this, or alternatively on how to parse and filter this large CSV in a reasonable timeframe? Without partitioning the file it takes around 15 minutes just to apply the first filter that extracts records by a certain field.


It looks like the input CSV is malformed (which I see frequently), and there are a few creative ways to deal with that depending on the issue (mismatched double quotes?).
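For reference, here is a minimal way to reproduce that exact error (the snippet is made up): a quoted field whose closing quote never arrives makes the RFC4180 parser fail once it reaches the end of the input.

    # Hypothetical malformed input: the opening quote on the second row is never closed.
    data = ~s(name,comment\nfoo,"unterminated value\n)

    NimbleCSV.RFC4180.parse_string(data, skip_headers: false)
    # ** (NimbleCSV.ParseError) expected escape character " but reached the end of file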

When dealing with large CSVs I always reach for xsv (GitHub - BurntSushi/xsv: A fast CSV command line toolkit written in Rust), which is lightning fast. xsv split lets you easily split the large file into many smaller files, and I think the filter or search commands can help with what you are doing as well. There are many useful subcommands.

My Elixir code calls the executable when needed using System.cmd, and then I use NimbleCSV whenever I need to parse any of its output into Elixir/Ecto/etc.
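Roughly like this, for example (just a sketch; the paths and chunk size are made up, and it assumes xsv is on the PATH — check xsv split --help for the exact flags):

    # Split the big CSV into chunks of ~500k records each (xsv split repeats
    # the header row in every chunk), then lazily parse one chunk with NimbleCSV.
    File.mkdir_p!("/tmp/chunks")
    {_output, 0} = System.cmd("xsv", ["split", "-s", "500000", "/tmp/chunks", "big.csv"])

    "/tmp/chunks/*.csv"
    |> Path.wildcard()
    |> List.first()
    |> File.stream!()
    |> NimbleCSV.RFC4180.parse_stream()
    |> Enum.take(5)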


Yeah, I will always vote for xsv; it’s amazing and has saved me a lot of time.

And it’s perfect for what you describe: have it massage/fix the data and then feed it to your code. I’ve done exactly that no fewer than 15 times over the last few years.

Thanks a lot guys! I will give it a try :slight_smile:

btw I am not sure why you even need to split a meager 2GB file. I have written code that ingested 14GB XML files and 7-8GB CSV files. When you use streaming parsers it really doesn’t matter how big the files are; the RAM usage stays mostly constant.
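For the CSV side, the single-stream version is something like this (the file name and filter column are placeholders): File.stream!/1 reads the file lazily, so memory stays roughly flat no matter how big the file is.

    "big.csv"
    |> File.stream!()
    |> NimbleCSV.RFC4180.parse_stream()
    # Keep only rows whose (hypothetical) second column is "active".
    |> Stream.filter(fn [_id, status | _rest] -> status == "active" end)
    |> Enum.to_list()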

It’s not that I need it, but I was looking for strategies to make it faster, and while reading the Flow documentation I saw this advice: Avoid single sources.
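Roughly what I had in mind (just a sketch; the paths and the filter are placeholders, and it assumes every partition is a well-formed CSV with its own header): give Flow one lazily parsed stream per partition, so it has several sources to pull from in parallel.

    streams =
      for file <- Path.wildcard("/tmp/chunks/*.csv") do
        file
        |> File.stream!()
        # parse_stream drops each partition's own header row by default
        |> NimbleCSV.RFC4180.parse_stream()
      end

    streams
    |> Flow.from_enumerables()
    |> Flow.filter(fn [_id, status | _rest] -> status == "active" end)
    |> Enum.to_list()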

Ah, you want to parallelize it as much as possible? I mean OK, that would probably work, though I am not sure. I’d think a streaming reader would very quickly fill up a queue of a million records to parse.

But I’m not claiming it either way; your idea is also sound. I’d measure first, but I wouldn’t judge you if you didn’t. It’s a valid approach.
