Here's an example workflow in which a container sub-workflow fans out into multiple sub-workflows, each of which contains a cascade function (please don't judge me):
defmodule Single do
  alias Oban.Pro.Workflow

  def workflow() do
    Workflow.new()
    |> Workflow.put_context(%{hello: "world"})
    |> Workflow.add_cascade(:foo, &cascade/1)
  end

  def cascade(_context) do
    IO.inspect("Single fan-out cascade complete")
  end
end

defmodule Mothership do
  alias Oban.Pro.Workflow

  def workflow() do
    fan_out_wf =
      Enum.reduce(1..2, Workflow.new(), fn i, wf ->
        Workflow.add_workflow(wf, to_string(i), Single.workflow())
      end)

    Workflow.new()
    |> Workflow.add_workflow(:fan_out, fan_out_wf)
  end
end

When I run it:
Mothership.workflow() |> Oban.insert_all()
the jobs fail with this error:
** (Ecto.MultipleResultsError) expected at most one result but got 2 in query:

from j0 in Oban.Job,
  where: j0.state in [
    "scheduled",
    "available",
    "executing",
    "retryable",
    "completed",
    "discarded",
    "cancelled"
  ],
  where: fragment(
    "? ? 'workflow_id' AND ?->>'workflow_id' = ?",
    j0.meta,
    j0.meta,
    ^"019b1e4a-fb19-76d5-b41c-b4e83d2e79e0"
  ),
  where: j0.meta["name"] in ^["context"],
  order_by: [asc: j0.id],
  select: j0.meta

    (ecto 3.13.5) lib/ecto/repo/queryable.ex:166: Ecto.Repo.Queryable.one/3
    (oban_pro 1.6.9) lib/oban/pro/workflow.ex:2228: Oban.Pro.Workflow.get_recorded/3
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:39: Oban.Pro.Workflow.Cascade.all_context/2
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:23: Oban.Pro.Workflow.Cascade.process/1
    (oban_pro 1.6.9) lib/oban/pro/worker.ex:1156: Oban.Pro.Worker.process/3
    (oban 2.20.2) lib/oban/queue/executor.ex:143: Oban.Queue.Executor.perform/1
    (oban 2.20.2) lib/oban/queue/executor.ex:75: Oban.Queue.Executor.call/1
    (elixir 1.19.3) lib/task/supervised.ex:105: Task.Supervised.invoke_mfa/2
    (elixir 1.19.3) lib/task/supervised.ex:40: Task.Supervised.reply/4

Without put_context, it works fine. Is this a bug?
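
In case it helps with reproducing, here is a rough sketch of a query that should list the jobs matched by the failing lookup. It mirrors the workflow_id and name filters from the error above; MyApp.Repo is a placeholder for whichever Ecto repo Oban is configured with, and the workflow_id is the one from my trace, so both need to be swapped for real values.

import Ecto.Query

# Taken from the error above; replace with the workflow_id from your own trace.
workflow_id = "019b1e4a-fb19-76d5-b41c-b4e83d2e79e0"

Oban.Job
# Same workflow_id match the failing query performs on the meta column.
|> where([j], fragment("?->>'workflow_id' = ?", j.meta, ^workflow_id))
# Only the jobs whose meta name is "context".
|> where([j], fragment("?->>'name' = ?", j.meta, "context"))
|> order_by([j], asc: j.id)
|> select([j], %{id: j.id, state: j.state, meta: j.meta})
# MyApp.Repo is hypothetical; use the repo Oban runs against.
|> MyApp.Repo.all()

If this returns more than one row, those are the rows that make the single-result lookup in the stack trace raise.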