Disabling parallel connections in Ecto preloads

I’m trying to disable parallel execution in Ecto because I’m wrapping all my resolver queries in a transaction. (I’m doing this because the database uses row-level security, which requires me to set a config value in Postgres before making my queries. You can see an example of that here.)

When Dataloader loads associations via Ecto’s preload, they run on a different DB connection, which means they’re not running in the same transaction. I’m passing in_parallel: false in my repo options, but I still seem to be having the same trouble.
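
Roughly, the pattern looks like this (a sketch only; the module and variable names are placeholders, not my actual code):

```elixir
# Sketch only - module/variable names are placeholders.
# The whole resolver runs inside one transaction so that every query sees
# the config set on that connection.
def with_rls(session_id, fun) do
  MyApp.Repo.transaction(fn ->
    # The third argument `true` makes the setting local to this transaction.
    MyApp.Repo.query!(
      "SELECT set_config('role', 'my_authenticated_user', true),
              set_config('jwt.claims.session_id', $1, true)",
      [session_id]
    )

    # Queries issued on this connection now see the row-level security config;
    # a preload that checks out a different pool connection does not.
    fun.()
  end)
end
```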

To troubleshoot, I’ve even tried disabling parallel preloads altogether by modifying this function in Ecto:

```elixir
defp maybe_pmap(preloaders, repo_name, opts) do
  # if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
  #      Keyword.get(opts, :in_parallel, false) do
  #   # We pass caller: self() so the ownership pool knows where
  #   # to fetch the connection from and set the proper timeouts.
  #   # Note while the ownership pool uses '$callers' from pdict,
  #   # it does not do so in automatic mode, hence this line is
  #   # still necessary.
  #   opts = Keyword.put_new(opts, :caller, self())
  #
  #   preloaders
  #   |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
  #   |> Enum.map(fn {:ok, assoc} -> assoc end)
  # else
    Logger.warn("Running preloaders in parallel is disabled")
    Enum.map(preloaders, &(&1.(opts)))
  # end
end
```

…but I’m still checking out different connections for my preloads. I’d really appreciate any help figuring this out.

Well, you can always use table-wide exclusive access?

Or utilize Redis’ distributed locks.

(Additionally, please format your code with triple backticks at the start and the end like this: ```)


Sorry about the formatting, edited to fix. I’m not familiar with table-wide exclusive access, but am I wrong in thinking that this should work in theory, assuming I have Ecto properly configured? I’d like to stick with row-level security, and as far as I can tell Ecto should support my use case, but it seems like I’m missing something.

I wasn’t addressing your initial approach because I have no experience with it. Plus, I wouldn’t want to rely on a modified Ecto.

Look for “ACCESS EXCLUSIVE” on the page.

I don’t see a correlation between row-level security and serialized (non-parallel) access. Maybe others can chime in, because I’ve never used it.

I should clarify a couple of points:

I only modified Ecto to confirm that it wasn’t running preloads in parallel. In theory you can do this with config alone; the modified Ecto is just for troubleshooting purposes.
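
For reference, the non-forked version of the same experiment would be passing the option per preload call, something like this (Post and the association names are placeholders; as far as I can tell from the Ecto docs, :in_parallel is a preload option rather than a repo-wide setting):

```elixir
# Sketch only - Post, :comments and :author are placeholders. The :in_parallel
# option flows through to the maybe_pmap/3 check shown above, so the preload
# queries run sequentially on the calling process.
posts = MyApp.Repo.all(Post)
posts = MyApp.Repo.preload(posts, [:comments, :author], in_parallel: false)
```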

Row-level security requires setting a config in Postgres, something like this:

```sql
SELECT
  set_config('role', 'my_authenticated_user', true),
  set_config('jwt.claims.session_id', my_session_id, true)
```

That session id is then used throughout Postgres’s row-level security policies. The reason parallel execution doesn’t work is that the new connections aren’t running with those configs set, since they’re not part of the transaction where I set them (see this example).


I think Repo.preload is a red herring here - Dataloader does the actual loading in a separate process already.
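
Roughly speaking, the batches end up being executed like this (an illustration only, not Dataloader’s actual source):

```elixir
# Illustration only, not Dataloader's real implementation: each batch runs in
# a spawned task, i.e. a different process from the resolver, so it checks out
# its own DB connection rather than reusing the one your transaction holds.
def run_batches(batches, run_batch) do
  batches
  |> Enum.map(fn batch -> Task.async(fn -> run_batch.(batch) end) end)
  |> Enum.map(&Task.await/1)
end
```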


@al2o3cr is correct, Dataloader adds its own parallelism here. However, I don’t think you need to remove it; you just need to use the available callbacks to set the value:

```elixir
def data(context) do
  current_user_id = user_id_from_context(context) # adjust for your code

  Dataloader.Ecto.new(
    YourApp.Repo,
    query: &query/2,
    run_batch: &run_batch(current_user_id, &1, &2, &3, &4, &5),
    default_params: context
  )
end

def run_batch(current_user_id, queryable, query, col, inputs, repo_opts) do
  YourApp.Repo.transaction(fn ->
    # call your row-level security config stuff here with the provided user id
    Dataloader.Ecto.run_batch(YourApp.Repo, queryable, query, col, inputs, repo_opts)
  end, repo_opts)
end
```

@al2o3cr thanks so much for pointing that out!

@benwilson512 that looks perfect. The only thing I’m stuck on with that sample is: How does this data callback work? I’m not sure how to access my context in there.

ETA: OH WAIT I see it, it’s in the context callback in the Absinthe schema. Thanks so much!
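
For anyone who lands here later, the wiring looks roughly like this (module names are placeholders):

```elixir
# Rough sketch of the schema wiring - module names are placeholders.
defmodule YourAppWeb.Schema do
  use Absinthe.Schema

  # The loader is built in the context callback, so the Absinthe context
  # (current user, session id, ...) is available when data/1 runs.
  def context(ctx) do
    loader =
      Dataloader.new()
      |> Dataloader.add_source(Accounts, Accounts.data(ctx))

    Map.put(ctx, :loader, loader)
  end

  def plugins do
    [Absinthe.Middleware.Dataloader] ++ Absinthe.Plugin.defaults()
  end

  # ... types and fields ...
end
```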

Ah @benwilson512, I’ve run into a snag with that path. When it’s loading an association, it doesn’t call that run_batch callback; it calls repo.preload instead:

```elixir
defp run_batch({{:assoc, schema, pid, field, queryable, opts} = key, records}, source) do
  {ids, records} = Enum.unzip(records)
  query = source.query.(queryable, opts) |> Ecto.Queryable.to_query()
  repo_opts = Keyword.put(source.repo_opts, :caller, pid)
  empty = schema |> struct |> Map.fetch!(field)
  records = records |> Enum.map(&Map.put(&1, field, empty))
  require Logger
  Logger.warn("Not using the custom batching function")

  results =
    if query.limit || query.offset do
      records
      |> preload_lateral(field, query, source.repo, repo_opts)
    else
      records
      |> source.repo.preload([{field, query}], repo_opts)
    end

  results = results |> Enum.map(&Map.get(&1, field))
  {key, Map.new(Enum.zip(ids, results))}
end
```

So it’ll only run in that transaction when the batch is a queryable; associations will still run outside the transaction. Am I interpreting that correctly?

I also noticed this dataloader PR that allows the user to optionally disable async execution if it’s not possible to wrap the associations in transactions.
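
If that change has landed in the Dataloader version you’re on, my understanding is you’d opt out when building the loader, something like this (assuming the option is spelled async?: false, which is worth double-checking against your version’s docs):

```elixir
# Assumes the option is spelled async?: false (check your Dataloader version's
# docs). With async disabled, batches run on the calling process, so they can
# share the connection/transaction that process has checked out.
def context(ctx) do
  loader =
    Dataloader.new(async?: false)
    |> Dataloader.add_source(Accounts, Accounts.data(ctx))

  Map.put(ctx, :loader, loader)
end
```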

I don’t think so: preload here is only ever passed a single entry (source.repo.preload([{field, query}], repo_opts)), and the preloader function you linked from Ecto only pmaps when there are multiple preloaders.
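
That’s the match?([_, _ | _], preloaders) guard in the maybe_pmap/3 snippet up-thread; it only matches lists with at least two preloaders:

```elixir
# The guard from maybe_pmap/3: the pattern [_, _ | _] only matches lists with
# two or more elements, so a single preload entry never takes the parallel branch.
match?([_, _ | _], [:comments])           # => false
match?([_, _ | _], [:comments, :author])  # => true
```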