I am using the Sage library to perform a set of operations involving DB inserts and calls to external APIs. Sage.transaction uses Ecto.Repo.transaction to wrap all actions performed in each step (similar to Ecto.Multi).
One of the steps uses Task.Supervisor.async_nolink/3 to spawn a supervised process that inserts an Oban job. Something like:
defmodule SageModule do
  @supervisor MyApp.Supervisor

  def sage_function(_) do
    Sage.new()
    |> Sage.run(:do_something, &insert_background_job/2)
    |> Sage.transaction(MyApp.Repo)
  end

  defp insert_background_job(attrs, _) do
    # The job is inserted from a separate, supervised task process.
    task =
      Task.Supervisor.async_nolink(@supervisor, fn ->
        %{attrs: attrs}
        |> MyApp.NotifyExternalAPIWorker.new()
        |> Oban.insert()
      end)

    # Wait for the task to reply; shut it down if it has not finished in time.
    case Task.yield(task) || Task.shutdown(task) do
      {:ok, _result} ->
        :ok

      error ->
        # Log something
        {:error, error}
    end
  end
end
The code runs fine and executes the Oban job. But while testing I am unable to assert that the job was enqueued.
test "enqueues background job to notify external API" do
  assert _ = SageModule.sage_function(params)

  # Assertion fails with the following error
  # Expected a job matching:
  # %{worker: MyApp.NotifyExternalAPIWorker}
  # to be enqueued in the "oban" schema. Instead found:
  # []
  assert_enqueued worker: MyApp.NotifyExternalAPIWorker
end
If I inspect the result of the Task.yield(task) || Task.shutdown(task) expression in the case statement, I get the following error:
{:exit,
{%DBConnection.ConnectionError{
message: "could not checkout the connection owned by #PID...
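The error mentions a connection owned by another PID, which fits the fact that the function passed to Task.Supervisor.async_nolink runs in its own process rather than in the caller. Below is a minimal sketch of just that part, using only the standard library. It leaves out Sage, Ecto, and Oban, and the anonymous supervisor started here is a stand-in for MyApp.Supervisor, so it only shows the process boundary and does not reproduce the sandbox error itself.

```elixir
# Stand-in supervisor for this sketch (the real code uses MyApp.Supervisor).
{:ok, supervisor} = Task.Supervisor.start_link()

caller = self()

task =
  Task.Supervisor.async_nolink(supervisor, fn ->
    # This body runs in the spawned task process, so self() is the task's PID.
    self()
  end)

# Wait for the reply (default timeout of 5 seconds), shutting the task down otherwise.
result = Task.yield(task) || Task.shutdown(task)

case result do
  {:ok, task_pid} ->
    IO.puts("caller: #{inspect(caller)}, task: #{inspect(task_pid)}")
    IO.puts("same process? #{task_pid == caller}")

  {:exit, reason} ->
    IO.puts("task exited: #{inspect(reason)}")

  nil ->
    IO.puts("task did not reply in time")
end
```

Anything the task does with the Repo therefore happens from the task's PID, not from the test process that called Sandbox.checkout.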
Is there a way to check out the connection for the process spawned by Task.Supervisor? I have also tried shared mode, with no luck:
:ok = Sandbox.checkout(MyApp.Repo)
Sandbox.mode(MyApp.Repo, {:shared, self()})