I have a reproduction below, which I asked Claude to extract from our implementation, and it seems to suffer from the same issue.
The background: we have a workflow that loads data from our DB (populated by a previous job), and then for each item (in our case, insurance plans) we need to fetch n child items (the policies for that plan) from an API. There are ~200 plans, and each plan has 300-2000 policies. Because of the API's pagination and the variability in the number of policies, we use a sub-workflow per plan.
What I'm seeing is that the sub-workflows never run and never insert anything into the database.
From the documentation, it seems like I should not be calling Oban.insert_all in the function that defines the sub-workflow, as that would result in orphaned sub-workflows.
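For reference, my reading is that the graft callback should just return the result of Workflow.apply_graft and let the runtime handle insertion, rather than calling Oban.insert_all myself. Something like this untested sketch:

  # Hypothetical variant of graft_per_item/1 from the repro below, assuming the
  # graft callback is supposed to return the grafted workflow instead of
  # inserting it itself:
  def graft_per_item(%{fetch_items: %{items: items}}) do
    {items, &process_item_workflow/2}
    |> Workflow.apply_graft(context: %{batch_size: 2})
  end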
Am I misusing apply_graft?
defmodule NestedWorkflowRepro do
  @moduledoc """
  Minimal reproduction of multi-level nested workflow pattern using Oban.Pro.Workflow.

  This emulates the structure in PolicyBotPolicySync:

  - Level 1: Main workflow fetches items
  - Level 2: Graft per-item workflows
  - Level 3: Graft per-chunk workflows within each item workflow

  Each step returns simple concatenated strings for easy debugging.
  """

  use Oban.Pro.Worker, queue: :default, max_attempts: 1

  alias Oban.Pro.Workflow

  require Logger

  # Level 1: Main workflow functions

  @doc """
  Fetches the initial list of items to process.

  Returns %{items: ["A", "B", "C"]}
  """
  def fetch_items(_context) do
    items = ["A", "B", "C"]
    Logger.info("fetch_items: #{inspect(items)}")
    %{items: items, result: "fetched_items"}
  end

  @doc """
  Grafts a sub-workflow for each item.

  This is Level 2 grafting - creates per-item workflows.
  """
  def graft_per_item(%{fetch_items: %{items: items}}) do
    Logger.info("graft_per_item: creating workflows for #{length(items)} items")

    {items, &process_item_workflow/2}
    |> Workflow.apply_graft(context: %{batch_size: 2})
    |> Oban.insert_all()
  end

  # Level 2: Per-item workflow functions

  @doc """
  Creates a workflow to process a single item.

  This workflow will:

  1. Process the initial batch
  2. Determine if more batches are needed
  3. If needed, graft sub-workflows for the remaining batches (Level 3)
  """
  def process_item_workflow(item, %{batch_size: batch_size}) do
    workflow_name = "nested_workflow_repro_item_#{item}"

    Workflow.new(workflow_name: workflow_name)
    |> Workflow.put_context(%{
      item: item,
      batch_size: batch_size
    })
    |> Workflow.add_cascade(:process_initial_batch, &process_initial_batch/1)
    |> Workflow.add_cascade(:determine_remaining, &determine_remaining/1, deps: :process_initial_batch)
    |> Workflow.add_graft(:process_remaining_batches, &graft_remaining_batches/1,
      deps: [:process_initial_batch, :determine_remaining]
    )
  end

  @doc """
  Processes the initial batch for an item.

  Simulates work that determines the total work needed.
  """
  def process_initial_batch(%{item: item, batch_size: batch_size}) do
    # Simulate: item "A" has 5 total chunks, "B" has 3, "C" has 1
    total_chunks =
      case item do
        "A" -> 5
        "B" -> 3
        "C" -> 1
      end

    processed = min(batch_size, total_chunks)
    result = "item_#{item}_initial_batch_#{processed}"
    Logger.info("process_initial_batch: #{result}")

    %{
      result: result,
      total_chunks: total_chunks,
      processed: processed,
      needs_more: processed < total_chunks
    }
  end

  @doc """
  Determines what remaining work is needed.
  """
  def determine_remaining(%{
        process_initial_batch: %{total_chunks: total, processed: processed, needs_more: needs_more},
        item: item
      }) do
    result = "item_#{item}_determined_#{total - processed}_remaining"
    Logger.info("determine_remaining: #{result}")

    %{
      result: result,
      remaining_chunks: total - processed,
      needs_more: needs_more
    }
  end

  @doc """
  Grafts sub-workflows for remaining batches if needed.

  This is Level 3 grafting - creates per-chunk workflows.
  """
  def graft_remaining_batches(%{
        process_initial_batch: %{processed: processed, total_chunks: total, needs_more: true},
        item: item,
        batch_size: batch_size
      }) do
    # Calculate chunks needed (similar to calculate_policy_chunks)
    chunks =
      Range.new(processed, total - 1, batch_size)
      |> Enum.map(fn idx -> %{chunk_index: idx, size: batch_size} end)

    Logger.info("graft_remaining_batches: item=#{item}, chunks=#{length(chunks)}")

    # Level 3: Graft per-chunk workflows
    {chunks, &process_chunk/2}
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end

  # No additional chunks needed
  def graft_remaining_batches(%{process_initial_batch: %{needs_more: false}, item: item}) do
    Logger.info("graft_remaining_batches: item=#{item}, no additional chunks needed")

    # Return an empty workflow
    Workflow.new()
    |> Oban.insert_all()
  end

  # Level 3: Per-chunk workflow functions

  @doc """
  Processes a single chunk within an item's workflow.
  """
  def process_chunk(%{chunk_index: idx, size: size}, %{item: item}) do
    result = "item_#{item}_chunk_#{idx}_size_#{size}"
    Logger.info("process_chunk: #{result}")
    %{result: result}
  end

  @doc """
  Builds the main workflow.

  This creates a 3-level nested workflow structure:

  - Level 1: Main workflow (fetch items -> graft per-item)
  - Level 2: Per-item workflows (process initial -> graft per-chunk)
  - Level 3: Per-chunk workflows (process individual chunks)
  """
  def workflow(opts \\ []) do
    batch_size = Keyword.get(opts, :batch_size, 2)

    Workflow.new(workflow_name: "nested_workflow_repro_main")
    |> Workflow.put_context(%{batch_size: batch_size})
    |> Workflow.add_cascade(:fetch_items, &fetch_items/1)
    |> Workflow.add_graft(:process_items, &graft_per_item/1, deps: [:fetch_items])
  end

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: args}) do
    batch_size = Map.get(args, "batch_size", 2)

    workflow(batch_size: batch_size)
    |> Oban.insert_all()

    :ok
  end
end
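For completeness, the repro is kicked off by enqueuing the top-level worker, roughly like this (the batch_size argument is just illustrative):

  # Enqueue the top-level job; its process/1 callback builds the main
  # workflow and inserts it with Oban.insert_all/1.
  NestedWorkflowRepro.new(%{"batch_size" => 2})
  |> Oban.insert()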