I’m in a situation where I have to bulk insert 9000 records into a PostgreSQL database with Ecto.
I already have a dedicated function for inserting a list of elements, BulkOperations.bulk_create(MyStruct, list). It’s basically an Ecto.Multi, so everything in the list gets inserted or nothing does, because it runs in a single transaction.
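Roughly, the idea inside bulk_create is something like this (a simplified sketch, not the real function; the point is the reduce-over-Multi shape):

def bulk_create(schema, list) do
  list
  |> Enum.with_index()
  |> Enum.reduce(Ecto.Multi.new(), fn {attrs, index}, multi ->
    # one named insert per element; they all commit or roll back together
    Ecto.Multi.insert(multi, {:insert, index}, struct(schema, attrs))
  end)
  |> Repo.transaction()
end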
No problem with that. If I insert a list of 10 elements, everything’s OK. But if the list has 9000 records, we have a problem: I get a timeout because the connection stays open for more than 15000 ms doing the inserts. I mean, 9000 records take a while.
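(From what I understand, the 15000 ms is Ecto’s default :timeout for a query, coming from DBConnection, and it can be raised in the repo config; the app and repo names here are placeholders:)

# config/config.exs -- raising the default query timeout; 60_000 is an arbitrary value
config :my_app, MyApp.Repo,
  timeout: 60_000

But just cranking the timeout up feels like hiding the problem rather than solving it.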
So I decided to split everything into chunks, like this:
total_rows_affected =
  list
  |> Enum.chunk_every(20)
  |> Enum.map(&BulkOperations.bulk_create(MyStruct, &1))
  |> Enum.reduce(0, fn %{rows_affected: rows_affected}, acc ->
    rows_affected + acc
  end)

%{errors: [], rows_affected: total_rows_affected}
It works for 9000 records.
But it doesn’t feel safe, because if I insert a chunk with &BulkOperations.bulk_create(MyStruct, &1) and that operation fails (because of a database key conflict, maybe), then I have a problem: some chunks will have been inserted and others won’t.
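To make the failure mode concrete: with Enum.map, a key conflict in, say, the third chunk still leaves the first two chunks committed. Something like Enum.reduce_while would at least stop at the first failed chunk, but it cannot undo chunks that were already written (the error shape below is my assumption about what bulk_create returns):

list
|> Enum.chunk_every(20)
|> Enum.reduce_while({:ok, 0}, fn chunk, {:ok, acc} ->
  case BulkOperations.bulk_create(MyStruct, chunk) do
    # chunk committed: keep counting rows
    %{errors: [], rows_affected: rows} -> {:cont, {:ok, acc + rows}}
    # chunk failed: stop here, but earlier chunks are already in the database
    %{errors: errors} -> {:halt, {:error, errors}}
  end
end)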
My solution was to wrap everything in a transaction, so either everything succeeds or nothing does. Like this:
{:ok, result} =
  Repo.transaction(fn ->
    total_rows_affected =
      list
      |> Enum.chunk_every(10)
      |> Enum.map(&BulkOperations.bulk_create(MyStruct, &1))
      |> Enum.reduce(0, fn %{rows_affected: rows_affected}, acc ->
        rows_affected + acc
      end)

    %{errors: [], rows_affected: total_rows_affected}
  end)

result
And it works, but I get a timeout again when I try to insert 9000 records. I believe it’s because everything now runs on the single connection opened by Repo.transaction, so the whole operation has to finish within that one connection’s timeout.
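(I know Repo.transaction/2 accepts options, and if I read the docs right I could pass a :timeout for the whole transaction, something like the snippet below, with :infinity purely as an illustration and insert_all_the_chunks standing in for the chunked pipeline above. But holding one connection open for minutes doesn’t feel right either.)

# :timeout here covers the whole outer transaction (value is illustrative)
{:ok, result} =
  Repo.transaction(
    fn -> insert_all_the_chunks(list) end,
    timeout: :infinity
  )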
So, my question is: how can I insert all of these 9000 records while making sure that everything gets inserted?