sabri

Inserting CSV into postgres

Hello,

I have found a lovely CSV library that I will be using to insert CSV files into Postgres.

But there are some things I am wondering about, as I am new to Elixir.

Here is the sample code I plan to use to insert the CSV into the DB:

File.stream!("ignore/customers.csv")
|> CSV.decode()
|> Enum.each(fn
  {:ok, [id, nm, csr, sal]} ->
    %Customer{}
    |> Customer.changeset(%{
      masterid: id,
      custname: nm,
      csrid: String.to_integer(csr),
      salesid: String.to_integer(sal)
    })
    |> Repo.insert()

  {:error, _message} ->
    # Whatever you want to do with invalid rows
    :ok
end)

My questions are:

  1. In Enum.each, what is the time interval between calls to Repo.insert? Can I control it to make sure my DB doesn’t get flooded with queries?

  2. I need to show a progress bar in the browser while the CSV is being inserted into the DB. Can I broadcast a progress message to the client over channels after each successful insert in Enum.each? For example, after |> Repo.insert, broadcast a message such as:

MyApp.Endpoint.broadcast client_topic, "progress", %{
  progress: progress_number
}

Most Liked Responses

hq1

I recently faced a similar task, so hopefully some of my experiences will come in handy.

COPY was my first thought. It’s the most efficient way to import CSV data into Postgres, period.
It’s also the least flexible way from an Elixir app’s perspective: you have to build a custom query, handle separators, take care of error handling/reporting and testing, and finally accept “all or nothing” semantics (it’s a single transaction).

If you’re OK with the above, COPY is for you.
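For reference, here is a minimal sketch of the COPY route through Ecto, assuming a Postgres-backed repo and the ecto_sql package. The module name CopyImport and the table and column names are hypothetical. Ecto.Adapters.SQL.stream has to run inside a transaction, which is where the "all or nothing" behaviour comes from.

```elixir
defmodule CopyImport do
  # Hypothetical helper module, not part of Ecto or Postgrex.

  # Builds the COPY statement. Table and column names are interpolated
  # directly, so they must come from trusted code, never from user input.
  def copy_sql(table, columns) do
    "COPY #{table} (#{Enum.join(columns, ", ")}) FROM STDIN WITH (FORMAT csv, HEADER true)"
  end

  # Streams the file into Postgres. `repo` is assumed to be an Ecto repo
  # using the Postgres adapter (for example MyApp.Repo).
  def run(repo, path, table, columns) do
    repo.transaction(fn ->
      sink = Ecto.Adapters.SQL.stream(repo, copy_sql(table, columns))
      Enum.into(File.stream!(path), sink)
    end)
  end
end
```

A call could look like CopyImport.run(MyApp.Repo, "ignore/customers.csv", "customers", ["masterid", "custname", "csrid", "salesid"]). A large file may also need a longer transaction timeout than the default.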

If you’re concerned about performance but don’t want to sacrifice flexibility completely, consider nimble_csv. Thanks to a very clever implementation (metaprogramming and binary matching), its sequential parsing is much faster than the library you mentioned, which attempts to parse in parallel (IIRC, 5M rows in 20 seconds vs. 2 minutes in my microbenchmarks).

nimble_csv works with streams too, so you’ll be fine when it comes to memory spikes.

If you care about parallel processing later on, to make the DB insertion efficient (by utilizing the connection pool and bulk inserts), here’s what you can do to make it reasonably fast:

stream # can be a file, but you can use IO.stream for testing
|> YourNimbleParser.parse_stream(headers: discard_headers?) # nimble_csv discards it by default
|> Stream.chunk_every(1000) # Stream.chunk/4 before Elixir 1.5; experiment, fine tune
|> process_in_parallel(&chunk_handler_fn/1)

The process_in_parallel implementation is entirely up to you. If you’re on Elixir 1.4 or later, you can use Task.async_stream; on older versions, parallel_stream looks like an OK choice. Just make sure the number of parallel processes is roughly in line with System.schedulers_online and your database pool size. Make it configurable, measure, rinse and repeat.
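One possible shape for process_in_parallel, as a sketch built on Task.async_stream. The module name ParallelImport and the :max_concurrency option name are assumptions made for illustration; only Task.async_stream and System.schedulers_online come from the standard library.

```elixir
defmodule ParallelImport do
  # Hypothetical module; only Task.async_stream/3 and
  # System.schedulers_online/0 come from the standard library.

  # Runs `handler` on every chunk with bounded concurrency and returns
  # the handler results in input order.
  def process_in_parallel(chunks, handler, opts \\ []) do
    max = Keyword.get(opts, :max_concurrency, System.schedulers_online())

    chunks
    |> Task.async_stream(handler, max_concurrency: max, timeout: :infinity)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

Because the stream of chunks is the first argument, it slots into the pipeline above unchanged. Keeping max_concurrency at or below the database pool size should stop handlers from queueing for connections.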

Your chunk_handler_fn/1 will receive a chunk of 1000 decoded rows. There you can prepare changesets, validate them, filter the chunk on the valid? property, remap the columns according to a custom mapping rule, build a list of maps to insert (changeset.changes is already there for you, though it may need to be enriched a little), and push it through Repo.insert_all in each individual process.
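The shape of such a handler can be sketched without Ecto by passing the validation and insert steps in as functions. ChunkHandler, validate and insert_all are hypothetical names; in a real app validate would most likely wrap a changeset function and insert_all would wrap Repo.insert_all.

```elixir
defmodule ChunkHandler do
  # Hypothetical sketch. `validate` turns a decoded row into
  # {:ok, map} or {:error, reason}; `insert_all` receives the list of
  # valid maps and is expected to perform one bulk insert.
  def handle_chunk(rows, validate, insert_all) do
    {valid, invalid} =
      rows
      |> Enum.map(validate)
      |> Enum.split_with(&match?({:ok, _}, &1))

    entries = Enum.map(valid, fn {:ok, entry} -> entry end)

    # Skip the database round trip when the whole chunk was rejected.
    if entries != [], do: insert_all.(entries)

    %{inserted: length(entries), rejected: length(invalid)}
  end
end
```

With Ecto, validate could return {:ok, changeset.changes} when changeset.valid? is true, and insert_all could be &Repo.insert_all(Customer, &1). The returned counts are handy for progress and error reporting.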

Caveats:

  • The parallel bulk inserts are not wrapped in a single transaction. You have to deal with individual chunk errors. Use the on_conflict option if you need to perform an upsert or ignore constraint errors.
  • You don’t really deal with individual rows. If you’d like to report progress, you have to figure out the number of chunks upfront and emit a notification every time a batch has been inserted/processed. A separate short-lived process that gathers the stats and forwards progress could be a good idea.
  • It’s quite fast. In my case I was able to parse, validate/reject and insert 1M rows/minute on average. That was local, with no latency penalty etc.
  • If you want to report errors for individual rows and match them with CSV file line numbers, look up Stream.with_index. Note that you will have to calculate the offset based on whether the header is present (the very first row gets either skipped or included).
  • nimble_csv will brutally crash on bad rows, e.g. an unterminated quote. You might want to rescue NimbleCSV.ParseError and convert it to something useful (most likely an {:error, {:parse_error, reason}} tuple).
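The progress caveat above could be handled along these lines: a small Agent-based tracker that knows the total number of chunks and calls a notifier after each one. ImportProgress and its functions are hypothetical names. The notifier is any one-argument function; in a Phoenix app it could wrap the MyApp.Endpoint.broadcast call from the question.

```elixir
defmodule ImportProgress do
  # Hypothetical progress tracker built on Agent (standard library).

  # `total_chunks` has to be known upfront; `notify` receives the
  # percentage of chunks finished so far.
  def start(total_chunks, notify) when total_chunks > 0 do
    Agent.start_link(fn -> %{done: 0, total: total_chunks, notify: notify} end)
  end

  # Call once per finished chunk, from any process. The notifier runs
  # inside the Agent, so notifications go out one at a time, in order.
  def chunk_done(tracker) do
    Agent.get_and_update(tracker, fn state ->
      done = state.done + 1
      percent = div(done * 100, state.total)
      state.notify.(percent)
      {percent, %{state | done: done}}
    end)
  end
end
```

Each chunk handler would call ImportProgress.chunk_done(tracker) after its insert. The total could be derived from a first pass that counts the rows, divided by the chunk size and rounded up.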

Hope this helps.

Cheers

Edit: added a note about NimbleCSV.ParseError handling, fixed grammar.

sabri

Thanks, I’ve finally implemented it! I have to admit that I had to change my mindset to do it :slight_smile:
Here is a sample chunk:

[%{first: "Antonio", seq: "1"}, %{first: "Robert", seq: "2"},
 %{first: "Gordon", seq: "3"}, %{first: "Tyler", seq: "4"},
 %{first: "Landon", seq: "5"}, %{first: "Ethan", seq: "6"},
 %{first: "Amy", seq: "7"}, %{first: "Paul", seq: "8"},
 %{first: "Tom", seq: "9"}, %{first: "Helen", seq: "10"}]

I got it by using Stream.map in the pipeline:

File.stream!(path)
|> MyParser.parse_stream()
|> Stream.map(fn
  [seq, first, _last, _email, _digit] -> %{seq: seq, first: first}
  _ -> ""
end)
|> Stream.chunk_every(10)
|> Task.async_stream(&MyApp.CsvsController.chunk_handler_fn/1)
|> Stream.run()

Thanks all for support :love_letter:

minhajuddin

I did a screencast covering this here https://www.youtube.com/watch?v=YQyKRXCtq4s
