Nested grafts + Oban.Pro.Workflow.status = Infinite recursion

A graft inside another graft causes Oban.Pro.Workflow.status to recurse forever. oban_pro 1.7.5

defmodule App.NestedGraftRepro.Test do
  # Minimal, self-contained reproduction of an Oban.Pro nested-graft bug (oban_pro 1.7.5).
  #
  # A graft that executes while its own job's `workflow_id` is already "grafted_"-prefixed
  # (i.e. a graft nested inside a grafted workflow) produces a sub-job whose
  # `workflow_id == sup_workflow_id`. `Oban.Pro.Workflow.status/1` walks sub-workflows via
  # `sup_workflow_id -> workflow_id` with no visited-set, so that self-edge makes
  # `expand_status` recurse forever (here: child process killed at max_heap).
  #
  # Topology: 
  # - W -> add_graft(:outer); 
  #   - outer grafts a workflow containing add_graft(:inner); 
  #     - inner grafts a leaf -> leaf.workflow_id == leaf.sup_workflow_id.
  use ExUnit.Case, async: false
  # `prefix: "sunrise"` matches this app's Oban schema; upstream you'd drop it (default "public").
  use Oban.Pro.Testing, repo: App.Repo

  import Ecto.Query

  setup do
    pid = Ecto.Adapters.SQL.Sandbox.start_owner!(App.Repo, shared: true)
    on_exit(fn -> Ecto.Adapters.SQL.Sandbox.stop_owner(pid) end)
    :ok
  end

  defmodule Leaf do
    use Oban.Pro.Worker, queue: :general, max_attempts: 1
    @impl Oban.Pro.Worker
    def process(%Oban.Job{}), do: :ok
  end

  defmodule InnerGrafter do
    use Oban.Pro.Worker, queue: :general, max_attempts: 1

    @impl Oban.Pro.Worker
    def process(%Oban.Job{}) do
      Oban.Pro.Workflow.new()
      |> Oban.Pro.Workflow.add(:leaf, Leaf.new(%{}))
      |> Oban.Pro.Workflow.apply_graft()
      |> Oban.insert_all()

      :ok
    end
  end

  defmodule OuterGrafter do
    use Oban.Pro.Worker, queue: :general, max_attempts: 1

    @impl Oban.Pro.Worker
    def process(%Oban.Job{}) do
      # Graft a workflow that itself contains a graft point -> nesting.
      Oban.Pro.Workflow.new()
      |> Oban.Pro.Workflow.add_graft(:inner, InnerGrafter.new(%{}))
      |> Oban.Pro.Workflow.apply_graft()
      |> Oban.insert_all()

      :ok
    end
  end

  test "nested graft produces workflow_id == sup_workflow_id, and status/1 never terminates" do
    Oban.Testing.with_testing_mode(:manual, fn ->
      Oban.Pro.Workflow.new()
      |> Oban.Pro.Workflow.add_graft(:outer, OuterGrafter.new(%{}))
      |> Oban.insert_all()

      # Recursive drain (default) runs the dynamically-grafted jobs as they're inserted.
      drain_jobs(queue: :general, with_safety: true)

      jobs =
        from(j in Oban.Job,
          select: %{
            name: fragment("?->>'name'", j.meta),
            wid: fragment("?->>'workflow_id'", j.meta),
            sup: fragment("?->>'sup_workflow_id'", j.meta)
          }
        )
        |> App.Repo.all()

      for j <- jobs, do: IO.puts("JOB #{j.name} wid=#{j.wid} sup=#{j.sup}")

      # 1. The broken state: a job whose workflow_id == its own sup_workflow_id.
      selfloop = Enum.find(jobs, fn j -> not is_nil(j.sup) and j.wid == j.sup end)
      assert selfloop, "expected a workflow_id == sup_workflow_id self-loop job, found none"
      IO.puts("SELF-LOOP NODE: name=#{selfloop.name} wid=#{selfloop.wid} sup=#{selfloop.sup}")

      # 2. status/1 on that workflow never returns (infinite expand_status recursion).
      #    Run it in a heap-capped child so the test process survives.
      parent = self()

      {pid, ref} =
        :erlang.spawn_opt(
          fn ->
            result = Oban.Pro.Workflow.status(selfloop.wid)
            send(parent, {:returned, result})
          end,
          [:monitor, {:max_heap_size, %{size: 2_000_000, kill: true, error_logger: false}}]
        )

      outcome =
        receive do
          {:returned, _} -> :returned
          {:DOWN, ^ref, :process, ^pid, reason} -> {:died, reason}
        after
          8_000 ->
            Process.exit(pid, :kill)
            :timeout
        end

      IO.puts("status/1 outcome: #{inspect(outcome)}")

      refute outcome == :returned,
             "status/1 returned — infinite recursion NOT reproduced (bug may be fixed)"
    end)
  end
end

Welcome :waving_hand: @anotherpit,

Thanks for catching a strange edge case and giving us the report. It’s fixed and ready for the next patch release.

2 Likes