I recently faced a similar task, so hopefully some of my experiences will come in handy.
`COPY` was my first thought. It's the most efficient way to import CSV data into Postgres, period. It's also the least flexible from an Elixir app's perspective: building a custom query, handling separators, error handling/reporting, testing, and finally "all or nothing" semantics (it's a single transaction). If you're OK with the above, `COPY` is for you.
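For reference, a `COPY`-based import can be driven from Elixir through Ecto's SQL streaming. This is a minimal sketch, assuming a hypothetical `MyApp.Repo`, a `people` table, and a local `people.csv`; all of those names are placeholders, not from the original post:

```elixir
# Minimal COPY sketch; MyApp.Repo, the people table and people.csv are
# placeholders. Everything runs in one transaction, so any bad row
# aborts the whole import ("all or nothing").
MyApp.Repo.transaction(fn ->
  copy =
    Ecto.Adapters.SQL.stream(
      MyApp.Repo,
      "COPY people (name, age) FROM STDIN WITH (FORMAT csv, HEADER true)"
    )

  "people.csv"
  |> File.stream!()
  |> Enum.into(copy)
end)
```

`Enum.into/2` works here because the stream returned by `Ecto.Adapters.SQL.stream/2` is collectable, so raw CSV lines are fed straight to Postgres without being parsed in Elixir at all.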
If you're troubled by performance but still don't want to completely sacrifice flexibility, consider using nimble_csv. Thanks to a very clever implementation (metaprogramming and binary matching), its sequential parsing is way faster than the library you've mentioned, which attempts to parse in parallel (IIRC 5M rows in 20 seconds vs 2 minutes according to my microbenchmarks). `nimble_csv` works with Streams too, so you'll be fine when it comes to memory spikes.
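With nimble_csv the parser module is generated at compile time via `NimbleCSV.define/2`. A minimal sketch, assuming a hypothetical `people.csv` with name and age columns (the module name and file are placeholders):

```elixir
# NimbleCSV generates the parser module at compile time; the separator
# and escape options below are just the defaults spelled out.
NimbleCSV.define(MyParser, separator: ",", escape: "\"")

"people.csv"
|> File.stream!()
|> MyParser.parse_stream()  # skips the header row by default
|> Stream.map(fn [name, age] -> %{name: name, age: String.to_integer(age)} end)
|> Enum.take(5)
```

Because both `File.stream!/1` and `parse_stream/2` are lazy, nothing is read or parsed until the final `Enum.take/2`, which is what keeps memory flat on large files.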
If you care about parallel processing later on, to make the DB insertion efficient (by utilizing the connection pool and bulk inserts), here’s what you can do to make it reasonably fast:
```elixir
stream # can be a file, but you can use IO.stream for testing
|> YourNimbleParser.parse_stream(skip_headers: discard_headers?) # nimble_csv discards them by default
|> Stream.chunk(1000, 1000, []) # experiment, fine tune
|> process_in_parallel(&chunk_handler_fn/1)
```
The `process_in_parallel` implementation is entirely up to you. If you're on Elixir 1.4, you may use `Task.async_stream`; if lower than that, `parallel_stream` looks like an OK choice. Just make sure the number of parallel processes is somewhat in line with `System.schedulers_online/0` and your database pool size. Make it configurable, measure, rinse and repeat. See the docs on how scheduling works.
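One possible `process_in_parallel` along those lines, sketched with `Task.async_stream` (Elixir >= 1.4); the module name and the `:max_concurrency` default are my own choices, not from the original post:

```elixir
defmodule Importer do
  # Runs handler_fn over each chunk in parallel. :max_concurrency is the
  # knob to keep in line with System.schedulers_online/0 and the Repo
  # pool size; results come back in the original order.
  def process_in_parallel(chunk_stream, handler_fn, opts \\ []) do
    max = Keyword.get(opts, :max_concurrency, System.schedulers_online())

    chunk_stream
    |> Task.async_stream(handler_fn, max_concurrency: max, timeout: 60_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Usage: `Importer.process_in_parallel([[1, 2], [3, 4]], &Enum.sum/1)` returns `[3, 7]`, since `Task.async_stream` preserves input order by default.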
Your `chunk_handler_fn/1` will receive… a chunk of 1000 decoded rows. You may prepare changesets there, have them validated, filter the chunk based on the `valid?` property, remap the columns according to a custom mapping rule, build a list of maps to be inserted (`changeset.changes` is already there for you, though it perhaps needs to be enriched a little), and push it through `Repo.insert_all` in each individual process.
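A `chunk_handler_fn/1` along those lines might look like this; `Person`, its `changeset/2` and `MyApp.Repo` are hypothetical placeholders, not from the original post:

```elixir
# Hypothetical chunk handler: validate rows via changesets, keep only
# the valid ones, and bulk-insert their changes. Person and MyApp.Repo
# are placeholders.
def chunk_handler_fn(rows) do
  entries =
    rows
    |> Enum.map(fn [name, age] ->
      Person.changeset(%Person{}, %{name: name, age: age})
    end)
    |> Enum.filter(& &1.valid?)
    # changeset.changes is already a map of casted values; enrich it
    # here (e.g. timestamps) if your schema requires more columns
    |> Enum.map(& &1.changes)

  MyApp.Repo.insert_all(Person, entries)
end
```

Note that `insert_all` bypasses changeset callbacks and autogenerated timestamps, which is why any enrichment has to happen in the pipeline above.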
Caveats:
- The parallel bulk inserts are not wrapped in a single transaction. You have to deal with individual chunk errors. Use the `on_conflict` option if need be to perform an upsert/ignore constraint errors.
- You don’t really deal with individual rows. If you’d like to report progress you have to figure out the number of chunks upfront and emit a notification every time a batch was inserted/processed. A separate short-living process gathering the stats and forwarding progress could be a good idea.
- It’s quite fast. In my case I was able to parse, validate/reject and insert 1M rows/minute on average. That was local, no latency penalty etc.
- If you're looking to report errors for individual rows and match them with the CSV file line number, look up `Stream.with_index`. Note that you will have to calculate the offset based on the header presence (the very first row gets either skipped or included).
- `nimble_csv` will brutally crash on bad rows, e.g. an unterminated quote. You might want to rescue from (catch) `NimbleCSV.ParseError` and convert it into something useful (an `{:error, {:parse_error, reason}}` tuple, most likely).
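The last two caveats can be combined into one sketch: tag each row with its CSV line number and turn a `NimbleCSV.ParseError` into an error tuple. `MyParser` is a hypothetical module created with `NimbleCSV.define/2`:

```elixir
defmodule SafeParser do
  # Returns {:ok, [{row, line_number}, ...]} or {:error, {:parse_error, reason}}.
  # MyParser is a placeholder for your NimbleCSV-defined parser.
  def parse_rows(stream, skip_headers?) do
    # with the header skipped, the first data row sits on file line 2
    offset = if skip_headers?, do: 2, else: 1

    rows =
      stream
      |> MyParser.parse_stream(skip_headers: skip_headers?)
      |> Stream.with_index(offset)
      |> Enum.to_list()

    {:ok, rows}
  rescue
    e in NimbleCSV.ParseError ->
      {:error, {:parse_error, Exception.message(e)}}
  end
end
```

The `Enum.to_list/1` call matters: the stream is lazy, so the `rescue` only catches the error if enumeration happens inside the function.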
Hope this helps.
Cheers
edit: added a note about `NimbleCSV.ParseError` handling, fixed grammar