Inserting CSV into Postgres

Hello,

I have found a lovely CSV library that I will be using to insert CSV files into Postgres.

But there are some issues I am wondering about, as I am new to Elixir.

Ok, here is the sample code I will be using to insert the CSV into the DB:

File.stream!("ignore/customers.csv")
|> CSV.decode()
|> Enum.each(fn
  {:ok, [id, nm, csr, sal]} ->
    Customer.changeset(%Customer{}, %{
      masterid: id,
      custname: nm,
      csrid: String.to_integer(csr),
      salesid: String.to_integer(sal)
    })
    |> Repo.insert()

  {:error, _message} ->
    # Whatever you want to do with invalid rows
    :ok
end)

My questions are:

  1. In the Enum.each, what is the time interval between each call to Repo.insert? Can I control this to make sure that my DB won't get overwhelmed with queries?

  2. As I need to implement a progress bar in the browser while the CSV is being inserted into the DB, can I broadcast a progress message to the client over channels on each successful Enum.each iteration? For example, after |> Repo.insert, broadcast a message such as:

MyApp.Endpoint.broadcast(client_topic, "progress", %{
  progress: progress_number
})

2 Likes

If you have a lot of records and don't need data validation, use COPY to pipe the data to Postgres. This is very fast.

The time between inserts is simply the time it takes to insert a record: you insert one record, then the next. So you probably don't have to worry about overloading the server. Broadcasting a message should also be possible using the technique you mention.

3 Likes

Thank you. Is there an example of using COPY? But then I think I won't be able to broadcast progress messages?

In general, no more than 25,000 records are expected in the CSV file.

1 Like

See https://github.com/elixir-ecto/postgrex/issues/228. I would definitely use COPY for that volume.

2 Likes

Yeah, if each individual row took one second to insert, it would take hours to complete…
So, as for the link provided, how would I refactor their example below so that it is fed by the CSV.decode in my code above?

Postgrex.transaction(pid, fn(conn) ->
  query = Postgrex.prepare!(conn, "", "COPY users TO STDOUT;")
  Postgrex.stream(conn, query, [])
  |> Enum.into(File.stream!("users"), fn(%Postgrex.Result{rows: rows}) -> rows end)
end)
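
I guess importing would be the reverse direction; something like the rough, untested sketch below, adapted from the COPY FROM STDIN pattern in the Postgrex docs. The column list is just my table's columns from the code above, and the exact options may differ between Postgrex versions:

Postgrex.transaction(pid, fn(conn) ->
  # Collecting into the stream sends the file's contents to Postgres (COPY ... FROM STDIN)
  statement = "COPY customers (masterid, custname, csrid, salesid) FROM STDIN WITH (FORMAT csv)"
  stream = Postgrex.stream(conn, statement, [])
  Enum.into(File.stream!("ignore/customers.csv"), stream)
end)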

But then, as you mentioned, I would lose validation? Is there any workaround for handling possible errors?

Thanks for the valuable help!

1 Like

I recently faced a similar task, so hopefully some of my experiences will come in handy.

COPY was my first thought. It's the most efficient way to import CSV data into Postgres, period.
It's also the least flexible way from an Elixir app perspective: you're on your own for building a custom query, handling separators, error handling/reporting, testing, and finally the "all or nothing" semantics (it's a single transaction).

If you’re OK with the above, COPY is for you.

If you're troubled by performance and still don't want to completely sacrifice flexibility, consider using nimble_csv. Thanks to a super clever implementation (metaprogramming and binary matching), its sequential parsing is way faster than the library you've mentioned, which attempts parsing in parallel (IIRC 5M rows in 20 seconds vs 2 minutes according to my microbenchmarks).

nimble_csv works with Streams too, so you'll be fine when it comes to memory spikes.

If you care about parallel processing later on, to make the DB insertion efficient (by utilizing the connection pool and bulk inserts), here’s what you can do to make it reasonably fast:

stream # can be a file, but you can use IO.stream for testing
|> YourNimbleParser.parse_stream(headers: discard_headers?) # nimble_csv discards it by default
|> Stream.chunk(1000, 1000, []) # experiment, fine tune
|> process_in_parallel(&chunk_handler_fn/1)

The process_in_parallel implementation is entirely up to you. If you're on Elixir 1.4, you may use Task.async_stream; if you're on an older version, parallel_stream looks like an OK choice. Just make sure the number of parallel processes is somewhat in line with System.schedulers_online and your database pool size. Make it configurable, measure, rinse and repeat (it helps to understand how the BEAM's scheduling works).
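
To illustrate, a minimal process_in_parallel/2 based on Task.async_stream could look like the sketch below; max_concurrency and timeout are just starting points you should tune against your schedulers and Repo pool size:

defp process_in_parallel(chunked_stream, handler_fun) do
  chunked_stream
  |> Task.async_stream(handler_fun,
       max_concurrency: System.schedulers_online(),
       timeout: 60_000)
  |> Stream.run()
end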

Your chunk_handler_fn/1 will receive… a chunk of 1000 decoded rows. You may prepare changesets there, have them validated, filter the chunk based on the valid? property, remap the columns according to a custom mapping rule, build a list of maps to be inserted (changeset.changes is already there for you, though it perhaps needs to be enriched a little), and push it through Repo.insert_all in each individual process.
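
As a rough sketch, reusing the Customer schema and column names from the first post (timestamps or other required fields would still need to be added to each map):

def chunk_handler_fn(rows) do
  entries =
    rows
    |> Enum.map(fn [id, name, csr, sal] ->
      Customer.changeset(%Customer{}, %{
        masterid: id,
        custname: name,
        csrid: String.to_integer(csr),
        salesid: String.to_integer(sal)
      })
    end)
    |> Enum.filter(& &1.valid?)
    |> Enum.map(& &1.changes)

  Repo.insert_all(Customer, entries)
end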

Caveats:

  • The parallel bulk inserts are not wrapped in a single transaction. You have to deal with individual chunk errors. Use the on_conflict option if need be to perform an upsert or ignore constraint errors.
  • You don't really deal with individual rows. If you'd like to report progress, you have to figure out the number of chunks upfront and emit a notification every time a batch is inserted/processed. A separate short-lived process gathering the stats and forwarding progress could be a good idea.
  • It’s quite fast. In my case I was able to parse, validate/reject and insert 1M rows/minute on average. That was local, no latency penalty etc.
  • If you're looking to report errors for individual rows and match them with CSV file line numbers, look up Stream.with_index. Note that you will have to calculate the offset based on the header presence (the very first row gets either skipped or included).
  • nimble_csv will brutally crash on bad rows, e.g. an unclosed quote. You might want to rescue (catch) NimbleCSV.ParseError and convert it into something useful (most likely an {:error, {:parse_error, reason}} tuple); see the sketch after this list.
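
For instance, one way to turn that crash into a value is to rescue around the place where the lazy pipeline is actually consumed; import_csv/1 below is a hypothetical wrapper around the pipeline above:

def import_csv(path) do
  path
  |> File.stream!()
  |> YourNimbleParser.parse_stream()
  |> Stream.chunk(1000, 1000, [])
  |> process_in_parallel(&chunk_handler_fn/1)
rescue
  e in NimbleCSV.ParseError ->
    {:error, {:parse_error, Exception.message(e)}}
end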

Hope this helps.

Cheers

edit: added a note about NimbleCSV.ParseError handling, fixed grammar

10 Likes

Great answer! For the earlier example, I would definitely use Repo.insert_all. Also, the parallel processing can be done with Task.async_stream from Elixir v1.4.

3 Likes

Thanks for the detailed answer, I will now try to do the best I can.

1 Like

Ok, here is my humble attempt with a CSV file that has 100 entries like:

CSV:

seq,first,last,email,digit
1,Antonio,Martinez,femeise@bi.gov,348429376
2,Robert,Butler,amfob@vaim.io,738848051

Code:

NimbleCSV.define(MyParser, separator: ",", escape: "\"")
File.stream!("/Users/samir/Downloads/100.csv")
|> MyParser.parse_stream
|> Stream.chunk(10)
|> Task.async_stream(&MyApp.CsvsController.chunk_handler_fn/1)


def chunk_handler_fn(csv_row) do
    IO.puts csv_row
end

But chunk_handler_fn was never called. I have attached the simple 100.csv sample file here:

https://s3.eu-central-1.amazonaws.com/test-files-74/100.csv

Is there something I have missed?

1 Like

Task.async_stream returns a lazy stream; it won't be executed unless you consume it. This can be done with Stream.run if you don't care about the results, or with any of the functions that work on enumerables eagerly (most notably those from the Enum module) if you do care about the return values.
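
For example, appending Stream.run to the pipeline from your post above is enough to make it execute (same code, just consumed at the end):

File.stream!("/Users/samir/Downloads/100.csv")
|> MyParser.parse_stream
|> Stream.chunk(10)
|> Task.async_stream(&MyApp.CsvsController.chunk_handler_fn/1)
|> Stream.run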

1 Like

I do care about the results, so I am not using Enum. In fact, I will be broadcasting results to the client to implement a progress bar for the task, as described at the top of this thread.

If I run the stream with Stream.run, will it run all the chunks until the stream is fully consumed, or shall I run it for each chunk?
When will the temporary streamed uploaded file get deleted? After the stream finishes?

1 Like

If I run the stream with Stream.run, will it run all the chunks until the stream is fully consumed?

All chunks. But try it. :slight_smile:

When will the temporary streamed uploaded file get deleted?

Uploaded files are saved to a temporary directory and deleted afterwards, yes.

1 Like

Thanks, all chunks got streamed now, but I could not parse them. Here is what I got as a sample chunk from |> Stream.chunk(10):

1AntonioMartinezfemeise@bi.gov3484293762RobertButleramfob@vaim.io7388480513GordonSimonnak@jodcon.org8829569684TylerFlowersid@do.net5789928675LandonRossriojipof@fuju.gov7791899366EthanChavezozeketja@vucocabo.gov7585680127AmyHendersoneruraep@emu.io9750233448PaulWilliamsabmahu@bi.edu7778078969TomHerreraosaik@alaigocuc.com45881961610HelenCurrymuw@pi.io598349524

While NimbleCSV generates a list for each line in the CSV file, |> Stream.chunk(10) seems to have concatenated the lists as above, making it impossible to get at the data using pattern matching.

So, what’s the advice here?

1 Like

That's what Stream.chunk(10) does: it builds chunks of up to 10 elements. Therefore, inside Task.async_stream you will have a list of 10 elements, and you need to use Enum.each or Enum.map and friends to traverse them, process them, and insert them into the database in chunks.
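
For example, a chunk_handler_fn/1 along those lines (assuming the column layout of your 100.csv sample) could be:

def chunk_handler_fn(rows) do
  # each chunk is a list of up to 10 parsed lines, each itself a list of fields
  Enum.each(rows, fn [seq, first, last, email, digit] ->
    IO.inspect({seq, first, last, email, digit})
  end)
end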

2 Likes

Thanks, I've finally implemented it! But I have to admit that I had to change my mindset to do it :slight_smile:
Here is a sample chunk:

[%{first: "Antonio", seq: "1"}, %{first: "Robert", seq: "2"},
 %{first: "Gordon", seq: "3"}, %{first: "Tyler", seq: "4"},
 %{first: "Landon", seq: "5"}, %{first: "Ethan", seq: "6"},
 %{first: "Amy", seq: "7"}, %{first: "Paul", seq: "8"},
 %{first: "Tom", seq: "9"}, %{first: "Helen", seq: "10"}]

This is because I used Stream.map in the pipeline:

File.stream!(path)
|> MyParser.parse_stream
|> Stream.map(fn n ->
  case n do
    [seq, first, _last, _email, _digit] -> %{seq: seq, first: first}
    _ -> ""
  end
end)
|> Stream.chunk(10)
|> Task.async_stream(&MyApp.CsvsController.chunk_handler_fn/1)
|> Stream.run

Thanks all for support :love_letter:

4 Likes

I did a screencast covering this here: https://www.youtube.com/watch?v=YQyKRXCtq4s

4 Likes