Bug or incorrect usage w/ sub-workflows from a graft with a `cascade_capture`

I have a reproduction below that I asked Claude to extract from our implementation, and seems to suffer the same issue.

The background is we have a workflow that loads data from our DB (populated from a previous job), and then for each item (for us it’s insurance plans) we need to fetch n child items (policies for the plan) from an API. There are ~200 plans and per plan there’s 300-2000 policies. Due to the API’s pagination, and the variability of the number of policies, we use a sub-workflow per plan.

What I’m seeing is that the sub-workflows never run, or insert into the database.

From the documentation, it seems like I should not be using Oban.insert_all in the sub-workflow defining function, as that would result in orphaned sub-workflows.

Am I misusing apply_graft?

defmodule NestedWorkflowRepro do
  @moduledoc """
  Minimal reproduction of multi-level nested workflow pattern using Oban.Pro.Workflow.

  This emulates the structure in PolicyBotPolicySync:
  - Level 1: Main workflow fetches items
  - Level 2: Graft per-item workflows
  - Level 3: Graft per-chunk workflows within each item workflow

  Each step returns simple concatenated strings for easy debugging.
  """

  use Oban.Pro.Worker, queue: :default, max_attempts: 1

  alias Oban.Pro.Workflow

  require Logger

  # Level 1: Main workflow functions

  @doc """
  Fetches the initial list of items to process.
  Returns %{items: ["A", "B", "C"]}
  """
  def fetch_items(_context) do
    items = ["A", "B", "C"]
    Logger.info("fetch_items: #{inspect(items)}")
    %{items: items, result: "fetched_items"}
  end

  @doc """
  Grafts a sub-workflow for each item.
  This is Level 2 grafting - creates per-item workflows.
  """
  def graft_per_item(%{fetch_items: %{items: items}}) do
    Logger.info("graft_per_item: creating workflows for #{length(items)} items")

    {items, &process_item_workflow/2}
    |> Workflow.apply_graft(context: %{batch_size: 2})
    |> Oban.insert_all()
  end

  # Level 2: Per-item workflow functions

  @doc """
  Creates a workflow to process a single item.
  This workflow will:
  1. Process initial batch
  2. Determine if more batches are needed
  3. If needed, graft sub-workflows for remaining batches (Level 3)
  """
  def process_item_workflow(item, %{batch_size: batch_size}) do
    workflow_name = "nested_workflow_repro_item_#{item}"

    Workflow.new(workflow_name: workflow_name)
    |> Workflow.put_context(%{
      item: item,
      batch_size: batch_size
    })
    |> Workflow.add_cascade(:process_initial_batch, &process_initial_batch/1)
    |> Workflow.add_cascade(:determine_remaining, &determine_remaining/1, deps: :process_initial_batch)
    |> Workflow.add_graft(:process_remaining_batches, &graft_remaining_batches/1,
      deps: [:process_initial_batch, :determine_remaining]
    )
  end

  @doc """
  Processes the initial batch for an item.
  Simulates work that determines total work needed.
  """
  def process_initial_batch(%{item: item, batch_size: batch_size}) do
    # Simulate: item "A" has 5 total chunks, "B" has 3, "C" has 1
    total_chunks =
      case item do
        "A" -> 5
        "B" -> 3
        "C" -> 1
      end

    processed = min(batch_size, total_chunks)

    result = "item_#{item}_initial_batch_#{processed}"
    Logger.info("process_initial_batch: #{result}")

    %{
      result: result,
      total_chunks: total_chunks,
      processed: processed,
      needs_more: processed < total_chunks
    }
  end

  @doc """
  Determines what remaining work is needed.
  """
  def determine_remaining(%{
        process_initial_batch: %{total_chunks: total, processed: processed, needs_more: needs_more},
        item: item
      }) do
    result = "item_#{item}_determined_#{total - processed}_remaining"
    Logger.info("determine_remaining: #{result}")

    %{
      result: result,
      remaining_chunks: total - processed,
      needs_more: needs_more
    }
  end

  @doc """
  Grafts sub-workflows for remaining batches if needed.
  This is Level 3 grafting - creates per-chunk workflows.
  """
  def graft_remaining_batches(%{
        process_initial_batch: %{processed: processed, total_chunks: total, needs_more: true},
        item: item,
        batch_size: batch_size
      }) do
    # Calculate chunks needed (similar to calculate_policy_chunks)
    chunks =
      Range.new(processed, total - 1, batch_size)
      |> Enum.map(fn idx -> %{chunk_index: idx, size: batch_size} end)

    Logger.info("graft_remaining_batches: item=#{item}, chunks=#{length(chunks)}")

    # Level 3: Graft per-chunk workflows
    {chunks, &process_chunk/2}
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end

  # No additional chunks needed
  def graft_remaining_batches(%{process_initial_batch: %{needs_more: false}, item: item}) do
    Logger.info("graft_remaining_batches: item=#{item}, no additional chunks needed")

    # Return empty workflow
    Workflow.new()
    |> Oban.insert_all()
  end

  # Level 3: Per-chunk workflow functions

  @doc """
  Processes a single chunk within an item's workflow.
  """
  def process_chunk(%{chunk_index: idx, size: size}, %{item: item}) do
    result = "item_#{item}_chunk_#{idx}_size_#{size}"
    Logger.info("process_chunk: #{result}")

    %{result: result}
  end

  @doc """
  Builds the main workflow.

  This creates a 3-level nested workflow structure:
  - Level 1: Main workflow (fetch items -> graft per-item)
  - Level 2: Per-item workflows (process initial -> graft per-chunk)
  - Level 3: Per-chunk workflows (process individual chunks)
  """
  def workflow(opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 2)

    Workflow.new(workflow_name: "nested_workflow_repro_main")
    |> Workflow.put_context(%{batch_size: batch_size})
    |> Workflow.add_cascade(:fetch_items, &fetch_items/1)
    |> Workflow.add_graft(:process_items, &graft_per_item/1, deps: [:fetch_items])
  end

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: args}) do
    batch_size = Map.get(args, "batch_size", 2)

    workflow(batch_size: batch_size)
    |> Oban.insert_all()

    :ok
  end
end

Semi-related, but when then viewing the Cascade jobs in Oban.Web for the process_item_workflow I get an error.

Presumably it’s not able to render the Workflow struct

13:36:37.150 line=2785 pid=<0.92537.0> file=gen_server.erl domain=otp mfa=:gen_server.error_info/7 [error] GenServer #PID<0.92537.0> terminating
** (ArgumentError) cannot deserialize &Ecto.Type.empty_trimmed?/2, the term is not safe for deserialization
    (plug_crypto 2.1.1) lib/plug/crypto.ex:73: Plug.Crypto.non_executable_terms/1
    (plug_crypto 2.1.1) lib/plug/crypto.ex:80: Plug.Crypto.non_executable_list/1
    (plug_crypto 2.1.1) lib/plug/crypto.ex:59: anonymous fn/3 in Plug.Crypto.non_executable_terms/1
    (stdlib 7.1) maps.erl:894: :maps.fold_1/4
    (plug_crypto 2.1.1) lib/plug/crypto.ex:80: Plug.Crypto.non_executable_list/1
    (plug_crypto 2.1.1) lib/plug/crypto.ex:59: anonymous fn/3 in Plug.Crypto.non_executable_terms/1
    (stdlib 7.1) maps.erl:894: :maps.fold_1/4

Changing graft_per_item to this:

  def graft_per_item(%{fetch_items: %{items: items}} = context) do
    Logger.info("graft_per_item: creating workflows for #{length(items)} items")

    workflow =
      items
      |> Enum.reduce(Workflow.new(), fn item, workflow ->
        Workflow.add_workflow(workflow, item, process_item_workflow(item, context))
      end)

    workflow
    |> Workflow.apply_graft()
    |> Oban.insert_all()

    :ok
  end

results in an Ecto.MultipleResultsError from trying to fetch multiple contexts…

14:56:50.735 line=36 memory=204528 pid=<0.146908.0> reason=** (Ecto.MultipleResultsError) expected at most one result but got 3 in query:

...

You need to use insert_all after apply_graft or the jobs won’t be inserted at all.

This is because the term (whatever the struct name is) isn’t available on the Web node and it’s an unsafe action to render it. You can override that with a custom resolver ( Oban.Web.Resolver — Oban Web v2.11.6 ). It can also happen in development mode because modules are loaded lazily.

That effectively grafts multiple workflows like they are one, which causes that MultipleResultsError as you saw.

We’re working through your top example to see if there’s an alternate version that will work, or if there’s a bug to fix.

1 Like

Ok, so at least “surface-level” the place where I’m calling Oban.insert_all in graft_per_item makes sense, since it’s after the apply_graft

Ok <3

I’ll also post here if I find a workaround. I am going to try breaking it into Worker modules at different layers to see if anything helps there too. I also changed it to explicitly set workflow_id in the Workflow.new but it looks like it does the same thing you were mentioning (trying to run it as a single workflow), by using the same workflow as when you don’t explicitly provide it :frowning:

  def process_item_workflow(item, %{batch_size: batch_size} = context) do
    workflow_name = "nested_workflow_repro_item_#{item}"
    workflow_id = workflow_name <> UUIDv7.generate()

    Workflow.new(workflow_name: workflow_name, workflow_id: workflow_id)

TL;DR - after refactoring some to work around it, I can’t get after_cancelled to be called in test for a step dependent on a failing grafted step, but it calls it correctly in dev.

I’ve refactored the process_item_workflow into its own worker that for right now just builds the workflow then inserts it, so they are independent of the “top-level”/original workflow. I’ve also extracted all the add_cascade steps from that “per-item” workflow into small workers, and swapped to just add.

I’ve then added a final step in the per-item workflow that depends on :process_remaining_batches and updates the DB to say that it’s all complete, and an after_cancelled that marks it as “failed”.

This works okay for now, but there’s an issue I’m encountering with testing the MarkCompletedWorker.after_cancelled scenario. When running manually in dev it correctly fires the process if everything runs correctly, and fires the after_cancelled if one of the grafted steps fails enough.

But when I use run_workflow or drain_jobs I can’t get the callback to fire for the grafted jobs. It does fire the callback when I cause the :process_initial_batch step’s worker to fail and I use drain_jobs. run_workflow just fails from the induced error, but I assume this is intended, since it’s running synchronously.

I thought this might be semi-related, since it seems to be around graft+sub-workflow handling.

Nice to hear that you were able to refactor around the issue for now.

Are you calling run_workflow or drain_jobs with safety enabled (with_safety: true)? If not, a crash or exception will bubble up into the test and the after_cancelled callback won’t have a chance to run.

Unfortunately, yes that’s using drain_jobs(queue: :all, with_safety: true) (and same options when testing run_workflow)

Interesting thing I think I forgot to mention is that since I’m tagging capture_log: true I can see that it’s logging the (intentionally induced) errors I emit from the grafted jobs when I use drain_jobs (because ExUnit.CaptureLog only shows the logs if the test fails) but I don’t get any logs if I use run_workflow.

This doesn’t explain why the after_cancelled isn’t running for drain_jobs on the “grafted errors” scenario but it would seem related to why run_workflow wasn’t running the callback on the “initial step error” scenario.