Sorry for necro-posting. This is how I solved the issue of combining `Repo.stream` and `Task.async_stream` in my case.
Below is a self-contained script that will hang:
# Install the dependencies inline so the script is self-contained.
Mix.install([
{:ecto_sql, "~> 3.10"},
{:postgrex, ">= 0.0.0"}
])
# Configure Repo (under the :myapp application environment) to use the
# "repo_stream_example" database with the SQL Sandbox as its connection pool.
Application.put_env(:myapp, Repo, database: "repo_stream_example", pool: Ecto.Adapters.SQL.Sandbox)
# Ecto repository using the Postgres adapter; its settings come from the
# :myapp application environment configured above the module.
defmodule Repo do
use Ecto.Repo, adapter: Ecto.Adapters.Postgres, otp_app: :myapp
end
# Holds the code under test (my_fun/0) and the database bootstrap (main/0).
defmodule Main do
# Inside a single Repo.transaction/1, streams the values of generate_series(1, 3)
# and pipes each one through Task.async_stream/2, where a separate task process
# runs "select $1 + 1" for it. Returns whatever Repo.transaction/1 returns around
# the collected list of task results.
#
# NOTE(review): the hang presumably comes from the task processes waiting for the
# connection that the enclosing transaction/stream is still holding when the
# sandbox runs in shared mode - confirm against the Ecto.Adapters.SQL.Sandbox docs.
def my_fun do
import Ecto.Query
Repo.transaction(fn ->
from(n in fragment("SELECT generate_series(1, 3) AS n"), select: n.n)
|> Repo.stream()
|> Task.async_stream(fn number ->
# this hangs
Repo.query!("select $1 + 1", [number]).rows
end)
|> Enum.to_list()
end)
end
# Recreates the database from scratch and starts the Repo process.
def main do
# The return value of storage_down is deliberately not matched, so the call
# is allowed to fail (for example on a first run - TODO confirm).
Repo.__adapter__().storage_down(Repo.config())
:ok = Repo.__adapter__().storage_up(Repo.config())
{:ok, _} = Repo.start_link([])
end
end
# Start ExUnit before the test module below is defined.
ExUnit.start()
# Test case for Main.my_fun/0.
# The setup block checks out a sandbox connection for the test process and then
# switches the sandbox to shared mode owned by that process ({:shared, self()}).
# The single test asserts that my_fun/0 returns {:ok, ...} with one {:ok, rows}
# entry per streamed number, each holding the number plus one.
defmodule MainTest do
use ExUnit.Case
setup do
:ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
:ok
end
test "the insides of my_fun hang" do
assert {:ok, [ok: [[2]], ok: [[3]], ok: [[4]]]} = Main.my_fun()
end
end
# Prepare the database and start the Repo, then run the tests defined above.
Main.main()
ExUnit.run()
Make the following change:
diff --git a/hang-repro.exs b/hang-repro.exs
index 3be69f09da..1bf6f76929 100644
--- a/hang-repro.exs
+++ b/hang-repro.exs
@@ -17,7 +17,6 @@ defmodule Main do
from(n in fragment("SELECT generate_series(1, 3) AS n"), select: n.n)
|> Repo.stream()
|> Task.async_stream(fn number ->
- # this hangs
Repo.query!("select $1 + 1", [number]).rows
end)
|> Enum.to_list()
@@ -35,14 +34,8 @@ ExUnit.start()
defmodule MainTest do
use ExUnit.Case
-
- setup do
- :ok = Ecto.Adapters.SQL.Sandbox.checkout(Repo)
- Ecto.Adapters.SQL.Sandbox.mode(Repo, {:shared, self()})
- :ok
- end
- test "the insides of my_fun hang" do
+ test "the insides of my_fun don't hang anymore" do
assert {:ok, [ok: [[2]], ok: [[3]], ok: [[4]]]} = Main.my_fun()
end
end
This way the default Sandbox mode, `:auto`, is used. The script will now run to completion.
I think it’s important to mention, however, that the code inside `Task.async_stream` will not run inside the same transaction, and won’t be automatically rolled back at the end of the test. So any data inserted in the DB during the test will have to be cleaned up manually. But that’s OK in my case.