Process associated with Task.Supervisor cannot check out DB connection in test

I am using the Sage library to perform a set of operations involving DB inserts and calls to external APIs. Sage.transaction uses Ecto.Repo.transaction to wrap all actions performed in each step (similar to Ecto.Multi).

One of the steps uses Task.Supervisor.async_nolink/5 to spawn a supervised process that inserts an Oban job. Something like:

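defmodule SageModule do
  @supervisor MyApp.Supervisor

  def sage_function(_) do
    Sage.new()
    # :insert_background_job is a placeholder step name
    |> Sage.run(:insert_background_job, &insert_background_job/2)
    |> Sage.transaction(MyApp.Repo)
  end

  defp insert_background_job(attrs, _) do
    task =
      Task.Supervisor.async_nolink(@supervisor, fn ->
        %{attrs: attrs}
        |> MyApp.NotifyExternalAPIWorker.new()
        |> Oban.insert()
      end)

    # Wait for the task (default 5 seconds), then shut it down if it has not replied
    case Task.yield(task) || Task.shutdown(task) do
      {:ok, result} ->
        result

      error ->
        # Log something
        {:error, error}
    end
  end
end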
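
To show what the Task.yield(task) || Task.shutdown(task) expression can return, here is a minimal sketch that runs without Ecto, Sage, or Oban. Demo.TaskSupervisor and the run helper are hypothetical names used only for illustration; they are not part of the code above.

```elixir
# Start a throwaway task supervisor so the sketch is self-contained.
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.TaskSupervisor)

run = fn fun, timeout ->
  task = Task.Supervisor.async_nolink(Demo.TaskSupervisor, fun)

  # Task.yield/2 returns nil on timeout; Task.shutdown/1 then stops the task
  # and returns a reply only if one arrived in the meantime.
  case Task.yield(task, timeout) || Task.shutdown(task) do
    {:ok, result} -> {:ok, result}
    {:exit, reason} -> {:error, {:task_exited, reason}}
    nil -> {:error, :timeout}
  end
end

# The task finishes in time.
IO.inspect(run.(fn -> 1 + 1 end, 1_000))

# The task is still running when the timeout expires, so it is shut down.
IO.inspect(run.(fn -> Process.sleep(500) end, 50))

# The task exits abnormally; the caller survives because the task is not linked.
IO.inspect(run.(fn -> exit(:boom) end, 1_000))
```

Because the task is started with async_nolink, a failure inside it (such as a failed connection checkout) reaches the caller as an {:exit, reason} tuple instead of crashing it, which is why the error only shows up when the case result is inspected.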

The code runs fine and executes the Oban job. But while testing I am unable to assert that the job was enqueued.

test "enqueues background job to notify external API" do
  assert _ = SageModule.sage_function(params)

  # Assertion fails with the following error:
  #   Expected a job matching:
  #
  #   %{worker: MyApp.NotifyExternalAPIWorker}
  #
  #   to be enqueued in the "oban" schema. Instead found:
  #
  #   []
  assert_enqueued worker: MyApp.NotifyExternalAPIWorker
end

If I inspect the task output from the Task.yield(task) || Task.shutdown(task) case statement, I get the following error:

    message: "could not checkout the connection owned by #PID...

Is there a way I can check out the connection for the Task.Supervisor process? I have tried using shared mode as well, but no luck.

:ok = Sandbox.checkout(MyApp.Repo)

Sandbox.mode(MyApp.Repo, {:shared, self()})

Running the sandbox in shared mode should allow the task process to access the connection. Are you sure you need to wrap inserting the Oban job in a task at all? Maybe try without the task and verify that it’s working?
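
For reference, a shared-mode test setup might look like the sketch below. It assumes Sandbox in the question refers to Ecto.Adapters.SQL.Sandbox and that MyApp.Repo is configured to use the sandbox pool in the test environment; the test module name is hypothetical.

```elixir
defmodule MyApp.SageModuleTest do
  # Shared mode is only safe when the test module is not async.
  use ExUnit.Case, async: false

  alias Ecto.Adapters.SQL.Sandbox

  setup do
    # The test process becomes the owner of a sandboxed connection...
    :ok = Sandbox.checkout(MyApp.Repo)

    # ...and every other process, including supervised tasks, uses that
    # same connection through the owner.
    Sandbox.mode(MyApp.Repo, {:shared, self()})
    :ok
  end
end
```

Note that in shared mode all processes go through the single connection owned by the test process, so a task that needs the connection while the owner is still inside a Repo.transaction may have to wait for it. Whether that is what happens here is not confirmed in this thread.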

Inserting the Oban job from a task is required because I don't want the job to be rolled back if the transaction rolls back.
The example I posted is slightly different from my actual implementation. In my actual implementation, the insert_background_job step only gets called when a step within the Sage.transaction fails. So the actual implementation looks something like this:

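  def sage_function(_) do
    Sage.new()
    # :do_something is a placeholder step name
    |> Sage.run(:do_something, &do_something/2, &insert_background_job/3)
    |> Sage.run(:some_other_step_that_might_cause_rollback, &other_step_function/2)
    |> Sage.transaction(MyApp.Repo)
  end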

In the corrected example, the Sage transaction rolls back if any of the steps fails. So if :some_other_step_that_might_cause_rollback fails, everything will be rolled back, and any compensation functions associated with a step will also be executed. In this case that is insert_background_job/3. More info on Sage compensation functions can be found in the Sage documentation.

I had tried inserting the Oban job directly without wrapping it inside a task, but unfortunately the transaction rollback causes the job insert to roll back as well. Also, for some reason shared mode does not ensure the connection is granted to the task.