Ecto Parallel inserts

Hi, I’m new here.

I was writing a module to import data from a file into the database, and I was wondering how I can do concurrent Ecto inserts.

My solution, using Flow, is this:

    file_collection
    |> Enum.flat_map(&get_structs_to_insert/1)
    |> Enum.chunk_every(8000)
    |> Flow.from_enumerable()
    |> Flow.map(fn e -> Repo.insert_all(Table, e) end)
    |> Flow.run()

But I don’t know whether using Flow is the right approach here. Can anyone help me?

Welcome to the Elixir forum!

You have to keep in mind that with this approach you may lose consistency: the chunks will very likely be inserted over different database connections, and therefore in different transactions, so if something goes wrong you have to take care of it yourself.
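
One way to take care of that yourself, as a minimal sketch reusing the names from your snippet, is to collect the per-chunk results instead of discarding them with Flow.run/1, so that failed chunks can be logged or retried:

    # Each Repo.insert_all/2 call runs in its own implicit transaction, so a
    # failing chunk does not roll back the others. Collecting the results
    # lets you see which chunks went wrong.
    results =
      file_collection
      |> Enum.flat_map(&get_structs_to_insert/1)
      |> Enum.chunk_every(8000)
      |> Flow.from_enumerable()
      |> Flow.map(fn chunk ->
        try do
          {:ok, Repo.insert_all(Table, chunk)}
        rescue
          e -> {:error, chunk, e}
        end
      end)
      |> Enum.to_list()

    failed_chunks = for {:error, chunk, _reason} <- results, do: chunk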

First of all, thanks for replying.

I know I will lose consistency, but I have already validated the data using changesets. Is this the right approach? Is it faster than using plain Enum functions, like this:

    file_collection
    |> Enum.flat_map(&get_structs_to_insert/1)
    |> Enum.chunk_every(8000)
    |> Enum.each(fn e -> Repo.insert_all(Table, e) end)

I also saw another solution using Task.async, but it doesn’t work for me because it throws a connection error:

    file_collection
    |> Enum.flat_map(&get_structs_to_insert/1)
    |> Enum.chunk_every(8000)
    |> Enum.each(fn e ->
      Task.async(fn -> Repo.insert_all(Table, e) end)
    end)

Hi @samoorai and welcome!

For simple problems like this you can typically do without Flow by using Task.async_stream:

    file_collection
    |> Stream.flat_map(&get_structs_to_insert/1)
    |> Stream.chunk_every(8000)
    |> Task.async_stream(fn e -> Repo.insert_all(Table, e) end)
    |> Stream.run()

This achieves the same effect as your example using Flow, but without the extra library.

One thing to keep in mind is that all your processes are inserting data concurrently, but they all share the same database connection pool. The default pool size for Ecto is 10 (see Ecto.Repo — Ecto v3.7.1), meaning that you won’t see any improvement beyond 10 concurrent processes (unless you increase the pool size), because at least one process will be blocked waiting for a connection to become available.
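
For reference, the pool size lives in the repo configuration; a minimal sketch, with :my_app / MyApp.Repo as placeholders for your own application:

    # config/config.exs -- placeholder app and repo names
    config :my_app, MyApp.Repo,
      pool_size: 20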

Also, I would probably try to put as much work as possible into the parallel part of the pipeline. Something like:

    file_collection
    |> Task.async_stream(&parse_and_store_file/1)
    |> Stream.run()

    def parse_and_store_file(file) do
      file
      |> get_structs_to_insert()
      |> Stream.chunk_every(8000)
      |> Task.async_stream(fn e -> Repo.insert_all(Table, e) end)
      |> Stream.run()
    end

Basically, trying to run in parallel whatever can be parallelized.

I hope this helps :slight_smile:


Thank you @trisolaran for replying.

I parallelized all the operations as you suggested and it increased the performance, but the solution with Task.async_stream gives me a database connection error and I don’t know how to fix it.
Also, the solution using Flow doesn’t improve the performance; maybe I’m using it wrong.

I would suggest that, as @trisolaran already mentioned, you are running out of available connections when using the Task.async approach. Flow (and GenStage) is exactly the right tool for that; it might also be necessary to limit the number of Flow stages (the default is the number of active cores in the system).
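
As a rough sketch, reusing the names from the first post and assuming a pool size of 10, the stage count can be passed to Flow.from_enumerable/2:

    # Keep the number of stages at or below the DB pool size so no stage
    # has to wait (and time out) on a connection checkout.
    file_collection
    |> Enum.flat_map(&get_structs_to_insert/1)
    |> Enum.chunk_every(8000)
    |> Flow.from_enumerable(stages: 8, max_demand: 1)
    |> Flow.map(fn e -> Repo.insert_all(Table, e) end)
    |> Flow.run()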

Make sure your DB pool size is bigger than the concurrent tasks that Task.async_stream is starting.

Example: I have a 10-core / 20-thread CPU. If I leave the default DB pool size of 10 and use Task.async_stream without modifications, my code will spawn 20 tasks, and those 20 tasks will then fight over 10 DB connections. If you also leave the default timeouts, it’s very likely that some of the tasks will start erroring out with timeouts.

You have a few ways to attack the problem (a config sketch follows the list):

  • Take note of the number of CPU cores (or threads, which on most machines are 2x the core count) and set a DB pool size that’s bigger. E.g. if your machine (or the production server) has 4 cores / 8 threads, then make sure the DB pool is at least 10, if not 20.
  • Increase the timeout values for the Ecto.Repo. If ingesting the data is not real-time critical, use some generous timeouts like 15 or even 20 seconds (IMO the default is 5 seconds, which, when inserting 8000 records per call, is not guaranteed to finish in time, depending on the machine).
  • Increase the connection queue parameters of the Ecto.Repo (:queue_target / :queue_interval) so that if a task has to wait on a connection, it waits a bit longer before crashing with “gave up waiting on a connection after X milliseconds”. Not sure if I remember correctly, but IMO the default is around 100ms, which is way too small a value. In all of my projects that ingest data I set this to 5 seconds.
  • Finally, you can also pass timeout: :infinity to your Task.async_stream call. I don’t recommend this – I prefer to just give it a generous value like 60 or 120 seconds but hey, measure and choose what feels right for you.
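
Put together, a sketch of what such a repo configuration could look like; the values are only illustrative and :my_app / MyApp.Repo are placeholders for your own application:

    # config/config.exs -- illustrative values, on top of a larger pool_size;
    # placeholder app and repo names
    config :my_app, MyApp.Repo,
      timeout: 60_000,        # per-query timeout in milliseconds
      queue_target: 5_000,    # how long checkouts may queue before the pool
      queue_interval: 10_000  # starts dropping requests (DBConnection options)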

I still believe Task.async_stream is the right tool for the job here. You can control the maximum number of parallel processes using the :max_concurrency option. I believe Flow is necessary only when you need a Flow.partition step, with data sent to each stage using a deterministic scheme (for example, when counting words in a document you want the same word to always be sent to the same stage).
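
For example, a sketch that caps concurrency at an assumed pool size of 10 (chunks stands for the chunked stream from the earlier snippets):

    # Never run more concurrent inserts than there are connections in the
    # pool, and give each chunk a generous timeout.
    chunks
    |> Task.async_stream(
      fn e -> Repo.insert_all(Table, e) end,
      max_concurrency: 10,
      timeout: 60_000
    )
    |> Stream.run()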


Thank you! Using timeout: :infinity fixed the connection problem, and now the operation takes half the time to run. Incredible.

Here is the benchmark:

    Name              ips        average  deviation         median         99th %
    conc           0.0908        11.01 s     ±0.00%        11.01 s        11.01 s
    no_conc        0.0441        22.69 s     ±0.00%        22.69 s        22.69 s

    Comparison:
    conc           0.0908
    no_conc        0.0441 - 2.06x slower +11.68 s
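
Output in this format comes from Benchee; a rough sketch of the setup, with run_concurrent/0 and run_sequential/0 as hypothetical wrappers around the two pipelines:

    # Hypothetical benchmark setup: each helper wraps one of the pipelines.
    Benchee.run(%{
      "conc" => fn -> run_concurrent() end,
      "no_conc" => fn -> run_sequential() end
    })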

If someone wants to see the code, here it is:

    def import_all_earnings_files() do
      Path.wildcard("files_dir/*")
      |> Task.async_stream(&get_structs_to_insert/1)
      |> Stream.flat_map(fn {:ok, e} -> e end)
      |> Stream.chunk_every(8000)
      |> Task.async_stream(
        fn e -> Repo.insert_all(Table, e) end,
        timeout: :infinity
      )
      |> Stream.run()
    end

Oh, you can achieve better than 2x, I’m sure.

Try reducing the chunk size to 500?

You are right. I tried different chunk sizes and here are my results:

    Name           ips        average  deviation         median         99th %
    400          0.183         5.45 s     ±0.00%         5.45 s         5.45 s
    500          0.169         5.93 s     ±0.00%         5.93 s         5.93 s
    300          0.167         6.00 s     ±0.00%         6.00 s         6.00 s
    200          0.127         7.85 s     ±0.00%         7.85 s         7.85 s
    100         0.0653        15.31 s     ±0.00%        15.31 s        15.31 s

    Comparison:
    400          0.183
    500          0.169 - 1.09x slower +0.48 s
    300          0.167 - 1.10x slower +0.55 s
    200          0.127 - 1.44x slower +2.40 s
    100         0.0653 - 2.81x slower +9.86 s

It seems that 400 is the optimal chunk size.


Depending on the CPU, anywhere from 300 to 1000 is good. How many CPU cores / threads do you have?

Okay, so 4x improvement from the first version. Cool.

Next you can look into how you produce the structs in the first steps. But you already made a lot of progress. Good job.