# A graft nested inside another graft causes `Oban.Pro.Workflow.status/1` to recurse forever (oban_pro 1.7.5).
defmodule App.NestedGraftRepro.Test do
  @moduledoc """
  Minimal, self-contained reproduction of an Oban.Pro nested-graft bug (oban_pro 1.7.5).

  A graft that executes while its own job's `workflow_id` is already "grafted_"-prefixed
  (i.e. a graft nested inside a grafted workflow) produces a sub-job whose
  `workflow_id == sup_workflow_id`. `Oban.Pro.Workflow.status/1` walks sub-workflows via
  `sup_workflow_id -> workflow_id` with no visited-set, so that self-edge makes
  `expand_status` recurse forever (here: child process killed at max_heap).

  Topology:

    * W -> add_graft(:outer);
    * outer grafts a workflow containing add_graft(:inner);
    * inner grafts a leaf -> leaf.workflow_id == leaf.sup_workflow_id.
  """

  use ExUnit.Case, async: false

  # NOTE(review): an earlier version of this comment referred to `prefix: "sunrise"`, but no
  # `prefix:` option is passed here (nor to the job query below), so whatever prefix Oban
  # uses by default is in effect. If your Oban tables live in a custom schema, add the
  # prefix in both places.
  use Oban.Pro.Testing, repo: App.Repo

  import Ecto.Query

  alias Ecto.Adapters.SQL.Sandbox
  alias Oban.Pro.Workflow

  # How long to wait for `Workflow.status/1` before declaring it non-terminating.
  @status_timeout_ms 8_000

  # Heap cap (in words) for the child process that calls `Workflow.status/1`, so runaway
  # recursion kills the child instead of exhausting the VM.
  @max_heap_words 2_000_000

  setup do
    # Shared mode lets the Oban worker processes and the spawned status/1 child use the
    # sandboxed connection owned by this test.
    pid = Sandbox.start_owner!(App.Repo, shared: true)
    on_exit(fn -> Sandbox.stop_owner(pid) end)
    :ok
  end

  defmodule Leaf do
    @moduledoc false
    use Oban.Pro.Worker, queue: :general, max_attempts: 1

    # No-op job: it only needs to exist as the innermost grafted job.
    @impl Oban.Pro.Worker
    def process(%Oban.Job{}), do: :ok
  end

  defmodule InnerGrafter do
    @moduledoc false
    use Oban.Pro.Worker, queue: :general, max_attempts: 1

    # Grafts a single-leaf workflow from inside an already-grafted workflow.
    @impl Oban.Pro.Worker
    def process(%Oban.Job{}) do
      Workflow.new()
      |> Workflow.add(:leaf, Leaf.new(%{}))
      |> Workflow.apply_graft()
      |> Oban.insert_all()

      :ok
    end
  end

  defmodule OuterGrafter do
    @moduledoc false
    use Oban.Pro.Worker, queue: :general, max_attempts: 1

    # Grafts a workflow that itself contains a graft point -> nesting.
    @impl Oban.Pro.Worker
    def process(%Oban.Job{}) do
      Workflow.new()
      |> Workflow.add_graft(:inner, InnerGrafter.new(%{}))
      |> Workflow.apply_graft()
      |> Oban.insert_all()

      :ok
    end
  end

  test "nested graft produces workflow_id == sup_workflow_id, and status/1 never terminates" do
    Oban.Testing.with_testing_mode(:manual, fn ->
      Workflow.new()
      |> Workflow.add_graft(:outer, OuterGrafter.new(%{}))
      |> Oban.insert_all()

      # Recursive drain (default) runs the dynamically-grafted jobs as they're inserted.
      drain_jobs(queue: :general, with_safety: true)

      jobs = workflow_rows()
      Enum.each(jobs, fn j -> IO.puts("JOB #{j.name} wid=#{j.wid} sup=#{j.sup}") end)

      # 1. The broken state: a job whose workflow_id == its own sup_workflow_id.
      selfloop = Enum.find(jobs, fn j -> not is_nil(j.sup) and j.wid == j.sup end)
      assert selfloop, "expected a workflow_id == sup_workflow_id self-loop job, found none"
      IO.puts("SELF-LOOP NODE: name=#{selfloop.name} wid=#{selfloop.wid} sup=#{selfloop.sup}")

      # 2. status/1 on that workflow never returns (infinite expand_status recursion).
      outcome = status_in_capped_process(selfloop.wid)
      IO.puts("status/1 outcome: #{inspect(outcome)}")

      refute outcome == :returned,
             "status/1 returned — infinite recursion NOT reproduced (bug may be fixed)"

      # Only a heap-limit kill or a timeout demonstrates non-termination; any other exit
      # reason means status/1 crashed for an unrelated reason and must not count as a repro.
      assert outcome in [:timeout, {:died, :killed}],
             "status/1 crashed for an unrelated reason: #{inspect(outcome)}"
    end)
  end

  # Loads the workflow-related meta fields (name, workflow_id, sup_workflow_id) of every
  # job currently in the jobs table.
  defp workflow_rows do
    from(j in Oban.Job,
      select: %{
        name: fragment("?->>'name'", j.meta),
        wid: fragment("?->>'workflow_id'", j.meta),
        sup: fragment("?->>'sup_workflow_id'", j.meta)
      }
    )
    |> App.Repo.all()
  end

  # Runs `Workflow.status/1` in a monitored, heap-capped child so the test process survives
  # runaway recursion.
  #
  # Returns `:returned` if the call completed, `{:died, reason}` if the child exited first,
  # or `:timeout` if neither happened within `@status_timeout_ms` (the child is then killed).
  defp status_in_capped_process(workflow_id) do
    parent = self()

    {pid, ref} =
      :erlang.spawn_opt(
        fn ->
          result = Workflow.status(workflow_id)
          send(parent, {:returned, result})
        end,
        [:monitor, {:max_heap_size, %{size: @max_heap_words, kill: true, error_logger: false}}]
      )

    outcome =
      receive do
        {:returned, _result} -> :returned
        {:DOWN, ^ref, :process, ^pid, reason} -> {:died, reason}
      after
        @status_timeout_ms ->
          Process.exit(pid, :kill)
          :timeout
      end

    # Drop the monitor and any pending :DOWN so nothing lingers in the test's mailbox.
    Process.demonitor(ref, [:flush])
    outcome
  end
end






















