What is the Elixir way of processing large Excel/CSV files line by line (or any similar large, itemizable input)?

Hi there

I am still learning my Elixir ways and am doing my first real Elixir project. It is going to process Excel or CSV files of user feedback, analyze them line by line with AI, and provide useful summaries on, e.g., whether users complain more about performance than about some piece of functionality.

Coming from the JVM world, once the file is uploaded I would use something like Spring Batch or Quartz with CSV/Excel readers able to stream through the file and create records in a database (or push them to RabbitMQ) in an atomic manner (so if the server got rebooted while parsing row 124, the job would continue from row 124 when restarted).

And then a similar job for the actual analysis would fetch rows (from the DB or RabbitMQ) and, again atomically, process things row by row.

What is the elixir way of doing it?
I have been googling and reading this forum about batch jobs in Elixir, and it seems like the most mature option that works out of the box is Oban (the nice controlling dashboard for it is expensive, though, but maybe I could live without it).

The part I am missing is this “atomicity” of parsing files and processing already-parsed rows.

  • Could somebody, please, point me in the right direction to research?
  • Or is Oban the wrong framework for this, and not really suited for splitting a large job into small items that atomically fail or succeed? What would you look at then?

Hi @artem!

I think the architecture can be quite similar to the Java one. You can use either NimbleCSV or Explorer packages for parsing CSV. For processing later on, you can use Oban or something like Broadway RabbitMQ.
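For illustration, a minimal NimbleCSV sketch (the parser module name, file path, and column layout here are made up):

```elixir
# Define a comma-separated parser module at compile time.
NimbleCSV.define(FeedbackParser, separator: ",", escape: "\"")

"feedback.csv"
|> File.stream!()
|> FeedbackParser.parse_stream() # skips the header row by default
|> Stream.each(fn [user, comment] ->
  # each row arrives as a list of binary fields
  IO.inspect({user, comment})
end)
|> Stream.run()
```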


Wow, that was a fast response, @josevalim!
Maybe I am just failing to locate the proper starting pointers :confused:

It is the atomicity part that I can’t figure out. I played with NimbleCSV already and it seems to do its job pretty well. What I can’t figure out is how to make it so that if, for example, the server reboots during the parsing of row 124 (before it’s saved to the DB), the job continues from that same row the next time it is resumed, rather than re-parsing the first 123 rows (and therefore creating 123 duplicate analysis requests).

I guess I could build such a mechanism myself (e.g. by storing the number of processed rows for each job somewhere in the DB and asking NimbleCSV to skip them), but there should be some libraries or known general ways of doing that already, right?

As for connecting to external services (such as RabbitMQ), I’d like to avoid it. This is half a learning project on a budget, so I am trying to learn the Elixir ways of doing things rather than offloading them to an external service. And it is unlikely that I’ll have really huge Excel files all the time, so most of the time such an external service would be doing nothing while still needing maintenance.

I don’t think there’s anything prebuilt that combines parsing and iterating through files with maintaining and persisting the state of where you are. There are libraries for each piece of the puzzle, but you’ll still need to do the composition on your own.


I’m not sure there are any libraries in other languages for doing this either. The simple answer is that this feature is so specific that when you actually need it, you want to implement it yourself. I worked on a project where we processed a lot of CSV files concurrently, and the solution we used was to validate the data only once the whole file had been processed, but in that case the files we received were relatively small (about 20k lines).

If you really have very large files, I would recommend splitting each file into parts (either by number of lines or by size) and processing them as simple files; this way you don’t have to come up with any abominations on the DB side.
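For example (file name and chunk size are just placeholders), with the standard `split` utility:

```shell
# break feedback.csv into 100k-line pieces named part_aa, part_ab, ...
split -l 100000 feedback.csv part_
```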

Hmm, in Spring Batch, if memory doesn’t fail me (it was several years ago), I used reader/writer interfaces for processing just one item of a batch.

@D4no0, this approach could be okay for the Excel part, but after that… I would really want to analyze hundreds or thousands of feedback entries one at a time. So my idea was to have a single “process feedback from collection 123” job, where readers would read user feedback from the DB, process entries one by one (via an HTTP call to an external model), and save the results via a writer.

If it indeed isn’t something common in the Elixir world, well, then so be it and I’ll need to build some custom machinery. I am just surprised, if that’s the case, and would like to double-check that it’s not my poor googling skills.

There’s a lot of machinery, and tradeoffs to be made, for such a broad requirement. You can look at e.g. Broadway or GenStage, which are common Elixir tools for building processing pipelines, but those might also be too low-level for what you seem to be looking for.


It is quite common, but as others have said, your scenario is quite specific, so there’s no custom library just for that.

Your task sounds like you would read every CSV record, put them in a DB marked as “not processed yet”, and then have a consumer that dispatches them to processing agents. Very easy and trivial in Elixir, especially making use of all CPU cores to maximize throughput. I’ve done such tasks in Elixir, Java, Golang, Rust.
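As a hedged sketch of that consumer side (the `MyApp.FeedbackRow` schema, its `status` field, and the `MyApp.AI.analyze/1` HTTP call are all made-up placeholders):

```elixir
import Ecto.Query

# fetch rows not yet processed and fan the work out across all cores
MyApp.FeedbackRow
|> where([r], r.status == "pending")
|> MyApp.Repo.all()
|> Task.async_stream(
  fn row ->
    # hypothetical HTTP call to the external model
    {:ok, summary} = MyApp.AI.analyze(row.payload)

    row
    |> Ecto.Changeset.change(status: "done", summary: summary)
    |> MyApp.Repo.update!()
  end,
  max_concurrency: System.schedulers_online(),
  timeout: 30_000
)
|> Stream.run()
```

`Task.async_stream/3` gives you bounded concurrency for free; a crash in one row’s task doesn’t take down the others.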

Look at Oban and Broadway, they have what you need.
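With Oban, each row can become its own job, which gives you exactly the “small items that atomically fail or succeed” behavior. A hedged sketch (queue name, schema, and `MyApp.AI.analyze/1` are assumptions, not a prescribed design):

```elixir
defmodule MyApp.AnalyzeRowWorker do
  use Oban.Worker, queue: :analysis, max_attempts: 5

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"row_id" => row_id}}) do
    row = MyApp.Repo.get!(MyApp.FeedbackRow, row_id)

    # hypothetical HTTP call to the external model
    {:ok, summary} = MyApp.AI.analyze(row.payload)

    row
    |> Ecto.Changeset.change(status: "done", summary: summary)
    |> MyApp.Repo.update()
  end
end

# enqueue one job per parsed row; Oban persists jobs in Postgres,
# so they survive restarts and are retried independently on failure
%{"row_id" => row.id}
|> MyApp.AnalyzeRowWorker.new()
|> Oban.insert()
```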


I spent a fair amount of time doing large-scale data processing in Elixir. Once you have things loaded up into a queue of some sort, you have many more tools available for handling failures. But that first pass over the big data file is hard to avoid, and I’m not aware of any tools in any language which fully “solve” the problem of keeping track of your progress while doing that initial processing of the file.

I didn’t find a silver bullet for recovering seamlessly from crashes, but here are two methods I found that help with the chore of getting data out of the files and into the queue:

  1. Use the shell’s split utility to separate a larger file into smaller files. This helps you divide and conquer: if things really do go off the rails, you can at least keep track of which files have been successfully processed. One pattern I used grouped items together and relied on Ecto’s insert_all/2 function – this was much more efficient than performing individual database operations. For example:
# Split will create files with an `x` prefix, e.g. `xaa`, `xab`, etc.
input_files = ~w(
  # ... etc...
)

Task.Supervisor.start_link(name: TmpTaskSupervisor)

TmpTaskSupervisor
|> Task.Supervisor.async_stream(
  input_files,
  fn input_file ->
    input_file
    |> File.stream!()
    |> Stream.chunk_every(1000)
    |> Stream.each(fn chunk ->
      rows =
        Enum.map(chunk, fn line ->
          %{payload: String.trim(line), foo: "bar", etc: "etc"}
        end)

      MyApp.Repo.insert_all(MyApp.Something, rows)
    end)
    |> Stream.run()
  end,
  timeout: 86_400_000,
  max_concurrency: 50
)
|> Enum.to_list()
  2. I had some success in some cases using Stream.with_index/2 to keep track of which line I was processing – e.g. I could write this number to a file (though in some circumstances, e.g. on AWS, these file operations became a bottleneck). If I needed to recover after a failure, I could pass the number of the last successfully processed line to Stream.drop/2 and use that to skip past rows that had already been processed. This can still take a while on a long file, but it’s much faster than re-parsing the whole file.
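The checkpoint trick in point 2 can be sketched with nothing but the standard library (the checkpoint path, file name, and processing step are placeholders; a real app would likely keep the counter in a DB column on the job record):

```elixir
checkpoint_path = "job_123.lastline"

# read the last successfully processed line number, defaulting to 0
last_done =
  case File.read(checkpoint_path) do
    {:ok, n} -> String.to_integer(String.trim(n))
    {:error, _} -> 0
  end

"feedback.csv"
|> File.stream!()
|> Stream.drop(last_done)           # skip rows finished before the crash
|> Stream.with_index(last_done + 1) # keep absolute line numbers
|> Stream.each(fn {_line, line_no} ->
  # ... parse the line and insert the row here ...
  File.write!(checkpoint_path, Integer.to_string(line_no))
end)
|> Stream.run()
```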

Relatedly: I’ve been doing some benchmarking of Elixir (e.g. against Python), and one of the tasks involved reading over a large CSV. There are a handful of different patterns I tried, e.g. ex_vs_py/control.ex at main · fireproofsocks/ex_vs_py · GitHub. My hot take was that Python was quite performant for these types of quick one-off tasks – I think any tool is fair game for you to “prime the pump” and get your data out of files and into a queue, so you can benefit from the supervision tools available in Elixir as it handles the long-running process.


Thank you, folks. I suppose I really need to focus on splitting “loading the Excel/CSV into the DB” and “processing it” into separate things. The processing can be pretty much just a regular queue.

Just a variation here – I work with event streams a lot (effectively, infinite streams of incoming data, often arriving in “chunks”). My go-to solution is to use Kafka as the inbound message layer. In your case, you could simply grab an inbound CSV file, break it into lines, and feed those lines into Kafka (effectively turning each line into an event). Once it’s in Kafka, it’s safe… On the other end, you have something reading the events (“lines”) one at a time and processing them. If something goes wrong and it crashes, you just restart at the same location, reading from the queue.

Pretty much exactly what others have alluded to, but thought I’d spell it out. Kafka offers you a lot of excellent consistency guarantees and is one of the highest performing tools when it comes to sheer throughput.

I very much doubt you would need to (because of the sheer speed at which Kafka runs), but you could also chunk the data, say 5 lines at a time, to try to squeeze out a little more performance… but I’d be pretty surprised if the gain were worth it.
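A very rough sketch of the producing side using :brod (the Erlang Kafka client that broadway_kafka builds on) – broker address, topic name, and the single fixed partition here are all assumptions, and this is untested against a real broker:

```elixir
# connect a client and start a producer for the topic
{:ok, _} = :brod.start_client([{~c"localhost", 9092}], :csv_client)
:ok = :brod.start_producer(:csv_client, "feedback-lines", [])

"feedback.csv"
|> File.stream!()
|> Stream.with_index()
|> Stream.each(fn {line, idx} ->
  # one message per CSV line, keyed by line number
  :ok =
    :brod.produce_sync(
      :csv_client,
      "feedback-lines",
      0,
      Integer.to_string(idx),
      line
    )
end)
|> Stream.run()
```

On the consuming side, committed consumer offsets are what give you the “restart at the same location” behavior described above.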