Hi there! I have a large CSV file (2 GB), so I decided to partition it into many files, and I'm now trying to use Streams, Flow and NimbleCSV to parse and filter it. I'm getting an error and can't figure out why it happens: in the code below, execution reaches the reduce section without problems, but then it raises the error shown further down.
CSV parsing code:
streams
|> Flow.from_enumerables()
|> Flow.flat_map(&String.split(&1, "\n"))
|> Flow.stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
|> Stream.transform([], fn r, acc ->
  IO.inspect(acc, label: "Reaches here and builds the new %{}")
  if acc == [] do
    # first row contains the column names, we put them in the accumulator
    {%{}, r}
  else
    # other rows contain the values, we zip them with the column names
    {[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
  end
end)
# skip the header row
|> Stream.drop(1)
|> Flow.from_enumerable()
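
For reference, here is a minimal sketch of what the transform step is meant to do, run on a small in-memory list instead of the real CSV. The `rows` sample data is made up for illustration, and the header row emits an empty list (no output) while its columns become the accumulator:

```elixir
# Hypothetical sample rows, standing in for the parsed CSV rows
rows = [["id", "name"], ["1", "ann"], ["2", "bob"]]

maps =
  rows
  |> Stream.transform([], fn r, acc ->
    if acc == [] do
      # first row: emit nothing, keep the column names as the accumulator
      {[], r}
    else
      # other rows: zip the values with the column names into a map
      {[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
    end
  end)
  |> Enum.to_list()

IO.inspect(maps)
```

With this sample, `maps` holds one map per data row, keyed by the column names from the first row.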
The error:
[error] GenServer #PID<0.714.0> terminating
** (NimbleCSV.ParseError) expected escape character " but reached the end of file
    (nimble_csv 1.2.0) lib/nimble_csv.ex:433: NimbleCSV.RFC4180.finalize_parser/1
    (elixir 1.14.4) lib/stream.ex:993: Stream.do_transform_user/6
    (elixir 1.14.4) lib/stream.ex:942: Stream.do_transform/5
    (elixir 1.14.4) lib/stream.ex:1813: Enumerable.Stream.do_each/4
    (gen_stage 1.2.1) lib/gen_stage/streamer.ex:52: GenStage.Streamer.handle_demand/2
    (gen_stage 1.2.1) lib/gen_stage.ex:2223: GenStage.noreply_callback/3
    (stdlib 4.3) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.3) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.3) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
The CSV uses “,” as the field separator.
Any idea how to solve this, or alternatively how to parse and filter this large CSV in a reasonable timeframe? Without partitioning the file, it takes around 15 minutes to apply the first filter, which extracts records by a certain row.