What's the best way to chunk a stream by an arbitrary non-newline character?

When streaming input from a file of unknown size, formatted as a single line of comma-separated values, what’s the best way to operate on each comma-separated element in that file?

Example input file input_file.txt:

"AAA","BBB","CCC","DDD"... for a few million characters

Splitting by line:
File.stream!/3 conveniently defaults to separating by :line, but that mode is fixed to splitting on \n or \r\n.

> File.stream!("input_file.txt") |> Enum.to_list()
> ["\"AAA\",\"BBB\",\"CCC\"..."]

Splitting by byte & chunking stream:
File.stream!/3 also accepts a number of bytes, so setting the byte size to 1 (reading the input as raw, not UTF-8) and passing it to Stream.chunk_by/2 constructs something closer to the stream we want:

> File.stream!("input_file.txt", [], 1) |> Stream.chunk_by(&(&1 == ",")) |> Enum.to_list()
> [                      
    ["\"", "A","A","A", "\""],
    [","],
    ["\"", "B","B","B", "\""],
    [","],
    ...
  ]

From here we could chain further operations onto this stream to filter out the unwanted punctuation and join the remaining characters back together, but it doesn’t feel like the best way to solve this problem.
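For completeness, one way that chain could look (a sketch against the example file above — reject the separator chunks, then re-join each surviving chunk of single-byte strings):

```elixir
File.stream!("input_file.txt", [], 1)
|> Stream.chunk_by(&(&1 == ","))
# drop the [","] separator chunks
|> Stream.reject(&(&1 == [","]))
# re-join each chunk of 1-byte strings into one element
|> Stream.map(&Enum.join/1)
|> Enum.to_list()
# => ["\"AAA\"", "\"BBB\"", ...]
```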

What other patterns are there for chunking a stream by an arbitrary character?

How about?

File.stream!("input_file.txt", [], 1)
|> Stream.chunk_while("", fn x, acc ->
  case x do
    "," -> {:cont, acc, ""}
    _ -> {:cont, acc <> x}
  end
end, fn
  # emit whatever accumulated after the last comma,
  # otherwise the final element would be silently dropped
  "" -> {:cont, []}
  acc -> {:cont, acc, []}
end)
|> Enum.to_list()

Please don’t roll your own CSV parser. You can always use NimbleCSV.parse_stream.

If you are worried about IO performance, you can also pass options to File.stream! that make it read ahead, e.g. 512 KB of data at a time – this is a very common practice in Erlang/Elixir land when ingesting data from files.
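For example (a sketch; the 512 KB figure is arbitrary):

```elixir
# The :read_ahead mode buffers file reads (here 512 KB at a time),
# so a 1-byte stream doesn't issue one IO request per byte.
File.stream!("input_file.txt", [read_ahead: 512 * 1024], 1)
|> Stream.chunk_by(&(&1 == ","))
|> Enum.to_list()
```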


I’d agree with you on working with CSVs - it’s preferable to rely on a highly optimized open-source CSV parser. However, NimbleCSV.parse_stream and NimbleCSV.to_line_stream both expect line-oriented streams, and if the input doesn’t match that format they won’t be able to chunk it properly.

The suggestion to increase the number of bytes read by File.stream! is excellent, though - we probably could improve the IO performance of @nallwhy’s solution by streaming larger chunks. Thanks!

That’s not true. to_line_stream exists exactly to transform a non-line-chunked stream into a line-chunked stream, so the result can be used as the input to parse_stream.
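A sketch of that pipeline (assumes {:nimble_csv, "~> 1.2"} as a dependency; the chunk boundaries in the input list below are deliberately arbitrary, cutting mid-field):

```elixir
alias NimbleCSV.RFC4180, as: CSV

# An arbitrarily chunked stream of binaries, with no newlines at all:
["\"AAA\",\"BB", "B\",\"CCC\",", "\"DDD\""]
|> CSV.to_line_stream()
|> CSV.parse_stream(skip_headers: false)
|> Enum.to_list()
# a single row: [["AAA", "BBB", "CCC", "DDD"]]
```

Note skip_headers: false — parse_stream drops the first row by default, and here there is only one.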


Oh, that’s great.

My impression was that to_line_stream expects a stream that is CSV/RFC4180-formatted but happens to be arbitrarily chunked at non-line-break locations. That is indeed the default behavior, but I see I could pass a :newlines option to NimbleCSV.define/2 with whatever delimiter my file stream uses, and to_line_stream will split on it. Thanks!
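A sketch of that for the single-line file from the original question (CommaDelimited is a hypothetical module name; since "," becomes the "newline", the separator is set to an otherwise unused character, and each quoted field comes back as its own one-column row):

```elixir
# Hypothetical parser: split records on "," instead of "\n".
NimbleCSV.define(CommaDelimited, separator: "\t", escape: "\"", newlines: [","])

File.stream!("input_file.txt", [read_ahead: 512 * 1024], 64 * 1024)
|> CommaDelimited.to_line_stream()
|> CommaDelimited.parse_stream(skip_headers: false)
|> Enum.to_list()
```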