I am using the Sage library to perform a set of operations involving DB inserts and calls to external APIs. Sage.transaction uses Ecto.Repo.transaction to wrap all actions performed in each step (similar to Ecto.Multi).
One of the steps uses Task.Supervisor.async_nolink/3 to spawn a supervised process which inserts an Oban job. Something like:
defmodule SageModule do
  @supervisor MyApp.Supervisor

  def sage_function(_) do
    Sage.new()
    |> Sage.run(:do_something, &insert_background_job/2)
    |> Sage.transaction(MyApp.Repo)
  end

  defp insert_background_job(attrs, _) do
    # The job is inserted from a separate, unlinked process.
    task =
      Task.Supervisor.async_nolink(@supervisor, fn ->
        %{attrs: attrs}
        |> MyApp.NotifyExternalAPIWorker.new()
        |> Oban.insert()
      end)

    # Sage steps are expected to return {:ok, effect} or {:error, reason}.
    case Task.yield(task) || Task.shutdown(task) do
      {:ok, result} ->
        {:ok, result}

      error ->
        # Log something
        {:error, error}
    end
  end
end
The code runs fine and executes the Oban job. However, in my tests I am unable to assert that the job was enqueued:
test "enqueues background job to notify external API" do
  assert _ = SageModule.sage_function(params)

  # Assertion fails with the following error
  # Expected a job matching:
  #   %{worker: MyApp.NotifyExternalAPIWorker}
  # to be enqueued in the "oban" schema. Instead found:
  #   []
  assert_enqueued worker: MyApp.NotifyExternalAPIWorker
end
If I inspect the result of the Task.yield(task) || Task.shutdown(task) expression in the case statement, I get the following error:
{:exit,
 {%DBConnection.ConnectionError{
    message: "could not checkout the connection owned by #PID...
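For reference, the {:exit, ...} shape above is what Task.yield/2 reports when a task started with async_nolink crashes: the exception is raised in the task's own process and handed back to the caller as an exit reason instead of bringing the caller down. A minimal sketch using only the standard library, where the throwaway supervisor and the "boom" error are stand-ins for MyApp.Supervisor and the connection error:

```elixir
# Throwaway supervisor for illustration; a real app would start it in its supervision tree.
{:ok, sup} = Task.Supervisor.start_link()

# The function runs in a separate process that is not linked to the caller.
task = Task.Supervisor.async_nolink(sup, fn -> raise "boom" end)

# yield/2 waits up to the timeout; shutdown/1 covers the case where yield returns nil.
result = Task.yield(task, 1_000) || Task.shutdown(task)

case result do
  {:ok, value} -> IO.inspect(value, label: "task returned")
  {:exit, reason} -> IO.inspect(reason, label: "task exited with")
  nil -> IO.puts("task timed out and was shut down")
end
```

Because the insert runs in that separate process, it is the task process, not the test process, that has to obtain a sandbox connection.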
Is there a way I can check out the connection for the Task.Supervisor process? I have tried using shared mode as well, but had no luck:
:ok = Sandbox.checkout(MyApp.Repo)
Sandbox.mode(MyApp.Repo, {:shared, self()})
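To make the attempt above concrete, here is a rough sketch of how those two calls would typically sit in a test module. The module name MyApp.SageModuleTest is hypothetical, Sandbox is assumed to be an alias for Ecto.Adapters.SQL.Sandbox, and assert_enqueued is assumed to come from Oban.Testing; shared mode also requires the test case to be non-async.

```elixir
defmodule MyApp.SageModuleTest do
  # Shared sandbox mode only works with async: false.
  use ExUnit.Case, async: false
  # Assumed source of assert_enqueued/1.
  use Oban.Testing, repo: MyApp.Repo

  alias Ecto.Adapters.SQL.Sandbox

  setup do
    # The test process becomes the owner of the sandboxed connection...
    :ok = Sandbox.checkout(MyApp.Repo)
    # ...and every other process is allowed to use that same connection.
    Sandbox.mode(MyApp.Repo, {:shared, self()})
    :ok
  end
end
```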