Problem with testing bulk inserts with Task.async_stream

I have code that does a bulk insert, which looks like this:

stream = build_attrs_stream()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)

The above code works fine until I run the test, which gives me this error:

(EXIT from #PID<0.544.0>) an exception was raised:
         ** (DBConnection.ConnectionError) connection not available and request was dropped from queue after 939ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information

I’m using Phoenix v1.4 and Ecto v3.1, and the database is Postgres.
Any help?
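
For reference, the :queue_target and :queue_interval options mentioned in the error are plain repo configuration; a minimal sketch, with a hypothetical app name and illustrative values:

# config/test.exs — hypothetical app/repo names, illustrative values only
config :my_app, MyApp.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,
  queue_target: 500,
  queue_interval: 5_000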

:wave:

What exactly does using Task.async_stream in a repo transaction achieve? I think Repo.transaction checks out a single connection and all messages are sent over it (i.e. “single thread”) anyway.

That is, it’s not obvious to me what your approach gains over

stream = build_attrs_stream()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Enum.map(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch)
    n
  end)
end)

Well, I actually wrote code similar to yours in my first implementation (let’s call it the “single thread” version):

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Enum.reduce(0, fn batch, total ->
    {n, _} = Repo.insert_all(Assignment, batch)
    total + n
  end)
end)

I learned the “parallel” version (the code I pasted in my original post) from the book Programming Ecto, Chapter 17, Section “Fetching Large Datasets with Streams”. Here’s the example from the book:

stream =
  Artist
  |> Repo.stream()
  |> Task.async_stream(fn artist ->
    save_artist_record(artist)
  end)

Repo.transaction(fn ->
  Stream.run(stream)
end)

I benchmarked these two versions by inserting a large amount of data, more than 20 times. The “parallel” version is nearly 40% faster and uses all of my CPU cores, while the “single thread” version uses only one CPU core at a time.

Ah, it wasn’t obvious from your first snippet that you also read from the database. In that case, yes, the concurrent version can be more efficient.

Yeah, the stream was built from Repo.stream. Guess I didn’t give enough details.
The code “stream = build_attrs_stream()” does something like:

some_query
|> Repo.stream()
|> Stream.map(&build_attrs/1)

Hey @ug0 this is because of how the Ecto sandbox works during tests. Each test process is provided its own database connection. The Task.async_stream call spawns new processes, and Ecto doesn’t know which connection to use. You can solve this by giving Ecto a little more information:

stream = build_attrs_stream()

parent = self()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    {n, _} = Repo.insert_all(Assignment, batch, caller: parent)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)

The change here is the addition of caller: parent as an option to the Repo call.
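
A sandbox-specific alternative (a sketch, not part of the original suggestion; it assumes the test process owns the sandbox connection) is to have each task explicitly allow itself via Ecto.Adapters.SQL.Sandbox.allow/3:

stream = build_attrs_stream()

parent = self()

Repo.transaction(fn ->
  stream
  |> Stream.chunk_every(5000)
  |> Task.async_stream(fn batch ->
    # Allow this task process to use the connection owned by the test
    # process. This only has an effect when the SQL sandbox pool is in use.
    Ecto.Adapters.SQL.Sandbox.allow(Repo, parent, self())
    {n, _} = Repo.insert_all(Assignment, batch)
    n
  end, ordered: false)
  |> Enum.reduce(0, fn {:ok, n}, acc -> acc + n end)
end)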


I added the caller: parent option, but it still gives me the same error.

Sorry for necro-posting. This is how I solved the issue of combining Repo.stream and Task.async_stream in my case.

Below is a self-contained script that will hang:

Mix.install([
  {:ecto_sql, "~> 3.10"},
  {:postgrex, ">= 0.0.0"}
])

Application.put_env(:myapp, Repo, database: "repo_stream_example", pool: Ecto.Adapters.SQL.Sandbox)

defmodule Repo do
  use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :myapp
end

defmodule Main do
  def my_fun do
    import Ecto.Query

    Repo.transaction(fn ->
      from(n in fragment("SELECT generate_series(1, 3) AS n"), select: n.n)
      |> Repo.stream()
      |> Task.async_stream(fn number ->
        # this hangs
        Repo.query!("select $1 + 1", [number]).rows
      end)
      |> Enum.to_list()
    end)
  end

  def main do
    # Recreate the database from scratch; storage_down may return an error
    # if the database doesn't exist yet, which is fine here.
    Repo.__adapter__().storage_down(Repo.config())
    :ok = Repo.__adapter__().storage_up(Repo.config())
    {:ok, _} = Repo.start_link([])
  end
end

ExUnit.start()

defmodule MainTest do
  use ExUnit.Case

  setup do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
    Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
    :ok
  end
  
  test "the insides of my_fun hang" do
    assert {:ok, [ok: [[2]], ok: [[3]], ok: [[4]]]} = Main.my_fun()
  end
end

Main.main()
ExUnit.run()
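
Assuming the script is saved as hang-repro.exs (the filename the diff below uses) and a local Postgres instance is reachable with default credentials, it can be run with elixir hang-repro.exs; it creates the repo_stream_example database and then hangs inside the test.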

Make the following change:

diff --git a/hang-repro.exs b/hang-repro.exs
index 3be69f09da..1bf6f76929 100644
--- a/hang-repro.exs
+++ b/hang-repro.exs
@@ -17,7 +17,6 @@ defmodule Main do
       from(n in fragment("SELECT generate_series(1, 3) AS n"), select: n.n)
       |> Repo.stream()
       |> Task.async_stream(fn number ->
-        # this hangs
         Repo.query!("select $1 + 1", [number]).rows
       end)
       |> Enum.to_list()
@@ -35,14 +34,8 @@ ExUnit.start()

 defmodule MainTest do
   use ExUnit.Case
-
-  setup do
-    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
-    Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
-    :ok
-  end

-  test "the insides of my_fun hang" do
+  test "the insides of my_fun don't hang anymore" do
     assert {:ok, [ok: [[2]], ok: [[3]], ok: [[4]]]} = Main.my_fun()
   end
 end

This way the default Sandbox mode, which is :auto, is used. The script will now run to completion.

I think it’s important to mention, however, that the code inside Task.async_stream will not run inside the same transaction and won’t be automatically rolled back at the end of the test. So any data inserted into the DB during the test has to be cleaned up manually. But that’s OK in my case.
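
For anyone who does need that manual cleanup, a minimal sketch using ExUnit’s on_exit callback (assuming the Assignment schema from earlier in the thread):

setup do
  # Runs after the test process has exited. Under the :auto sandbox mode it
  # checks out its own connection, so it can see and delete the rows the
  # tasks committed during the test.
  on_exit(fn -> Repo.delete_all(Assignment) end)
end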