DB Connection Ownership Issue with Custom Association Preloader using `Task.async_stream`

Hello Elixir community,

I have recently been working on an association library for our project, which is used to load associations across bounded contexts and provide custom association mapping between these contexts. We recently added a preloader function to the library, intended to work similarly to how Repo.preload works but using our custom loader function for associations.

The implementation works well for the most part. However, I encountered an issue when trying to optimize the loading of sibling associations by using Task.async_stream to handle them asynchronously, similar to the approach taken by Repo.preload.

Despite using Task.async_stream, I am experiencing DB connection ownership issues. Here is a simplified version of the code structure:

      def preload(sources, preloads, adapter_opts) when is_list(sources) do
        adapter_opts =
          adapter_opts
          |> Keyword.put_new(:caller, self())
          |> Keyword.put_new(:after_load_hook, :disabled)

        sources =
          sources
          |> Enum.reject(&is_nil/1)
          |> Enum.uniq_by(&find_or_create_primary_key/1)

        [preloads]
        |> List.flatten()
        |> Task.async_stream(&process_preload(&1, sources, adapter_opts), timeout: :infinity)
        |> Enum.reduce(sources, &handle_preload_results/2)
      end

The error it raises:

(DBConnection.ConnectionError) could not checkout the connection owned by #PID<0.1735.0>. When using the sandbox, connections are shared, so this may imply another process is using a connection. Reason: connection not available and request was dropped from queue after 8191ms. You can configure how long requests wait in the queue using :queue_target and :queue_interval. See DBConnection.start_link/2 for more information

Issues:

  • DB Connection Ownership: When using Task.async_stream, it seems like the DB connection ownership isn’t properly managed, leading to errors. I assume this is related to how connections are checked out and used within the asynchronous tasks.

Questions:

  1. Proper DB Connection Handling: What is the best practice for managing DB connections when performing asynchronous tasks in this context? How can I ensure that each task properly handles DB connections without running into ownership issues?
  2. Alternatives to Task.async_stream: Are there better alternatives to Task.async_stream for this use case that might avoid these issues, or specific configurations I should consider?

Any guidance, examples, or pointers to relevant resources would be greatly appreciated!

Thank you in advance for your help.

Can you post the actual errors you get when running the above code?

FWIW, there are two mechanisms in Ecto’s Preloader that seem to address the issues you’re seeing:

The `not adapter_meta.adapter.checked_out?(adapter_meta)` clause will skip parallel preloading entirely if inside a transaction.

I haven’t traced where the :caller option ends up being used, but the comment above it sounds very related to your issue.
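To make those two mechanisms concrete, here is a rough sketch of the pattern as I understand it. It is a paraphrase, not Ecto's actual source: `PreloadSketch`, `run_loaders/3`, and the `checked_out?` argument are hypothetical names. In a real app the boolean could come from something like `MyApp.Repo.checked_out?()`, where `MyApp.Repo` is an assumed repo module.

```elixir
defmodule PreloadSketch do
  # Hypothetical helper, not part of Ecto or of the library in question.
  # `loaders` is a list of one-arity functions that each receive `opts`.
  # `checked_out?` is a boolean computed by the caller (assumption: true
  # when this process already holds a connection, e.g. in a transaction).
  def run_loaders(loaders, opts, checked_out?) do
    if match?([_, _ | _], loaders) and not checked_out? do
      # Two or more loaders and no connection held: fan out to tasks.
      # The caller pid is passed along so each task can tell its query
      # which process the work is being done on behalf of.
      opts = Keyword.put_new(opts, :caller, self())

      loaders
      |> Task.async_stream(fn loader -> loader.(opts) end, timeout: :infinity)
      |> Enum.map(fn {:ok, result} -> result end)
    else
      # A connection is already held (or there is only one loader):
      # run sequentially in this process so the held connection is reused.
      Enum.map(loaders, fn loader -> loader.(opts) end)
    end
  end
end
```

The point of the sequential branch is that a task spawned from inside a transaction cannot use the connection the parent process is holding, so it ends up waiting in the queue until it times out.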

Sure, here is the error it gives for the above code:

      ** (DBConnection.ConnectionError) could not checkout the connection owned by
      #PID<0.1735.0>. When using the sandbox, connections are shared, so this may
      imply another process is using a connection.
      Reason: connection not available and request was dropped from queue after 8191ms.
      You can configure how long requests wait in the queue using :queue_target and
      :queue_interval.
      See DBConnection.start_link/2 for more information

Hmm, interesting. It seems the code that causes this error is running inside a transaction, which I don't think is checked in my case. Let me try adding that check.

Naively I’d think you have too much concurrency.
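If concurrency is the problem, one thing to try is capping it with the `:max_concurrency` option of `Task.async_stream` (it defaults to the number of online schedulers). A minimal sketch, where `BoundedPreload` and `map_bounded/3` are made-up names and `fun` stands in for your real per-preload loader:

```elixir
defmodule BoundedPreload do
  # Hypothetical wrapper: runs `fun` over `items` with at most
  # `max_concurrency` tasks in flight at any one time.
  def map_bounded(items, fun, max_concurrency) do
    items
    |> Task.async_stream(fun, max_concurrency: max_concurrency, timeout: :infinity)
    # Results arrive in input order because :ordered defaults to true.
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Usage: at most 2 tasks run at once.
BoundedPreload.map_bounded([1, 2, 3, 4], fn n -> n * 10 end, 2)
# => [10, 20, 30, 40]
```

With a shared sandbox connection the tasks still take turns on a single connection, so a lower cap mainly shortens the queue each task waits in. It does not help if the caller itself is holding the connection inside a transaction.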