Can't fan out workflows with cascade functions if they have a context

Here’s an example workflow with a container sub-workflow that fans out multiple sub-workflows, each of which contains a cascade function (pls don’t judge me):

defmodule Single do
  alias Oban.Pro.Workflow
  
  def workflow() do
    Workflow.new()
    |> Workflow.put_context(%{hello: "world"})
    |> Workflow.add_cascade(:foo, &cascade/1)
  end
  
  def cascade(_context) do
    IO.inspect("Single fan-out cascade complete")
  end
end

defmodule Mothership do
  alias Oban.Pro.Workflow
	
  def workflow() do
    fan_out_wf =
      Enum.reduce(1..2, Workflow.new(), fn i, wf ->
        Workflow.add_workflow(wf, to_string(i), Single.workflow())
      end)
  
    Workflow.new()
    |> Workflow.add_workflow(:fan_out, fan_out_wf)
  end
end

When I run it:

Mothership.workflow() |> Oban.insert_all()

Jobs fail with this error:

** (Ecto.MultipleResultsError) expected at most one result but got 2 in query:

from j0 in Oban.Job,
  where: j0.state in ["scheduled", "available", "executing", "retryable", "completed", "discarded", "cancelled"],
  where: fragment("? ? 'workflow_id' AND ?->>'workflow_id' = ?", j0.meta, j0.meta, ^"019b1e4a-fb19-76d5-b41c-b4e83d2e79e0"),
  where: j0.meta["name"] in ^["context"],
  order_by: [asc: j0.id],
  select: j0.meta

    (ecto 3.13.5) lib/ecto/repo/queryable.ex:166: Ecto.Repo.Queryable.one/3
    (oban_pro 1.6.9) lib/oban/pro/workflow.ex:2228: Oban.Pro.Workflow.get_recorded/3
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:39: Oban.Pro.Workflow.Cascade.all_context/2
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:23: Oban.Pro.Workflow.Cascade.process/1
    (oban_pro 1.6.9) lib/oban/pro/worker.ex:1156: Oban.Pro.Worker.process/3
    (oban 2.20.2) lib/oban/queue/executor.ex:143: Oban.Queue.Executor.perform/1
    (oban 2.20.2) lib/oban/queue/executor.ex:75: Oban.Queue.Executor.call/1
    (elixir 1.19.3) lib/task/supervised.ex:105: Task.Supervised.invoke_mfa/2
    (elixir 1.19.3) lib/task/supervised.ex:40: Task.Supervised.reply/4

Without put_context it’s working fine. Seems like a bug?
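
For anyone who wants to poke at it, here’s a rough way to inspect the context jobs that the failing lookup matches (a sketch; MyApp.Repo is a placeholder for your repo module, and the workflow_id is the one from the error above):

import Ecto.Query

# Inspect the "context" jobs the failing lookup matches for this workflow_id;
# per the error above, there are two of them instead of the expected one.
MyApp.Repo.all(
  from j in Oban.Job,
    where: j.meta["workflow_id"] == "019b1e4a-fb19-76d5-b41c-b4e83d2e79e0",
    where: j.meta["name"] == "context",
    select: {j.id, j.meta}
)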

Hmm, I also ran into the same error while trying to come up with a workaround that uses normal jobs instead of cascade functions. The whole workflow composition seems to be problematic. Am I using it wrong?

It’s likely that we can put in a fix to prevent the error, but based on the example, you’re using it slightly wrong. There’s no need to add a workflow to a workflow that doesn’t have any other jobs in it; just return the workflow:

  def workflow do
    Enum.reduce(1..2, Workflow.new(), fn i, wf ->
      Workflow.add_workflow(wf, to_string(i), Single.workflow())
    end)
  end

Fewer nested workflows are better when possible. We’ll work on a fix for the query issue regardless.

Thanks! I purposefully stripped the example down as much as possible. The full version contains more stages and looks more like this:

  def workflow(task_ids) do
    fan_out_wf =
      Enum.reduce(task_ids, Workflow.new(), fn task_id, wf ->
        Workflow.add_workflow(wf, task_id, Single.workflow(task_id))
      end)
  
    Workflow.new()
    |> Workflow.add(:prepare_fan_out, ...)
    |> Workflow.add_workflow(:fan_out, fan_out_wf, deps: :prepare_fan_out)
    |> Workflow.add(:process_results, ..., deps: :fan_out)
  end

The reason I wrap the fan-out in its own workflow is so that I can depend on the fan-out as a whole without explicitly collecting every fan-out job id at workflow creation time, if that makes sense!
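
For contrast, the flat version I’m trying to avoid would look roughly like this, where process_results has to depend on every generated name (PrepareFanOut, SingleWorker, and ProcessResults are placeholder worker modules, and the naming scheme is made up):

  def flat_workflow(task_ids) do
    # One named job per task so process_results can depend on all of them.
    names = Enum.map(task_ids, &"single_#{&1}")

    base =
      Workflow.new()
      |> Workflow.add(:prepare_fan_out, PrepareFanOut.new(%{}))

    task_ids
    |> Enum.zip(names)
    |> Enum.reduce(base, fn {task_id, name}, wf ->
      Workflow.add(wf, name, SingleWorker.new(%{task_id: task_id}), deps: [:prepare_fan_out])
    end)
    |> Workflow.add(:process_results, ProcessResults.new(%{}), deps: names)
  end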

I am seeing the same thing with Oban Pro v1.6.9. Is the workaround to avoid a second Workflow.put_context and use traditional Workers to get the values into my jobs?

This appears to be a pretty big issue for composing workflows. The way I see it, it prevents sub-workflows added via add_workflow or add_graft from setting context data on themselves using values received from the parent workflow.

Maybe I am misunderstanding how relevant data should be shared between workflows. Is there another, more canonical/idiomatic way? If not, is there an estimated timeline for a fix? At the moment, this is a blocker for my work.
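
To make the question concrete, this is the kind of args-based fallback I mean, skipping put_context on the sub-workflow and threading the shared values through job args instead (SingleWorker is a stand-in for one of my regular worker modules):

  def sub_workflow(task_id, shared) do
    Workflow.new()
    |> Workflow.add(:fetch, SingleWorker.new(%{task_id: task_id, hello: shared.hello}))
    |> Workflow.add(:finish, SingleWorker.new(%{task_id: task_id}), deps: [:fetch])
  end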

We’re working on a fix currently and hope to have something ready in the next day or so. If not, I’ll suggest an alternative way to handle this.

Just checked this against v1.6.10 and it seems to be fixed.

The original issue seems to be fixed, but I think it’s not over yet.

It looks like there’s an issue with how the context is being shared with sub-workflows.

Here’s a reproducer, extracted by Claude from our use case, that documents and demonstrates the issue:

Reproducer
# Oban Pro v1.6.10 - Nested Sub-Workflow Context Bug Reproducer
#
# Bug: When using `add_cascade` with fan-out tuples inside a sub-workflow,
# the `sup_workflow_id` of the fan-out jobs incorrectly points to the ROOT
# workflow instead of the immediate parent sub-workflow. This causes
# `Workflow.get_context/1` to return the wrong context.
#
# Expected: Fan-out jobs should have `sup_workflow_id` pointing to their
# immediate parent sub-workflow, so context lookup finds the correct context.
#
# Actual: Fan-out jobs have `sup_workflow_id` pointing to the root workflow,
# causing context lookup to skip the intermediate sub-workflow's context.

defmodule ReproducerWorkflows do
  @moduledoc """
  Minimal reproduction of nested sub-workflow context bug.
  
  Run with:
    mix run oban_pro_nested_context_bug.ex
  
  Or in iex:
    iex> ReproducerWorkflows.run()
  """

  use Oban.Pro.Decorator

  # ============================================================================
  # Inner Workflow (simulates AmazonDsp)
  # ============================================================================

  def inner_workflow do
    items = [1, 2]  # Fan-out over 2 items

    Oban.Pro.Workflow.new()
    |> Oban.Pro.Workflow.put_context(%{
      inner_secret: "I should be visible to fan-out jobs",
      oauth_connection_id: 123
    })
    |> Oban.Pro.Workflow.add_cascade(
      :step_one,
      &step_one/1,
      queue: :default
    )
    |> Oban.Pro.Workflow.add_cascade(
      :fan_out_step,
      {items, &fan_out_step/2},
      queue: :default,
      deps: [:step_one]
    )
    |> Oban.Pro.Workflow.add_cascade(
      :final_step,
      &final_step/1,
      queue: :default,
      deps: [:fan_out_step]
    )
  end

  # ============================================================================
  # Outer Workflow (simulates PublishCampaign)
  # ============================================================================

  def outer_workflow do
    Oban.Pro.Workflow.new()
    |> Oban.Pro.Workflow.put_context(%{
      outer_only: "I am from the outer workflow",
      campaign_id: 999
    })
    |> Oban.Pro.Workflow.add_workflow(:inner, inner_workflow())
    |> Oban.Pro.Workflow.add_cascade(
      :notify,
      &notify/1,
      queue: :default,
      deps: [:inner]
    )
  end

  # ============================================================================
  # Cascade Functions
  # ============================================================================

  @job queue: :default, recorded: true
  def step_one(ctx) do
    IO.puts("[step_one] Context keys: #{inspect(Map.keys(ctx))}")
    {:ok, %{step_one_result: "done"}}
  end

  @job queue: :default, recorded: true
  def fan_out_step(item, ctx) do
    # BUG: ctx will contain outer_only and campaign_id (from outer workflow)
    #      instead of inner_secret and oauth_connection_id (from inner workflow)
    
    IO.puts("[fan_out_step item=#{item}] Context keys: #{inspect(Map.keys(ctx))}")
    IO.puts("[fan_out_step item=#{item}] Has inner_secret? #{Map.has_key?(ctx, :inner_secret)}")
    IO.puts("[fan_out_step item=#{item}] Has outer_only? #{Map.has_key?(ctx, :outer_only)}")
    
    if Map.has_key?(ctx, :outer_only) and not Map.has_key?(ctx, :inner_secret) do
      IO.puts("\n!!! BUG REPRODUCED: fan_out_step received outer context instead of inner context !!!\n")
    end
    
    {:ok, %{item: item, processed: true}}
  end

  @job queue: :default, recorded: true
  def final_step(ctx) do
    IO.puts("[final_step] Context keys: #{inspect(Map.keys(ctx))}")
    {:ok, %{finalized: true}}
  end

  @job queue: :default, recorded: true
  def notify(ctx) do
    IO.puts("[notify] Context keys: #{inspect(Map.keys(ctx))}")
    {:ok, %{notified: true}}
  end

  # ============================================================================
  # Runner
  # ============================================================================

  def run do
    # Insert the workflow
    # Oban.insert_all/1 returns the list of inserted jobs
    jobs = Oban.insert_all(outer_workflow())
    
    IO.puts("\n=== Inserted #{length(jobs)} jobs ===\n")
    
    # Find and display the fan-out jobs
    fan_out_jobs = Enum.filter(jobs, fn job ->
      job.args["fun"] == "fan_out_step"
    end)
    
    IO.puts("=== Fan-out job analysis ===\n")
    
    for job <- fan_out_jobs do
      IO.puts("Job #{job.id} (#{job.args["fun"]}):")
      IO.puts("  workflow_id:     #{job.meta["workflow_id"]}")
      IO.puts("  sup_workflow_id: #{job.meta["sup_workflow_id"]}")
      IO.puts("  sub_name:        #{job.meta["sub_name"]}")
      IO.puts("")
    end
    
    # Find context jobs to show which workflow_id has which context
    context_jobs = Enum.filter(jobs, fn job ->
      job.meta["context"] == true
    end)
    
    IO.puts("=== Context job analysis ===\n")
    
    for job <- context_jobs do
      # Decode the return value to show the context
      {:ok, decoded} = decode_return(job.meta["return"])
      
      IO.puts("Context job #{job.id}:")
      IO.puts("  workflow_id: #{job.meta["workflow_id"]}")
      IO.puts("  sub_name:    #{job.meta["sub_name"] || "(root)"}")
      IO.puts("  context:     #{inspect(decoded)}")
      IO.puts("")
    end
    
    IO.puts("""
    =============================================================================
    EXPECTED BEHAVIOR:
    - Fan-out jobs should have sup_workflow_id pointing to the "inner" sub-workflow
    - This would allow them to find the context with inner_secret and oauth_connection_id
    
    ACTUAL BEHAVIOR:
    - Fan-out jobs have sup_workflow_id pointing to the ROOT workflow
    - They find the outer context with outer_only and campaign_id instead
    =============================================================================
    """)
    
    {:ok, jobs}
  end
  
  defp decode_return(encoded) do
    # Oban Pro uses Base64-encoded ETF for recorded values
    encoded
    |> Base.decode64!()
    |> :erlang.binary_to_term()
    |> then(&{:ok, &1})
  rescue
    _ -> {:error, :decode_failed}
  end
end

# ============================================================================
# Instructions for running
# ============================================================================

IO.puts("""
================================================================================
Oban Pro v1.6.10 - Nested Sub-Workflow Context Bug Reproducer
================================================================================

To run this reproducer:

1. Add this file to your project
2. Run: mix run oban_pro_nested_context_bug.ex
   Or in iex: ReproducerWorkflows.run()

3. Observe that fan-out jobs have incorrect sup_workflow_id

To verify the bug at runtime:
1. Let the workflow execute
2. Check the logs - fan_out_step will report having outer_only instead of inner_secret

Environment:
- oban_pro ~> 1.6.10
- oban ~> 2.18 (or compatible version)
================================================================================
""")

@alexbiehl Thanks for the reproduction, that helped. It wasn’t over yet! Fixed for the next patch.

Awesome. Looking forward to giving it a try.

That being said, I saw

[Workflow] Inherit all context for nested sub-workflows

When using add_cascade with fan-out inside nested sub-workflows, context from intermediate parent workflows was not accessible. Jobs would only see context from the outermost workflow, skipping any workflows in between.

Now context is correctly inherited through any depth of workflow nesting. Each level's context is merged in order from outermost to innermost, with closer ancestors overriding values from farther ones.

in the v1.6.11 — 2026-01-19 release.
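
As I read that entry, the merge order would behave like this with plain maps (just an illustration of the described precedence, not Oban Pro’s implementation):

outer = %{campaign_id: 999, shared: "outer"}
inner = %{oauth_connection_id: 123, shared: "inner"}

# Outermost first, so values from closer ancestors win on key conflicts.
Enum.reduce([outer, inner], %{}, &Map.merge(&2, &1))
#=> %{campaign_id: 999, oauth_connection_id: 123, shared: "inner"}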

I tested with the reproducer and it still doesn’t work. I guess the changelog entry is referring to the original issue?

:disappointed_face: No, the changelog is pointing to an additional set of changes, plus a new set of tests, based on the reproducer. I’ll take a closer look at the reproducer and test it directly.

Cool, thank you. Any luck so far?

Yes, we had luck there. It was a highly specific context setup. This is fixed for v1.6.13, coming next week :slightly_smiling_face:
