Problem with parallel inserts with Ecto via async tasks

Hi Elixir community,

I'm currently working on a data migration with Elixir and Ecto. It simply takes lines from the legacy DB and puts them, with newly mapped fields, into the new Postgres DB.

So far this is working fine as long as I work sequentially: I take a batch of lines, map them to the new fields and insert them line by line. I do that for every batch until I have gone through the whole table.

Now I tried to run those batches (each with 1000 lines) in parallel with
Task.async_stream(0..last_batch, Migration, :migrate_batch, [], max_concurrency: 10)

migrate_batch/1 reads 1000 lines starting at the nth thousandth line and inserts them line by line into the DB.

After some time, this leads to the following error:

[error] Postgrex.Protocol (#PID<0.232.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.242.0> exited: killed
[error] Postgrex.Protocol (#PID<0.236.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.243.0> exited: killed
[error] Postgrex.Protocol (#PID<0.235.0>) disconnected: ** (DBConnection.ConnectionError) client #PID<0.244.0> exited: killed
** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out
(elixir) lib/task/supervised.ex:209: Task.Supervised.stream_reduce/10
(elixir) lib/enum.ex:1776: Enum.each/2

I'm not 100% sure what's happening… Am I just not able to perform parallel inserts into Postgres? I tried to find a solution or the cause, but no luck so far.

When I run the code with max_concurrency: 1 everything works, so it must be a problem with the parallel processing.

Hope you guys can help me out :slight_smile:

Looks like you are over-saturating your DB pool, which leads to timeouts…

What is your DB pool size? If it's low, like 10, then that is the culprit…

Otherwise you need to post some of the code so we can see where it "over-parallelizes". For Postgres I would try to stay around 10-20 DB connections for batch work… but you could benchmark to find the optimum.
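
For reference, a minimal sketch of where that pool size lives, assuming a standard Ecto repo configured in config/config.exs (:my_app and MyApp.Repo are placeholder names):

# config/config.exs (or the environment-specific config file)
use Mix.Config

config :my_app, MyApp.Repo,
  pool_size: 20 # number of DB connections shared by all concurrent tasks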

Currently the pool size is 10. I thought that this would be enough when I'm working with 3 concurrent tasks, but even with 3 it already fails.

I call it like this:

# The code that calls the parallel execution
@max_concurrency 3

0..last_batch
    |> Task.async_stream(Migration, :migrate_batch, [], max_concurrency: @max_concurrency)
    # async_stream wraps each result in an {:ok, result} tuple
    |> Enum.each(fn {:ok, _result} -> IO.puts("Batch done") end)

# The important parts of the Migration module
defmodule Migration do
  def migrate_batch(batch) do
    batch
    # Thought it would be too much to show this function in detail.
    # It just loads the legacy elements into a list. But if you like, I can add it.
    |> get_batch_from_legacy()
    |> Enum.each(&migrate_element/1)
  end

  defp migrate_element(element) do
    changeset = Migration.changeset(%Migration{}, map_legacy_fields(element))
    Migration.Repo.insert!(changeset)
  end
end

Hope that helps. I'll try raising the pool size a bit for testing and see if that makes a difference.

// EDIT
I raised the pool_size to 100 but I'm still getting the same message, and not even later than before, as far as I can tell…

For note, PostgreSQL has foreign data wrappers (FDW) that let you access another database from inside PostgreSQL. If you use that to reach the old database, you can write a single SQL statement that selects from the old table and inserts into the new one while transforming the data. That would probably be significantly faster than any other way.
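
To sketch the idea, assuming the legacy database is also PostgreSQL, that a postgres_fdw foreign server, user mapping and foreign table have already been set up in psql, and that the media / legacy_elements table and column names below are just placeholders, the whole transfer could be one raw INSERT … SELECT fired through Ecto:

# Single INSERT ... SELECT: reads from the foreign (legacy) table and writes
# into the new table, mapping/renaming the columns on the way.
Ecto.Adapters.SQL.query!(Migration.Repo, """
INSERT INTO media (title, body, inserted_at, updated_at)
SELECT old_title, old_body, now(), now()
FROM legacy_elements
""", [])

That keeps the row shuffling entirely inside PostgreSQL instead of round-tripping every row through the BEAM.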

1 Like

What happens if you set the timeout to 30 seconds via timeout: 30_000, and then compare the times with max_concurrency: 1 and max_concurrency: 3? If max_concurrency: 3 is as slow as max_concurrency: 1, then we likely have contention issues. You can also see the "queue" time in the logged queries.

1 Like

Also, if you have any indexes on the new table, it would help to remove them for now. You can re-add them after you've finished the import.
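
A rough sketch of how that could look as an Ecto migration (the :media table and :legacy_id column are made-up names for illustration):

defmodule Migration.Repo.Migrations.DropMediaIndexForImport do
  use Ecto.Migration

  def up do
    # Drop the index so the bulk insert doesn't have to maintain it
    drop index(:media, [:legacy_id])
  end

  def down do
    # Restores the index on rollback; after the import, re-add it
    # with a follow-up migration.
    create index(:media, [:legacy_id])
  end
end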

1 Like

Hey guys,

thank you for all your hints on possible causes of my problem and on how I can optimize the migration even more. Sometimes you need a fresh mind to see the obvious solution. When I looked at the problem this morning, I thought:

What happens, by the way, when a task is looping? Does Task.async_stream get stuck or does it cancel the process?

With that question in mind I saw these lines in the error output, wondered where the 5000 comes from, and thought this might be the answer to my problem AND my question above:

** (exit) exited in: Task.Supervised.stream(5000)
** (EXIT) time out

The problem was not a database timeout but the timeout of Task.async_stream itself, which defaults to 5000 ms.

So I raised the task timeout to 30_000 and now the parallelization works fine. Here's the new code:

@max_concurrency 3

0..last_batch
    |> Task.async_stream(Migration, :migrate_batch, [],
            max_concurrency: @max_concurrency, timeout: 30_000) # Added the timeout here
    # async_stream wraps each result in an {:ok, result} tuple
    |> Enum.each(fn {:ok, _result} -> IO.puts("Batch done") end)

Nevertheless, thank you guys. There are definitely some good points I will look into, to see whether my parallelization is actually giving me an advantage and whether I can speed up the migration even more.

You’ll most likely read me again soon :smiley:
Thanks again. For now I can move on :slight_smile:

5 Likes