How to insert multiple records asynchronously inside a transaction with Ecto

Hello again, folks!

I ran into a problem this week and I'm not quite sure how to solve it. I spent a good amount of time today researching it to understand it better, but I'm still kinda stuck. This is my drama…

I'm importing/creating data from a CSV file, and I'm validating each line to check that the values are filled in, correctly formatted, and valid overall. Each row in this file has n columns, and they are not always part of the same schema (some of those columns are associations that should be created).

For example:

%Post{} that I want to create for each row:

schema "posts" do
  field :title, :string
  field :description, :string
  has_many :post_tags, PostTag
end

%Tag{} that exists beforehand:

schema "tags" do
  field :name, :string
  # Let's say that we have to validate that the user
  # has chosen from a value within a valid range
  field :max_possible_score, :integer
end

%PostTag{} relationship that will be inserted with the post:

schema "post_tags" do
  belongs_to :post, Post
  belongs_to :tag, Tag
  field :rating, :integer
end

And the file would look like this:

Title               | Description  | Taste | Looks | Popularity
how to cook lobster | just lobster | 5     | 10    | 100
how to cook beans   | just beans   | 4     | 8     | 70

So far, so good. I have a draft of the import logic in place and I was about to run some performance tests before refactoring the code. The goal is to be able to import something like 20k rows with 100 columns.

The first thing that comes to mind is to chunk the rows that are being streamed from the file and process them asynchronously. But I have to process everything in a transaction, because if there's even one invalid row, the whole operation should be invalid and nothing should be persisted.

So what I’m doing right now is:

  • Getting all the rows from the CSV and generating a bunch of changesets
  • Filtering those changesets to see if they’re valid or not
  • If there’s at least one invalid changeset we abort the operation
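As a minimal sketch of that all-or-nothing check (the module and function names here are hypothetical, not from my actual code), the filtering step only needs the `valid?` flag that every changeset carries:

```elixir
defmodule ImportValidation do
  # Hypothetical helper: returns {:ok, changesets} when every changeset is
  # valid, otherwise {:error, invalid_changesets} so the import can abort
  # before touching the database.
  def ensure_all_valid(changesets) do
    case Enum.reject(changesets, & &1.valid?) do
      [] -> {:ok, changesets}
      invalid -> {:error, invalid}
    end
  end
end
```

Because it only reads the `valid?` field, this runs before any connection is checked out, so the validation pass itself could be parallelised freely.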

In the end, I have a list of valid changesets for the %Post struct and the structure that I’m inserting is something similar to this (as a changeset, of course):

%Post{
  title: "how to cook lobster",
  description: "just lobster",
  post_tags: [
    %PostTag{
      tag_id: 1,
      rating: 5
    },
    %PostTag{
      tag_id: 2,
      rating: 10
    },
    %PostTag{
      tag_id: 1,
      post_id: nil,
      rating: 100
    }
  ]
}

Because I need to persist a struct together with its association, I have to use insert instead of insert_all (to keep the references). Looking to improve the code, I've wrapped the changesets in a transaction like this:

Repo.transaction(fn repo ->
  changesets
  |> Enum.chunk_every(10)
  |> Enum.map(fn chunk ->
    Task.async(fn ->
      Enum.each(chunk, fn changeset ->
        repo.insert(changeset)
      end)
    end)
  end)
  |> Task.await_many()
end)
I took inspiration from this comment and thought this would work fine, but I got a series of errors, ranging from database timeouts and disconnects to problems with the test sandbox. It seems that there's a problem with the processes sharing the same connection, or something like that, which I can't quite put my finger on.

Besides that, I was trying to benchmark the code with benchee, and I got really confused about what exactly was going on: I couldn't tell whether the problem was in my test case, in the async routine, or somewhere else.

I’ve read multiple old replies of @michalmuskala talking about this not being supported and there are some old comments about it here too. I’m not sure if this is still true to this day or if I’m missing something here.

Could someone give me a hint on how to overcome this?

First caveat from that post:

The parallel bulk inserts are not wrapped in a single transaction. You have to deal with individual chunk errors.

The transaction is going to force all of this to go through a single DB connection, so even if this worked it would still ultimately result in each SQL statement executing sequentially.
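Given that, a plain sequential version is probably the honest baseline. A minimal sketch, assuming `repo` is your application's repo module and `changesets` is the already-validated list (the `SequentialImport` module is a hypothetical name):

```elixir
defmodule SequentialImport do
  # Inserts every changeset on the single connection owned by the
  # transaction. The first failure rolls the whole import back.
  def insert_all_or_rollback(repo, changesets) do
    repo.transaction(
      fn ->
        Enum.each(changesets, fn changeset ->
          case repo.insert(changeset) do
            {:ok, _post} -> :ok
            {:error, failed_changeset} -> repo.rollback(failed_changeset)
          end
        end)

        # Value returned inside {:ok, _} when everything was inserted.
        length(changesets)
      end,
      # Assumption: a large import may outlive the default timeout.
      timeout: :infinity
    )
  end
end
```

This returns `{:ok, count}` on success or `{:error, failed_changeset}` after a rollback. No Task is involved, so every insert stays inside the process that owns the transaction.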

One option (I haven't tried this at this scale) would be to build a giant Ecto.Multi with all the changesets and then pass that to Repo.transaction; that will check that every changeset has valid? set before doing any SQL.
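A rough sketch of what that could look like, untested at 20k rows. The `MultiImport` module name and the `{:post, row}` operation names are just illustrative choices, and `repo` stands for your repo module:

```elixir
defmodule MultiImport do
  alias Ecto.Multi

  # Builds one Multi with an insert operation per changeset. Each operation
  # is named {:post, row} so a failure can be traced back to its CSV row.
  def build(changesets) do
    changesets
    |> Enum.with_index(1)
    |> Enum.reduce(Multi.new(), fn {changeset, row}, multi ->
      Multi.insert(multi, {:post, row}, changeset)
    end)
  end

  # Runs the Multi in a single transaction.
  def run(repo, changesets) do
    case repo.transaction(build(changesets)) do
      {:ok, changes} ->
        {:ok, map_size(changes)}

      {:error, {:post, row}, failed_changeset, _changes_so_far} ->
        {:error, row, failed_changeset}
    end
  end
end
```

The inserts still run one after another on a single connection, so this mostly buys clearer error reporting rather than speed.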

Hi @al2o3cr, thanks for your reply. The implementation I linked has different constraints and no restriction on using insert_all. One of those constraints is that they're trying to achieve the best performance possible using application code. I don't know how you've interpreted it, but I read this "caveat" as a particular detail of that implementation and not as something that wouldn't be possible with Ecto overall (since this is not explicit).

I started to think about that once I read that Repo.transaction actually checks out the connection. So what you’re talking about makes complete sense.

Considering what you just pointed out about the transaction running sequentially, I think that this could work. I'm going to try it today and will come back to report my experience. It's weird that there's not an easier way to accomplish a bulk insert with associations, though.

Update: I just noticed that for my use-case we already have all the changesets validated. So I’m not sure if Ecto.Multi would solve that problem.

Btw, I found that Ecto does something similar here with migrations, but I'm not sure if this applies to my use-case.
I’m having a lot of errors with the test sandbox, so I’m not quite sure how I would begin to test for this scenario.
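
For what it's worth, one possible starting point for the sandbox side is a non-async test in shared mode, so that any process spawned during the import uses the connection owned by the test process. This is only a sketch: `MyApp.Repo`, `MyApp.Post`, and `MyApp.PostImport.run/1` are hypothetical names, and it assumes the repo is configured with the `Ecto.Adapters.SQL.Sandbox` pool in the test environment.

```elixir
defmodule MyApp.PostImportTest do
  # Shared mode requires async: false, since every process then goes
  # through the single connection owned by this test.
  use ExUnit.Case, async: false

  alias Ecto.Adapters.SQL.Sandbox

  setup do
    :ok = Sandbox.checkout(MyApp.Repo)
    Sandbox.mode(MyApp.Repo, {:shared, self()})
    :ok
  end

  test "persists nothing when one row is invalid" do
    rows = [
      %{"Title" => "how to cook lobster", "Taste" => "5"},
      # Hypothetical invalid row: rating outside the allowed range.
      %{"Title" => "how to cook beans", "Taste" => "9999"}
    ]

    # MyApp.PostImport.run/1 is a placeholder for the import entry point.
    assert {:error, _reason} = MyApp.PostImport.run(rows)
    assert MyApp.Repo.aggregate(MyApp.Post, :count) == 0
  end
end
```

Shared mode only helps other processes find a connection. It would not make inserts from tasks run in parallel inside the transaction, since there is still just one connection.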