Problem making Ecto calls asynchronous

I seem to be having an issue making some code asynchronous. I’m running

    stream = Task.Supervisor.async_stream_nolink(
      Module.TaskSupervisor,
      enum,
      fn (item) -> my_func(item, 2) end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

which gives me

    12:31:22.290 [error] Postgrex.Protocol (#PID<0.678.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.1768.0> exited while client #PID<0.1771.0> is still running with: shutdown

Part of the code that’s run by my_func is an Ecto call:

    IO.puts("before")
    Repo.get(schema, id)
    IO.puts("after")

I’m seeing "before" printed right before the error message, but never "after".
On the other hand if I change my code to run synchronously as

    stream = Enum.map(enum, fn (item) -> my_func(item, 2) end)
    results = Enum.to_list(stream)

then everything works fine. There’s no error message, and both print statements get run.

Any thoughts on what I could be doing wrong or how I can fix this?

Are you running this code in tests with Ecto.Adapters.SQL.Sandbox?

I’m running from mix test with

config :project, Project.Repo,
  pool: Ecto.Adapters.SQL.Sandbox,

in my config/test.exs so I take it I am.

You can either use :shared mode with Ecto.Adapters.SQL.Sandbox and disable async tests for your test module or explicitly transfer the Ecto connection to the spawned task process. See https://hexdocs.pm/ecto_sql/Ecto.Adapters.SQL.Sandbox.html#module-collaborating-processes for more details.
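As a rough sketch of the second option (explicit allowance), a test could hand its connection to a task along these lines. The module name, Project.Repo, Project.User, and the id are placeholders for illustration, and the code only makes sense inside a project that already has a sandboxed repo configured:

```elixir
defmodule Project.AllowanceSketchTest do
  # Hypothetical test module; Project.Repo and Project.User are placeholder names.
  use ExUnit.Case, async: true

  setup do
    # The test process checks out, and therefore owns, a sandboxed connection.
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Project.Repo)
  end

  test "a task uses the connection owned by the test process" do
    parent = self()

    task =
      Task.async(fn ->
        # Allow this task process to use the connection owned by the test process.
        Ecto.Adapters.SQL.Sandbox.allow(Project.Repo, parent, self())
        Project.Repo.get(Project.User, 1)
      end)

    # Awaiting keeps the owner alive until the task is done with the connection.
    Task.await(task)
  end
end
```

With :shared mode the allow call is unnecessary, but the test module then has to run with async: false.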

It looks like the project is already doing that. The test file starts with

defmodule Project.Module do
    use Project.ModelCase

And in model_case.ex we’ve got

  setup tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Project.Repo)

    unless tags[:async] do
      Ecto.Adapters.SQL.Sandbox.mode(Project.Repo, {:shared, self()})
    end

    :ok
  end

So since :async isn’t included in the use line I believe that means we’re using shared mode, so any process should be able to connect to the DB connection.

I’m not sure how relevant it is, but while searching for Adapters.SQL.Sandbox in the project I discovered that it shows up in test/test_helpers.ex:

ExUnit.configure formatters: [JUnitFormatter, ExUnit.CLIFormatter]

ExUnit.start(exclude: [
  :skip,
])

Faker.start()

Ecto.Adapters.SQL.Sandbox.mode(Project.Repo, :manual)

This seems to set the mode to :manual without setting :shared.

Try spawning your tasks with Task.async_stream. If that works, then I think the problem is that in your original code task processes are started by a supervisor process and that’s why they don’t see the DB connection checked out by the test process. Performing a manual allowance in the task process should work.

There’s another option now: update your deps and it should just work - https://twitter.com/plataformatec/status/1091300824251285504
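If I read that announcement correctly (this is my assumption about what it refers to), the change builds on Elixir 1.8+ recording the calling process under the :"$callers" key of each task’s process dictionary, which newer versions of the database libraries can consult to find the connection owner. Here is a small sketch, with no database involved, of what a supervised task sees:

```elixir
# Start an anonymous task supervisor just for this sketch.
{:ok, sup} = Task.Supervisor.start_link()
parent = self()

[{:ok, callers}] =
  sup
  |> Task.Supervisor.async_stream_nolink([:item], fn _item ->
    # On Elixir 1.8+, Task stores the chain of calling processes under this key.
    Process.get(:"$callers")
  end)
  |> Enum.to_list()

# List.wrap/1 turns a nil (older Elixir versions) into an empty list.
IO.inspect(parent in List.wrap(callers), label: "task knows its caller")
```

On Elixir 1.8 or later I’d expect the caller to be in that list; on 1.6 the key is not set, which is why the allowance has to be done by hand there.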

Thanks for the suggestions. I just tried with async_stream:

    parent = self()
    stream = Task.Supervisor.async_stream(
      Module.TaskSupervisor,
      enum,
      fn (item) -> my_func(item, 2) end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

then with a manual allow:

    parent = self()
    stream = Task.Supervisor.async_stream_nolink(
      Module.TaskSupervisor,
      enum,
      fn item ->
        Ecto.Adapters.SQL.Sandbox.allow(Repo, parent, self())
        my_func(item, 2)
      end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

as well as with both:

    parent = self()
    stream = Task.Supervisor.async_stream(
      Module.TaskSupervisor,
      enum,
      fn item ->
        Ecto.Adapters.SQL.Sandbox.allow(Repo, parent, self())
        my_func(item, 2)
      end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

all unfortunately to no noticeable effect:
14:12:25.494 [error] Postgrex.Protocol (#PID<0.658.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.1745.0> exited while client #PID<0.1748.0> is still running with: shutdown

As for the dependency update route, that looks like a great idea. Alas we’re still on Elixir 1.6 in this project, and I think it would be a fairly large undertaking to get everything working under 1.8. I think we should probably do it at some point, but I don’t think now is the time.

Oops, it looks like I got your suggestion wrong, and had switched from Task.Supervisor.async_stream_nolink to Task.Supervisor.async_stream instead of Task.async_stream.

I’ve now tried switching to Task.async_stream both with a manual allow:

    parent = self()
    stream = Task.async_stream(
      enum,
      fn item ->
        Ecto.Adapters.SQL.Sandbox.allow(Repo, parent, self())
        my_func(item, 2)
      end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

and without:

    stream = Task.async_stream(
      enum,
      fn item ->
        my_func(item, 2)
      end,
      [ordered: false, max_concurrency: 1]
    )
    results = Enum.to_list(stream)

And again, unfortunately, all to no noticeable effect:

    19:08:14.135 [error] Postgrex.Protocol (#PID<0.658.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.1748.0> exited while client #PID<0.1751.0> is still running with: shutdown

I’ve realized that I misdiagnosed your problem.

When I tried to reproduce the error you’re getting, I realized that I had been thinking of the other error that occurs when a process cannot find a checked out connection. It looks like this:

11:14:40.287 [error] Task #PID<0.244.0> started from #PID<0.241.0> terminating
** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.244.0>.

When using ownership, you must manage connections in one
of the four ways:

[...it goes on for a while...]

However, the error you’re getting must be caused by the test process exiting before another process stopped using the DB connection. I was only able to reproduce this as follows:

  test "Task.Supervisor.async_stream" do
    enum = [1, 2, 3]

    stream =
      Task.Supervisor.async_stream_nolink(
        Module.TaskSupervisor,
        enum,
        fn item -> my_func(item, 2) end,
        ordered: false,
        max_concurrency: 1
      )

    _result = Enum.to_list(stream)

    # We have to sleep here to allow the process spawned by my_func() 
    # to enter a transaction
    on_exit(fn ->
      Process.sleep(100)
    end)
  end

  defp my_func(user_id, _n) do
    Project.Repo.get(Project.User, user_id)

    # I could only get the error to occur when I spawned another process here.
    spawn(fn ->
      # This transaction starts before the test process exits but it sleeps for a while
      # to let the test process terminate. Then, when Project.Repo.get() is called, it'll
      # raise the error
      # Postgrex.Protocol (#PID<0.210.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.241.0> exited
      Project.Repo.transaction(fn ->
        Process.sleep(100)
        Project.Repo.get(Project.User, user_id)
      end)
    end)
  end

The full error I’m getting looks a bit different from yours, but I think it happens under similar circumstances:

11:19:10.365 [error] Postgrex.Protocol (#PID<0.210.0>) disconnected: ** (DBConnection.ConnectionError) owner #PID<0.241.0> exited

Client #PID<0.247.0> is still using a connection from owner at location:

    (elixir) lib/process.ex:228: Process.sleep/1
    test/sandbox_test.exs:29: anonymous fn/1 in SandboxTest.my_func/2
    (ecto_sql) lib/ecto/adapters/sql.ex:814: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection) lib/db_connection.ex:1349: DBConnection.run_transaction/4

The connection itself was checked out by #PID<0.247.0> at location:

In the end, you were right about your tests running with async: false, since that’s the default, so my analysis was wrong in that regard. However, I think there’s something else going on in your test suite outside of the code snippets you’ve shared so far.
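One general way to keep the owner from exiting first is to make the test wait until any process that might still be using the connection has terminated. Here is a minimal sketch, with a plain process standing in for the one that would hold the connection (the worker name and the timings are made up for illustration):

```elixir
# Stand-in for a process that is still using the owner's DB connection.
worker = spawn(fn -> Process.sleep(50) end)

# Monitor it so we get a :DOWN message when it terminates.
ref = Process.monitor(worker)

result =
  receive do
    {:DOWN, ^ref, :process, ^worker, _reason} -> :worker_finished
  after
    # Give up after one second rather than blocking forever.
    1_000 -> :timeout
  end

IO.inspect(result)
```

In a test, the same wait would go at the end of the test body, so that the test process (the owner) outlives the worker.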

Just wanted to comment on this. The team working on Elixir does their best to keep Elixir versions backwards-compatible. The worst thing you should get after upgrading to a newer version is a bunch of new deprecation warnings.

I would actually encourage you to upgrade early because if something does get deprecated, you can stop doing that in your code and reduce the amount of work that you would otherwise have to do if you decided to upgrade months later. Dependencies also keep working as before as a rule, but since you may start getting new warnings, that can be something that prompts you to look into gradually upgrading your dependencies as well sooner rather than later. This is again going to be easier than skipping a few versions of Elixir and getting a lot more issues with dependencies when you finally decide to upgrade.

This problem is particularly annoying for me since I use SQLite in one of the environments, which doesn’t have an Ecto 3 adapter yet. I really want to update the dependencies but am not able to. For now I have to comment out my test.

I wonder if there is an easy way to use different versions of a package depending on different environments. It seems the only way to do so would be to keep different lockfiles in the project directory?

I really don’t recommend this. If you’re testing or developing with a database type other than what you use in production you’re setting yourself up for great production surprises.

There’s a special need in this project where the program needs to run on a local computer of somebody who doesn’t have any programming knowledge. I used Docker in the beginning but then decided that SQLite + an executable produced by distillery would be the ultimate one-click solution. Of course it wouldn’t make sense to test or develop with a different DB type.

I guess a more reasonable thing for me to do might be to just open a different branch for that, while keeping the main version to be run on the server on the main branch. Right now I’m creating an extra :local environment for this purpose, which is why the issue above came up.

This makes sense. The SQLite use case is an unimportant edge case for this project anyway. I’d just create another branch for it. Sorry, I was quite tired and probably not thinking clearly.

It does seem to be an issue in our test setup, as it looks like you were suggesting, since if I run the same code outside of the test environment it returns correctly, and the message from IO.puts("after") following the call to the Ecto.Repo.get callback shows up.

I suspect you’re right that the issue is outside of what I’ve shared, but I’m uncertain where to look next.

The test file looks roughly (modified to preserve confidentiality) like:

defmodule Project.ModuleTest do
  use Project.ModelCase

  alias Project.{Module, Factory, TestHelpers}

  @moduletag :module_tag

  setup do
    variables = values

    %{
      more_variables: more values,
      cleanup_func: cleanup_func
    } = TestHelpers.setup_stuff_we_need(variable, other_variable)

    on_exit(cleanup_func)

    built_variables = Factory.build(variables)

    %{
      names: some_variables
    }
  end

  describe "#description" do
    test "test description", %{
      names: some_variables
    } do
      assert {:ok, %{
        specific_thing_we_need_to_check: false,
        id: result_id
      }} = Module.function(thing_id, and, some, values)

      # wait for a different second so records that only have 1 second resolution don't collide
      Process.sleep(2000)

      assert {:ok, %{
        specific_thing_we_need_to_check: true,
        something: ^specific_value,
        thing_id: ^thing_id
      }} = Module.function_that_calls_my_func(thing_id, and, some, values) # <--- the line that actually creates the issue
    end

  end
end

And the file that defines Project.ModelCase looks like

defmodule Project.ModelCase do

  use ExUnit.CaseTemplate

  using do
    quote do
      alias Project.Repo
      import Ecto
      import Ecto.Query, only: [from: 2]
      import Project.ModelCase
    end
  end

  setup tags do
    :ok = Ecto.Adapters.SQL.Sandbox.checkout(Project.Repo)

    unless tags[:async] do
      Ecto.Adapters.SQL.Sandbox.mode(Project.Repo, {:shared, self()})
    end

    :ok
  end

  def errors_on(changeset) do
    Ecto.Changeset.traverse_errors(changeset, fn {message, opts} ->
      Enum.reduce(opts, message, fn {key, value}, acc ->
        String.replace(acc, "%{#{key}}", to_string(value))
      end)
    end)
  end
end

We’ve also got some stuff in config/test.exs, and taking some guesses as to what might be relevant we’ve got

use Mix.Config

config :project, Project.Repo,
  adapter: Ecto.Adapters.Postgres,
  username: System.get_env("TEST_USERNAME"),
  password: System.get_env("TEST_PASSWORD"),
  database: System.get_env("TEST_DATABASE"),
  pool_size: 3,
  pool: Ecto.Adapters.SQL.Sandbox,
  types: Project.PostgresTypes,
  ownership_timeout: 10 * 60 * 1000