I’m trying to disable parallel execution in Ecto because I’m wrapping all my resolver queries in a transaction. (I’m doing this because the database uses row-level security, which requires me to set a config value in Postgres before making my queries. You can see an example of that here.)
When Dataloader loads associations via Ecto’s preload, those queries run on a different DB connection, which means they aren’t part of the same transaction. I’m passing `in_parallel: false` in my repo options, but I still seem to have the same problem.
To troubleshoot, I’ve even tried disabling parallel preloads altogether in Ecto, in this function:
```elixir
defp maybe_pmap(preloaders, repo_name, opts) do
  # if match?([_,_|_], preloaders) and not checked_out?(repo_name) and
  #      Keyword.get(opts, :in_parallel, false) do
  #   # We pass caller: self() so the ownership pool knows where
  #   # to fetch the connection from and set the proper timeouts.
  #   # Note while the ownership pool uses '$callers' from pdict,
  #   # it does not do so in automatic mode, hence this line is
  #   # still necessary.
  #   opts = Keyword.put_new(opts, :caller, self())
  #
  #   preloaders
  #   |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
  #   |> Enum.map(fn {:ok, assoc} -> assoc end)
  # else
  Logger.warn("Running preloaders in parallel is disabled")
  Enum.map(preloaders, &(&1.(opts)))
  # end
end
```
…but I’m still checking out different connections for my preloads. I’d really appreciate any help figuring this out.
Sorry about the formatting; I’ve edited the post to fix it. I’m not familiar with table-wide exclusive access, but am I wrong in thinking that this should work in theory, assuming Ecto is properly configured? I’d like to stick with row-level security, and as far as I can tell Ecto supports my use case, but it seems like I’m missing something.
I only modified Ecto to make sure it wasn’t running preloads in parallel. In theory you can do this with config; the modified Ecto is just for troubleshooting.
Row-level security requires setting a config value in Postgres, something like this:
That session ID is then used throughout Postgres’s row-level security policies. Parallel execution breaks this because the other connections don’t have those settings: they aren’t running inside the transaction where I set them (see this example).
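A rough, stdlib-only way to see why this happens: a checked-out connection (and therefore the open transaction and any settings made on it) belongs to the process that checked it out, while `Task.async_stream/3` runs each job in a fresh process. The sketch below uses the process dictionary as a stand-in for that per-process connection state. It is an analogy, not Ecto’s implementation, and the module and function names are hypothetical.

```elixir
defmodule ProcessLocalDemo do
  # Stand-in for "set the RLS config inside my transaction": the value is
  # stored only in the calling process, like a connection checked out by it.
  def with_session(user_id, fun) do
    Process.put(:session_user_id, user_id)

    try do
      fun.()
    after
      Process.delete(:session_user_id)
    end
  end

  # Stand-in for a query whose RLS policy reads the session setting.
  def current_user, do: Process.get(:session_user_id)

  # Runs every job in the current process (like Enum.map in maybe_pmap).
  def run_sequential(jobs), do: Enum.map(jobs, fn job -> job.() end)

  # Runs every job in its own Task process (like the parallel branch).
  def run_parallel(jobs) do
    jobs
    |> Task.async_stream(fn job -> job.() end, timeout: :infinity)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

jobs = [fn -> ProcessLocalDemo.current_user() end, fn -> ProcessLocalDemo.current_user() end]

# Same process: both jobs see the session value.
ProcessLocalDemo.with_session(42, fn -> ProcessLocalDemo.run_sequential(jobs) end)
# => [42, 42]

# New processes: the tasks start with an empty process dictionary.
ProcessLocalDemo.with_session(42, fn -> ProcessLocalDemo.run_parallel(jobs) end)
# => [nil, nil]
```

The same shape applies to the real problem: anything that only exists in the transaction’s process is invisible to work that Ecto or Dataloader hands off to another process.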
@al2o3cr is correct: Dataloader adds its own parallelism here. However, I don’t think you need to remove it; you just need to use the available callbacks to set the value:
```elixir
def data(context) do
  current_user_id = user_id_from_context(context) # adjust for your code

  Dataloader.Ecto.new(
    YourApp.Repo,
    query: &query/2,
    run_batch: &run_batch(current_user_id, &1, &2, &3, &4, &5),
    default_params: context
  )
end

def run_batch(user_id, queryable, query, col, inputs, repo_opts) do
  # Repo.transaction/2 returns {:ok, value}, but the run_batch callback
  # must return the batch results themselves, so unwrap them here.
  {:ok, results} =
    YourApp.Repo.transaction(fn ->
      # call your row-level security config stuff here with the provided user id
      Dataloader.Ecto.run_batch(YourApp.Repo, queryable, query, col, inputs, repo_opts)
    end, repo_opts)

  results
end
```
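Why this works even though Dataloader still runs batches concurrently: the setup happens inside the batch function, so it runs in whichever process executes that batch, right before the query. Below is a stdlib-only simulation. `fake_transaction/2` is a hypothetical stand-in for `Repo.transaction/2` plus the row-level security setup, and the process dictionary stands in for the connection’s session settings.

```elixir
defmodule WrappedBatchDemo do
  # Hypothetical stand-in for Repo.transaction/2 + RLS setup: records the
  # user id in the *current* process, runs the body, and returns
  # {:ok, result} the way Repo.transaction/2 does on success.
  def fake_transaction(user_id, fun) do
    Process.put(:session_user_id, user_id)

    try do
      {:ok, fun.()}
    after
      Process.delete(:session_user_id)
    end
  end

  # Hypothetical default batch function: "queries" each input and tags it
  # with whatever session user id is visible in this process.
  def default_run_batch(inputs) do
    Enum.map(inputs, fn input -> {input, Process.get(:session_user_id)} end)
  end

  # Wrapper in the spirit of run_batch/6 above: setup and query happen in
  # the same process, and the {:ok, _} tuple is unwrapped.
  def run_batch(user_id, inputs) do
    {:ok, results} = fake_transaction(user_id, fn -> default_run_batch(inputs) end)
    results
  end

  # Simulates a loader that runs each batch in its own Task.
  def run_batches_in_parallel(batch_fun, batches) do
    batches
    |> Task.async_stream(batch_fun, timeout: :infinity)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Partially apply the user id, like &run_batch(current_user_id, &1, ...).
batch_fun = &WrappedBatchDemo.run_batch(42, &1)

WrappedBatchDemo.run_batches_in_parallel(batch_fun, [[:a, :b], [:c]])
# => [[{:a, 42}, {:b, 42}], [{:c, 42}]]
```

Passing `&WrappedBatchDemo.default_run_batch/1` directly instead would return `nil` for every session value, which mirrors the original symptom.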
@benwilson512, that looks perfect. The only thing I’m stuck on with that sample is how this data callback works: I’m not sure how to access my context in there.
ETA: Oh wait, I see it: it’s in the context callback in the Absinthe schema. Thanks so much!
Ah, @benwilson512, I’ve run into a snag with that path. When it’s loading an association, it doesn’t call that run_batch callback; it calls repo.preload instead:
```elixir
defp run_batch({{:assoc, schema, pid, field, queryable, opts} = key, records}, source) do
  {ids, records} = Enum.unzip(records)
  query = source.query.(queryable, opts) |> Ecto.Queryable.to_query()
  repo_opts = Keyword.put(source.repo_opts, :caller, pid)
  empty = schema |> struct |> Map.fetch!(field)
  records = records |> Enum.map(&Map.put(&1, field, empty))

  require Logger
  Logger.warn("Not using the custom batching function")

  results =
    if query.limit || query.offset do
      records
      |> preload_lateral(field, query, source.repo, repo_opts)
    else
      records
      |> source.repo.preload([{field, query}], repo_opts)
    end

  results = results |> Enum.map(&Map.get(&1, field))
  {key, Map.new(Enum.zip(ids, results))}
end
```
So it’ll only run inside that transaction when the batch is a queryable; associations will still run outside the transaction. Am I interpreting that correctly?
I also noticed this Dataloader PR, which lets the user optionally disable async execution when it’s not possible to wrap the associations in transactions.
I don’t think so: preload here is only passed a single item (`|> source.repo.preload([{field, query}], repo_opts)`), and the Ecto preloader function you linked only runs preloaders in parallel (`pmap`) when there is more than one of them.
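A quick way to check the single-item point: the guard `match?([_, _ | _], preloaders)` only matches lists with at least two elements. The sketch below is a simplified, hypothetical copy of that branch (it leaves out the `checked_out?/1` and `:in_parallel` checks) and has each preloader return `self()` so you can see which process ran it.

```elixir
defmodule MaybePmapDemo do
  # Same pattern as the Ecto guard: true only for lists of length >= 2.
  def parallel?(preloaders), do: match?([_, _ | _], preloaders)

  # Simplified maybe_pmap: parallel via Task.async_stream for 2+ preloaders,
  # otherwise plain Enum.map in the calling process.
  def maybe_pmap(preloaders, opts) do
    if parallel?(preloaders) do
      preloaders
      |> Task.async_stream(&(&1.(opts)), timeout: :infinity)
      |> Enum.map(fn {:ok, result} -> result end)
    else
      Enum.map(preloaders, &(&1.(opts)))
    end
  end
end

MaybePmapDemo.parallel?([{:field, :query}])
# => false

whoami = fn _opts -> self() end

# One preloader: it runs in the caller's process (and its connection).
[pid] = MaybePmapDemo.maybe_pmap([whoami], [])
pid == self()
# => true
```

With two or more preloaders, each one would instead report a different Task pid, which is the case where connection-bound state gets lost.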