Here’s an example workflow that has a container sub-workflow that fans out multiple sub-workflows each of which contains a cascade function (pls don’t judge me)
defmodule Single do
alias Oban.Pro.Workflow
def workflow() do
Workflow.new()
|> Workflow.put_context(%{hello: "world"})
|> Workflow.add_cascade(:foo, &cascade/1)
end
def cascade(_context) do
IO.inspect("Single fan-out cascade complete")
end
end
defmodule Mothership do
alias Oban.Pro.Workflow
def workflow() do
fan_out_wf =
Enum.reduce(1..2, Workflow.new(), fn i, wf ->
Workflow.add_workflow(wf, to_string(i), Single.workflow())
end)
Workflow.new()
|> Workflow.add_workflow(:fan_out, fan_out_wf)
end
end
When I run it:
Mothership.workflow() |> Oban.insert_all()
Jobs fail with this error:
** (Ecto.MultipleResultsError) expected at most one result but got 2 in query:
from j0 in Oban.Job,
where: j0.state in [
"scheduled",
"available",
"executing",
"retryable",
"completed",
"discarded",
"cancelled"
],
where: fragment(
"? ? 'workflow_id' AND ?->>'workflow_id' = ?",
j0.meta,
j0.meta,
^"019b1e4a-fb19-76d5-b41c-b4e83d2e79e0"
),
where: j0.meta["name"] in ^["context"],
order_by: [asc: j0.id],
select: j0.meta
(ecto 3.13.5) lib/ecto/repo/queryable.ex:166: Ecto.Repo.Queryable.one/3
(oban_pro 1.6.9) lib/oban/pro/workflow.ex:2228: Oban.Pro.Workflow.get_recorded/3
(oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:39: Oban.Pro.Workflow.Cascade.all_context/2
(oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:23: Oban.Pro.Workflow.Cascade.process/1
(oban_pro 1.6.9) lib/oban/pro/worker.ex:1156: Oban.Pro.Worker.process/3
(oban 2.20.2) lib/oban/queue/executor.ex:143: Oban.Queue.Executor.perform/1
(oban 2.20.2) lib/oban/queue/executor.ex:75: Oban.Queue.Executor.call/1
(elixir 1.19.3) lib/task/supervised.ex:105: Task.Supervised.invoke_mfa/2
(elixir 1.19.3) lib/task/supervised.ex:40: Task.Supervised.reply/4
Without put_context it’s working fine. Seems like a bug?
Hmm I also run into the same error while trying to come up with a workaround using normal jobs, not cascade functions. The whole workflow composition must be problematic. Am I using it wrong?
It’s likely that we can put in a fix to prevent the error, but based on the example, you’re using it slightly wrong. There’s no need to add a workflow to a workflow without any other jobs in it, just return the workflow:
def workflow do
Enum.reduce(1..2, Workflow.new(), fn i, wf ->
Workflow.add_workflow(wf, to_string(i), Single.workflow())
end)
end
Fewer nested workflows are better when possible. We’ll work on a fix for the query issue regardless.
The reason I wrap fan-out in a workflow is so that I could depend on the fan-out without explicitly collecting every fan out job id at workflow creation time, if this makes sense!
I am seeing the same thing with Oban Pro v1.6.9. Is the workaround to avoid a second Workflow.put_context and use traditional Workers to get the values into my jobs?
This would appear to be a pretty big issue preventing the composition of workflows. The way I see it, this prevents sub-workflows added via add_workflow or add_graft from being able to set context data received from the parent workflow on themselves.
Maybe I am misunderstanding how relevant data should be shared between workflows. Is there another, more canonical/idiomatic way? If not, is there an estimated timeline for a fix? At the moment, this is a blocker for my work.
The original issue seems to be fixed. But I think it’s not over yet.
It looks like there’s an issue with the how the context is being shared with sub workflows.
Here’s a reproducer that Claude extracted from our use case that documents and demonstrates the issue:
Reproducer
# Oban Pro v1.6.10 - Nested Sub-Workflow Context Bug Reproducer
#
# Bug: When using `add_cascade` with fan-out tuples inside a sub-workflow,
# the `sup_workflow_id` of the fan-out jobs incorrectly points to the ROOT
# workflow instead of the immediate parent sub-workflow. This causes
# `Workflow.get_context/1` to return the wrong context.
#
# Expected: Fan-out jobs should have `sup_workflow_id` pointing to their
# immediate parent sub-workflow, so context lookup finds the correct context.
#
# Actual: Fan-out jobs have `sup_workflow_id` pointing to the root workflow,
# causing context lookup to skip the intermediate sub-workflow's context.
defmodule ReproducerWorkflows do
@moduledoc """
Minimal reproduction of nested sub-workflow context bug.
Run with:
mix run oban_pro_nested_context_bug.ex
Or in iex:
iex> ReproducerWorkflows.run()
"""
use Oban.Pro.Decorator
# ============================================================================
# Inner Workflow (simulates AmazonDsp)
# ============================================================================
def inner_workflow do
items = [1, 2] # Fan-out over 2 items
Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.put_context(%{
inner_secret: "I should be visible to fan-out jobs",
oauth_connection_id: 123
})
|> Oban.Pro.Workflow.add_cascade(
:step_one,
&step_one/1,
queue: :default
)
|> Oban.Pro.Workflow.add_cascade(
:fan_out_step,
{items, &fan_out_step/2},
queue: :default,
deps: [:step_one]
)
|> Oban.Pro.Workflow.add_cascade(
:final_step,
&final_step/1,
queue: :default,
deps: [:fan_out_step]
)
end
# ============================================================================
# Outer Workflow (simulates PublishCampaign)
# ============================================================================
def outer_workflow do
Oban.Pro.Workflow.new()
|> Oban.Pro.Workflow.put_context(%{
outer_only: "I am from the outer workflow",
campaign_id: 999
})
|> Oban.Pro.Workflow.add_workflow(:inner, inner_workflow())
|> Oban.Pro.Workflow.add_cascade(
:notify,
¬ify/1,
queue: :default,
deps: [:inner]
)
end
# ============================================================================
# Cascade Functions
# ============================================================================
@job queue: :default, recorded: true
def step_one(ctx) do
IO.puts("[step_one] Context keys: #{inspect(Map.keys(ctx))}")
{:ok, %{step_one_result: "done"}}
end
@job queue: :default, recorded: true
def fan_out_step(item, ctx) do
# BUG: ctx will contain outer_only and campaign_id (from outer workflow)
# instead of inner_secret and oauth_connection_id (from inner workflow)
IO.puts("[fan_out_step item=#{item}] Context keys: #{inspect(Map.keys(ctx))}")
IO.puts("[fan_out_step item=#{item}] Has inner_secret? #{Map.has_key?(ctx, :inner_secret)}")
IO.puts("[fan_out_step item=#{item}] Has outer_only? #{Map.has_key?(ctx, :outer_only)}")
if Map.has_key?(ctx, :outer_only) and not Map.has_key?(ctx, :inner_secret) do
IO.puts("\n!!! BUG REPRODUCED: fan_out_step received outer context instead of inner context !!!\n")
end
{:ok, %{item: item, processed: true}}
end
@job queue: :default, recorded: true
def final_step(ctx) do
IO.puts("[final_step] Context keys: #{inspect(Map.keys(ctx))}")
{:ok, %{finalized: true}}
end
@job queue: :default, recorded: true
def notify(ctx) do
IO.puts("[notify] Context keys: #{inspect(Map.keys(ctx))}")
{:ok, %{notified: true}}
end
# ============================================================================
# Runner
# ============================================================================
def run do
# Insert the workflow
{:ok, jobs} = Oban.insert_all(outer_workflow())
IO.puts("\n=== Inserted #{length(jobs)} jobs ===\n")
# Find and display the fan-out jobs
fan_out_jobs = Enum.filter(jobs, fn job ->
job.args["fun"] == "fan_out_step"
end)
IO.puts("=== Fan-out job analysis ===\n")
for job <- fan_out_jobs do
IO.puts("Job #{job.id} (#{job.args["fun"]}):")
IO.puts(" workflow_id: #{job.meta["workflow_id"]}")
IO.puts(" sup_workflow_id: #{job.meta["sup_workflow_id"]}")
IO.puts(" sub_name: #{job.meta["sub_name"]}")
IO.puts("")
end
# Find context jobs to show which workflow_id has which context
context_jobs = Enum.filter(jobs, fn job ->
job.meta["context"] == true
end)
IO.puts("=== Context job analysis ===\n")
for job <- context_jobs do
# Decode the return value to show the context
{:ok, decoded} = decode_return(job.meta["return"])
IO.puts("Context job #{job.id}:")
IO.puts(" workflow_id: #{job.meta["workflow_id"]}")
IO.puts(" sub_name: #{job.meta["sub_name"] || "(root)"}")
IO.puts(" context: #{inspect(decoded)}")
IO.puts("")
end
IO.puts("""
=============================================================================
EXPECTED BEHAVIOR:
- Fan-out jobs should have sup_workflow_id pointing to the "inner" sub-workflow
- This would allow them to find the context with inner_secret and oauth_connection_id
ACTUAL BEHAVIOR:
- Fan-out jobs have sup_workflow_id pointing to the ROOT workflow
- They find the outer context with outer_only and campaign_id instead
=============================================================================
""")
{:ok, jobs}
end
defp decode_return(encoded) do
# Oban Pro uses Base64-encoded ETF for recorded values
encoded
|> Base.decode64!()
|> :erlang.binary_to_term()
|> then(&{:ok, &1})
rescue
_ -> {:error, :decode_failed}
end
end
# ============================================================================
# Instructions for running
# ============================================================================
IO.puts("""
================================================================================
Oban Pro v1.6.10 - Nested Sub-Workflow Context Bug Reproducer
================================================================================
To run this reproducer:
1. Add this file to your project
2. Run: mix run oban_pro_nested_context_bug.ex
Or in iex: ReproducerWorkflows.run()
3. Observe that fan-out jobs have incorrect sup_workflow_id
To verify the bug at runtime:
1. Let the workflow execute
2. Check the logs - fan_out_step will report having outer_only instead of inner_secret
Environment:
- oban_pro ~> 1.6.10
- oban ~> 2.18 (or compatible version)
================================================================================
""")
[Workflow] Inherit all context for nested sub-workflows
When using add_cascade with fan-out inside nested sub-workflows, context from intermediate parent workflows was not accessible. Jobs would only see context from the outermost workflow, skipping any workflows in between.
Now context is correctly inherited through any depth of workflow nesting. Each level's context is merged in order from outermost to innermost, with closer ancestors overriding values from farther ones.
in the v1.6.11 — 2026-01-19 release.
I tested with the reproducer and it still fails to work. I guess the changelog is pointing to the original issue?
No, the changelog is pointing to an additional set of changes based on the reproducer and a new set of tests based on it. I’ll take a closer look at the reproducer and test it directly.