Inserting CSV into postgres

I recently faced a similar task, so hopefully some of my experiences will come in handy.

COPY was my first thought. It’s the most efficient way to import CSV data into postgres, period.
It’s also the least flexible from an Elixir app’s perspective: you need to build a custom query, handle separators, take care of error handling/reporting and testing, and accept the “all or nothing” semantics (it’s a single transaction).

If you’re OK with the above, COPY is for you.
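
For reference, this is roughly what COPY driven from Elixir can look like with Postgrex (a minimal sketch; the table, columns and file path are made up, and Postgrex.stream has to run inside a transaction, where it acts as a collectable for COPY ... FROM STDIN):

{:ok, pid} = Postgrex.start_link(database: "my_db", username: "postgres")

Postgrex.transaction(pid, fn conn ->
  copy = Postgrex.stream(conn, "COPY users(name, email) FROM STDIN WITH (FORMAT csv)", [])

  "users.csv"
  |> File.stream!()
  |> Enum.into(copy) # the whole file goes in as one COPY - all or nothing
end)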

If performance troubles you and you still don’t want to completely sacrifice flexibility, consider using nimble_csv. Thanks to a super clever implementation (metaprogramming and binary matching), its sequential parsing is way faster than the library you’ve mentioned, which attempts to parse in parallel (IIRC 5M rows in 20 seconds vs 2 minutes according to my microbenchmarks).

nimble_csv works with Streams too, so you’ll be fine when it comes to memory spikes.
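
For example (a minimal sketch - the parser module name and file path are just placeholders), defining a parser and streaming a file through it looks like this:

# compiles a dedicated parser module (comma separator, double-quote escapes)
NimbleCSV.define(YourNimbleParser, separator: ",", escape: "\"")

"input.csv"
|> File.stream!()
|> YourNimbleParser.parse_stream() # lazy - rows are only parsed as they are consumed
|> Enum.take(5)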

If you also care about making the DB insertion efficient later on (by utilizing the connection pool and bulk inserts), here’s what you can do to parallelize the processing and keep it reasonably fast:

stream # can be a file, but you can use IO.stream for testing
|> YourNimbleParser.parse_stream(headers: discard_headers?) # nimble_csv discards it by default
|> Stream.chunk(1000, 1000, []) # experiment, fine tune
|> process_in_parallel(&chunk_handler_fn/1)

The process_in_parallel implementation is entirely up to you. If you’re on Elixir 1.4, you may use Task.async_stream; if you’re on anything lower than that, parallel_stream looks like an OK choice. Just make sure the number of parallel processes is somewhat in line with System.schedulers_online and your database pool size. Make it configurable, measure, rinse and repeat. It also pays off to understand how BEAM scheduling works.
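
A minimal sketch of such a helper on Elixir 1.4+ (the module name is hypothetical; each chunk is handled in its own Task and the results are simply forced):

defmodule CSVImport do
  def process_in_parallel(chunks, chunk_handler_fn) do
    chunks
    |> Task.async_stream(chunk_handler_fn,
         max_concurrency: System.schedulers_online(),
         timeout: :infinity)
    |> Stream.run() # forces the lazy stream, discarding the per-chunk results
  end
end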

Your chunk_handler_fn/1 will receive… a chunk of 1000 decoded rows. You may prepare changesets there, have them validated, filter the chunk based on the valid? property, remap the columns according to a custom mapping rule, build a list of maps to be inserted (changeset.changes is already there for you, though it may need to be enriched a little) and push it through Repo.insert_all in each individual process.
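
Continuing the hypothetical CSVImport sketch, and assuming a User schema with name and email columns plus the usual changeset/2 (timestamps left out for brevity - insert_all does not autogenerate them), a handler could look roughly like this:

def chunk_handler_fn(rows) do
  entries =
    rows
    # nimble_csv emits each row as a list of fields
    |> Enum.map(fn [name, email] ->
      User.changeset(%User{}, %{name: name, email: email})
    end)
    # drop rows that failed validation
    |> Enum.filter(& &1.valid?)
    # insert_all expects plain maps, not changesets
    |> Enum.map(& &1.changes)

  Repo.insert_all(User, entries)
end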

Caveats:

  • The parallel bulk inserts are not wrapped in a single transaction. You have to deal with individual chunk errors. Use the on_conflict option if need be, to perform an upsert or ignore constraint errors.
  • You don’t really deal with individual rows. If you’d like to report progress, you have to figure out the number of chunks upfront and emit a notification every time a batch is inserted/processed. A separate short-lived process gathering the stats and forwarding progress could be a good idea.
  • It’s quite fast. In my case I was able to parse, validate/reject and insert 1M rows/minute on average. That was local, no latency penalty etc.
  • If you’re looking to report errors for individual rows and match them with CSV file line numbers, look up Stream.with_index. Note that you will have to adjust the offset depending on whether the header row was skipped or included.
  • nimble_csv will brutally crash on bad rows, e.g. an unterminated quote. You might want to rescue NimbleCSV.ParseError and convert it into something useful (most likely an {:error, {:parse_error, reason}} tuple); see the sketch after this list.
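
For that last point, a rough sketch of the guard (still inside the hypothetical CSVImport module). parse_stream is lazy, so the error surfaces in the process that consumes the stream, which is where the rescue goes:

def import_file(path) do
  path
  |> File.stream!()
  |> YourNimbleParser.parse_stream()
  |> Stream.chunk(1000, 1000, [])
  |> process_in_parallel(&chunk_handler_fn/1)

  :ok
rescue
  e in NimbleCSV.ParseError ->
    {:error, {:parse_error, Exception.message(e)}}
end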

Hope this helps.

Cheers

Edit: added a note about NimbleCSV.ParseError handling, fixed grammar.
