Is there a way to bulk insert join records of a many-to-many relationship with Ecto?

We have two schemas: Batch and Feed. They have a many-to-many relationship. When a Batch is created, it is often associated with thousands of feeds. We’re using this code to create the Batch and associate it with existing feeds:

  feeds = Feed |> where([f], f.feed_id in ^feed_ids) |> Repo.all()

  changeset(batch_id_prefix, window)
  |> put_assoc(:feeds, feeds)

We’re also doing all of this in Redshift using the redshift_ecto adapter.

Our issue is that the join records between the batch and the feeds are inserted into the database one by one. This takes a long time (~100ms per record) because Redshift isn’t optimised for single inserts. I assume Ecto does this to allow granular handling of primary key conflicts. Redshift doesn’t enforce database constraints including uniqueness of primary keys so we’re not getting any benefit from inserting these join records one by one.

Is there a way to tell Ecto to do a bulk insert here? If there is, I couldn’t find it by looking at the docs and code. Also, do you think this would be something useful to add to Ecto, or should we just handle the creation of the relationship manually with Repo.insert_all?

In situations like this I try to use a transaction with Ecto.Multi.new. Once the transaction is created, the first step would be to insert the parent record, and the second step would be to create the child records using Repo.insert_all.

This makes the transaction really fast, since there would be only one insert for the parent record and another bulk insert for the child records. You have to be really careful with the child records since Repo.insert_all doesn’t insert timestamps or perform any validation. Having the transaction is really helpful here, since if something fails, everything will be rolled back.
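
Since Repo.insert_all skips timestamps and validation, the rows have to be complete before they are handed over. A minimal sketch of building them by hand, assuming the join table has batch_id, feed_id, inserted_at and updated_at columns (these column names and the JoinRows module are hypothetical; many join tables have no timestamp columns at all, in which case those two keys can be dropped):

```elixir
defmodule JoinRows do
  # Builds plain maps that could be passed to Repo.insert_all/3.
  # The column names are assumptions made for illustration.
  def build(batch_id, feed_ids, now \\ NaiveDateTime.utc_now()) do
    # Truncate so the value also fits a second-precision timestamp column.
    now = NaiveDateTime.truncate(now, :second)

    for feed_id <- feed_ids do
      %{batch_id: batch_id, feed_id: feed_id, inserted_at: now, updated_at: now}
    end
  end
end
```

Every row shares the same timestamp, which roughly mirrors what a single bulk insert would record anyway.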

One important caveat about Repo.insert_all is that if you pass lots of data you may exceed the database’s limit on query parameters (it happened to me on PostgreSQL). The solution I went with was something like:

  1. Build the list of parameters for the child records
  2. Use Enum.chunk_every to split the child records in chunks of the desired batch size. For example 1000 records.
  3. Iterate over the chunks and call Repo.insert_all for each one of them. Be sure to pattern match the result to ensure that the transaction is rolled back if something fails.
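
The three steps above can be sketched roughly as follows. This is only an illustration under a few assumptions: ChunkedInsert is a hypothetical helper, and insert_fun stands in for a call such as `&Repo.insert_all("batches_feeds", &1)` (the table name is made up), so the chunking logic can be read without a database.

```elixir
defmodule ChunkedInsert do
  # Splits `rows` into chunks of `chunk_size` and calls `insert_fun`
  # once per chunk. `insert_fun` is expected to return `{count, returned}`,
  # the same shape Repo.insert_all/3 returns.
  def insert_in_chunks(rows, chunk_size, insert_fun) do
    rows
    |> Enum.chunk_every(chunk_size)
    |> Enum.reduce(0, fn chunk, total ->
      # If the result does not match, a MatchError is raised. Inside
      # Repo.transaction/1 an exception causes a rollback.
      {count, _returned} = insert_fun.(chunk)
      total + count
    end)
  end
end
```

Called inside a transaction, this returns the total number of inserted rows, which can be compared against the number of rows you built. The chunk size should be chosen so that rows per chunk times columns per row stays under the database’s parameter limit.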

doesn’t insert timestamps or perform any validation”. :slight_smile:


Oops, thank you for catching the typo! :raised_hands:


Thanks, @belaustegui, I ended up doing just that. I like that with Ecto.Multi I’m not even losing much of the expressiveness of the code:

  {:ok, %{batch_with_feeds: batch_with_feeds}} =
    Multi.new()
    |> Multi.insert(:batch, changeset)
    |> Multi.insert_all(:batch_feeds, feeds_join_through_table(), join_records)
    |> Multi.run(:batch_with_feeds, fn %{batch: batch} -> {:ok, %{batch | feeds: feeds}} end)
    |> Repo.transaction()

Can the problem be solved with GenStage too?