Multiple Ecto transactions give a timeout

I’m in a situation where I have to bulk insert 9000 records into a PostgreSQL database with Ecto.

I already have a dedicated function for inserting a list of elements, BulkOperations.bulk_create(MyStruct, list). It’s basically an Ecto.Multi run as a transaction, so either every element in the list is inserted or none is.

No problem with that: if I insert a list of 10 elements, everything’s OK. But if the list has 9000 records, I get a timeout, because the connection stays open for more than 15000 ms doing the inserts. I mean, 9000 records :exploding_head:, it takes a while.

So I decided to split the list into chunks, like this:

        total_rows_affected =
          list
          |> Enum.chunk_every(20)
          |> Enum.map(&BulkOperations.bulk_create(MyStruct, &1))
          |> Enum.reduce(0, fn %{rows_affected: rows_affected}, acc ->
            rows_affected + acc
          end)

        %{errors: [], rows_affected: total_rows_affected}

It works for 9000 records.

But I don’t feel it’s safe: each chunk runs in its own transaction, so if one call to BulkOperations.bulk_create(MyStruct, &1) fails (because of a database key conflict, for example), some chunks will have been inserted and others won’t.
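A related detail in the chunked version: the Enum.reduce/3 above only reads rows_affected, so a chunk that comes back as %{rows_affected: 0, errors: ...} is silently counted as zero and the final map still says errors: []. A minimal sketch of a fold that stops at the first failed chunk instead, assuming bulk_create returns exactly those two map shapes (ChunkResults is a hypothetical module name, not part of the original code):

```elixir
defmodule ChunkResults do
  # Hypothetical helper. Assumes each chunk result is either
  # %{rows_affected: n, errors: []} on success or
  # %{rows_affected: 0, errors: reason} on failure.
  def sum(results) do
    Enum.reduce_while(results, {:ok, 0}, fn
      # Successful chunk: add its count and keep going.
      %{rows_affected: n, errors: []}, {:ok, acc} ->
        {:cont, {:ok, acc + n}}

      # Failed chunk: stop folding and surface its errors.
      %{errors: errors}, _acc ->
        {:halt, {:error, errors}}
    end)
  end
end
```

Feeding it a lazy pipeline (Stream.chunk_every/2 and Stream.map/2 in place of the Enum versions) means chunks after the first failure are never sent. Chunks that already succeeded stay committed, though, so this only limits the damage; for all-or-nothing you still need one enclosing transaction, where an {:error, _} result can be passed to Repo.rollback/1.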

My solution was to wrap everything in a single transaction, so that either every chunk succeeds or nothing is committed, like this:

    {:ok, result} =
      Repo.transaction(fn ->
        total_rows_affected =
          list
          |> Enum.chunk_every(10)
          |> Enum.map(&BulkOperations.bulk_create(MyStruct, &1))
          |> Enum.reduce(0, fn %{rows_affected: rows_affected}, acc ->
            rows_affected + acc
          end)

        %{errors: [], rows_affected: total_rows_affected}
      end)

    result

It works, but I still get a timeout when I try to insert 9000 records. I believe that’s because all the chunks now run on the same connection opened by Repo.transaction.

So my question is: how can I insert all 9000 records while making sure that either all of them are inserted or none are?

Have you tried passing the timeout: :infinity option to Repo.insert_all/3 or Repo.transaction/2?

No, I haven’t, because I’m worried that doing so could overload the memory of the server where the database is hosted. It’s a lot to insert, too much work.

Are you using insert_all? If you’re not, you could get a speed-up that way.
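If you do switch to insert_all, keep in mind that PostgreSQL accepts at most 65,535 bind parameters in a single statement, and insert_all uses roughly one parameter per field per row, so 9000 rows with eight or more fields will not fit in one call. A small sketch for sizing chunks, assuming every row map has the same keys (InsertAllChunks is a hypothetical name):

```elixir
defmodule InsertAllChunks do
  # PostgreSQL's limit on bind parameters in one statement.
  @max_params 65_535

  # Largest number of rows per insert_all call, assuming one
  # parameter per field per row and the same fields in every row.
  def max_rows_per_chunk(field_count) when is_integer(field_count) and field_count > 0 do
    div(@max_params, field_count)
  end

  # Splits the rows into chunks that each stay under the limit.
  def chunk(rows, field_count) do
    Enum.chunk_every(rows, max_rows_per_chunk(field_count))
  end
end
```

Each chunk can then go to its own insert_all call, all inside one Repo.transaction if the batch must be all-or-nothing.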

Why don’t you try it first, so you know for sure?

I think Jason’s proposal is the best option, by the way. Honestly, I don’t know of any database that can span a transaction across multiple connections, but I could be wrong.

Actually, no, I’m not using insert_all. BulkOperations.bulk_create/2 uses Ecto.Multi.insert.

This is what I did following your advice:

    {:ok, result} =
      Repo.transaction(fn ->
        total_rows_affected =
          list
          |> Enum.chunk_every(10)
          |> Enum.map(&BulkOperations.bulk_create(MyStruct, &1))
          |> Enum.reduce(0, fn %{rows_affected: rows_affected}, acc ->
            rows_affected + acc
          end)

        %{errors: [], rows_affected: total_rows_affected}
      end, [{:timeout, :infinity}])   # <============= this is what I added

    result

But I received:

"2021-12-13 19:29:43.254 UTC [1299724]: [1-1] db=my_db,user=postgres LOG:  unexpected EOF on client connection with an open transaction"

I don’t know why the client (Phoenix) is closing the connection, since I set the timeout to :infinity.

I also tried it as timeout: :infinity, and Phoenix still closes the connection after a while.

What does bulk_create do exactly?

@dimitarvp It was inserting the records one by one. I changed it with the help of a friend and now it works: insert_all does the job, as @axelson said. Now it looks like this:

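    def bulk_create(entity, items) do
      Ecto.Multi.new()
      # on_conflict: :nothing makes PostgreSQL skip rows whose key already exists
      |> Ecto.Multi.insert_all(:insert_all, entity, items, on_conflict: :nothing)
      |> Repo.transaction()
      |> case do
        {:ok, _changes} ->
          %{rows_affected: length(items), errors: []}

        # Multi.insert_all doesn't build changesets, so in practice a database
        # error raises here instead of reaching this clause.
        {:error, _failed_op, %Ecto.Changeset{changes: changes, errors: errors}, _changes_so_far} ->
          %{
            rows_affected: 0,
            # inspect/1 is required: a map can't be interpolated into a string directly
            errors: "Changes #{inspect(changes)} couldn't be created because #{inspect(errors)}"
          }
      end
    end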

The catch is that I had to take care of the timestamps myself, because insert_all doesn’t autogenerate inserted_at and updated_at. A migration that sets a default value of now() on those columns solves the issue.
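An alternative to the database default is to fill in the timestamps in the application before calling insert_all. A minimal sketch, assuming the schema uses the default timestamps() fields inserted_at and updated_at with type :naive_datetime (the Timestamps module is hypothetical). The value is truncated to whole seconds because Ecto’s :naive_datetime type rejects microseconds:

```elixir
defmodule Timestamps do
  # Hypothetical helper: adds inserted_at/updated_at to every row map,
  # assuming the schema's timestamps() use :naive_datetime.
  def put(rows, now \\ NaiveDateTime.utc_now()) do
    # :naive_datetime fields don't accept microseconds.
    now = NaiveDateTime.truncate(now, :second)
    Enum.map(rows, &Map.merge(&1, %{inserted_at: now, updated_at: now}))
  end
end
```

You would call Timestamps.put(items) before handing the list to bulk_create/2.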

So the issue is solved, but I still don’t know why the connection was being closed even with the timeout set to :infinity in the previous version. By the way, with the insert_all solution I can stick to the default timeout of 15_000 ms.

If memory serves, the second element in the :ok tuple should already contain the number of items affected.

Yes, but since it’s a transaction, every item in the list should be affected. That’s why I used length(items) on the items param. Does that make sense? Thanks for noticing, by the way.
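On the count: with Ecto.Multi, the second element of the :ok tuple is a map keyed by operation name, and for an insert_all step the value is {count, returning} (returning is nil without the :returning option). Because of on_conflict: :nothing, PostgreSQL only counts the rows it actually inserted, so count can be smaller than length(items) when some rows hit a conflict and were skipped. A small sketch that reads the real count, assuming the step is named :insert_all as in bulk_create above (MultiResult is a hypothetical name):

```elixir
defmodule MultiResult do
  # Hypothetical helper for the value returned by Repo.transaction/1
  # on a Multi whose insert_all step is named :insert_all.

  # Success: count is the number of rows PostgreSQL actually inserted.
  def rows_inserted({:ok, %{insert_all: {count, _returning}}}), do: {:ok, count}

  # Failure: Ecto.Multi reports {:error, failed_op, failed_value, changes_so_far}.
  def rows_inserted({:error, op, reason, _changes_so_far}), do: {:error, {op, reason}}
end
```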

The “unexpected EOF” error is what I’d expect if the server decided to hang up the connection.
