Problem with testing bulk inserts with Task.async_stream

I have code to do a bulk insert, which looks like this:

stream = build_attrs_stream()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)

The above code works fine until I run the test, which gives me this error:

(EXIT from #PID<0.544.0>) an exception was raised:
         ** (DBConnection.ConnectionError) connection not available and request was dropped from queue after 939ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information
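
For reference, the :queue_target and :queue_interval settings mentioned in the error live in the repo config. A minimal sketch, with a hypothetical app/repo name and illustrative values (raising them only changes how long a request may wait for a connection):

# config/config.exs (app and repo names are placeholders)
config :my_app, MyApp.Repo,
  # target time (ms) a request should wait for a connection
  queue_target: 500,
  # window (ms) over which waits are measured before requests are dropped
  queue_interval: 2_000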

I’m using Phoenix v1.4 and Ecto v3.1, and the database is Postgres.
Any help?

:wave:

What exactly does using Task.async_stream inside a repo transaction achieve? I think Repo.transaction checks out a single connection and all messages are sent over it (i.e. “single thread”) anyway.

That is, it’s not obvious to me what your approach has over

stream = build_attrs_stream()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Enum.reduce(0, fn batch, acc ->
    {n, _} = Repo.insert_all(Assignment, batch)
    acc + n
  end)
end)

Well, I actually wrote code similar to yours in my first implementation (let’s call it the “single thread” version):

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Enum.reduce(0, fn batch, total ->
    {n, _} = Repo.insert_all(Assignment, batch)
    total + n
  end)
end)

I learned the “parallel” version (the code I pasted in my original post) from the book Programming Ecto, Chapter 17, in the section “Fetching Large Datasets with Streams”. Here’s the example from the book:

stream =
  Artist
  |> Repo.stream()
  |> Task.async_stream(fn artist ->
    save_artist_record(artist)
  end)

Repo.transaction(fn ->
  Stream.run(stream)
end)

I benchmarked the two versions by inserting a large amount of data, over 20 times each. The “parallel” version is nearly 40% faster and uses all of my CPU cores, while the “single thread” version only uses one core at a time.
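
For what it’s worth, a minimal sketch of how such a comparison could be set up with Benchee; insert_parallel/0 and insert_serial/0 are hypothetical wrappers around the two versions above:

# insert_parallel/0 and insert_serial/0 are hypothetical wrappers
# around the “parallel” and “single thread” snippets above.
Benchee.run(
  %{
    "parallel (Task.async_stream)" => fn -> insert_parallel() end,
    "single thread (Enum.reduce)" => fn -> insert_serial() end
  },
  time: 60
)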

Ah, it wasn’t obvious from your first snippet that you also read from the database. In that case, yes, the concurrent version can be more efficient.

Yeah, the stream was built from Repo.stream (which is also why everything is wrapped in Repo.transaction; a Repo.stream has to be consumed inside one). Guess I didn’t give enough details.
The code “stream = build_attrs_stream()” is doing something like this:

some_query
|> Repo.stream()
|> Stream.map(&build_attrs/1)

Hey @ug0, this is because of how the Ecto sandbox works during tests. Each test process is provided its own database connection. The Task.async_stream call spawns new processes, and Ecto doesn’t know which connection those processes should use. You can solve this by giving Ecto a little more information:

stream = build_attrs_stream()

parent = self()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch, caller: parent)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)

The change here is the addition of caller: parent as options to the Repo call.

I added the caller: parent option, but it still gives me the same error.
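
One more thing worth trying: the Ecto sandbox’s shared mode, which lets every process spawned during the test, including the Task.async_stream workers, use the test process’s connection. A minimal sketch for the test setup (shared mode requires the test to run with async: false):

# In the test module’s setup block; requires `use ExUnit.Case, async: false`.
setup do
  :ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
  # Let any process spawned during this test use this connection.
  Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
  :ok
end

With a single shared connection the batches will serialize during tests, but the connection error should go away.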