Can't fan out workflows with cascade functions if they have a context

Here’s an example workflow with a container sub-workflow that fans out to multiple sub-workflows, each of which contains a cascade function (pls don’t judge me):

defmodule Single do
  alias Oban.Pro.Workflow

  def workflow() do
    Workflow.new()
    |> Workflow.put_context(%{hello: "world"})
    |> Workflow.add_cascade(:foo, &cascade/1)
  end

  def cascade(_context) do
    IO.inspect("Single fan-out cascade complete")
  end
end

defmodule Mothership do
  alias Oban.Pro.Workflow

  def workflow() do
    fan_out_wf =
      Enum.reduce(1..2, Workflow.new(), fn i, wf ->
        Workflow.add_workflow(wf, to_string(i), Single.workflow())
      end)

    Workflow.new()
    |> Workflow.add_workflow(:fan_out, fan_out_wf)
  end
end

When I run it:

Mothership.workflow() |> Oban.insert_all()

Jobs fail with this error:

** (Ecto.MultipleResultsError) expected at most one result but got 2 in query:

from j0 in Oban.Job,
  where: j0.state in [
  "scheduled",
  "available",
  "executing",
  "retryable",
  "completed",
  "discarded",
  "cancelled"
],
  where: fragment(
  "? ? 'workflow_id' AND ?->>'workflow_id' = ?",
  j0.meta,
  j0.meta,
  ^"019b1e4a-fb19-76d5-b41c-b4e83d2e79e0"
),
  where: j0.meta["name"] in ^["context"],
  order_by: [asc: j0.id],
  select: j0.meta

    (ecto 3.13.5) lib/ecto/repo/queryable.ex:166: Ecto.Repo.Queryable.one/3
    (oban_pro 1.6.9) lib/oban/pro/workflow.ex:2228: Oban.Pro.Workflow.get_recorded/3
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:39: Oban.Pro.Workflow.Cascade.all_context/2
    (oban_pro 1.6.9) lib/oban/pro/workflow/cascade.ex:23: Oban.Pro.Workflow.Cascade.process/1
    (oban_pro 1.6.9) lib/oban/pro/worker.ex:1156: Oban.Pro.Worker.process/3
    (oban 2.20.2) lib/oban/queue/executor.ex:143: Oban.Queue.Executor.perform/1
    (oban 2.20.2) lib/oban/queue/executor.ex:75: Oban.Queue.Executor.call/1
    (elixir 1.19.3) lib/task/supervised.ex:105: Task.Supervised.invoke_mfa/2
    (elixir 1.19.3) lib/task/supervised.ex:40: Task.Supervised.reply/4

Without put_context, it works fine. Seems like a bug?
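
To make the failure mode concrete, here is a minimal plain-Elixir model of the lookup in the stack trace. It is not Oban Pro code: ContextLookupModel, one/1 and get_context/2 are hypothetical names, and the model assumes (as the "got 2" in the error suggests) that each of the two fanned-out Single sub-workflows records its own job named "context" under the same workflow_id. Because the lookup expects at most one row, like Ecto’s Repo.one/2, two contexts raise, while no contexts at all (the case without put_context) simply return nil.

```elixir
defmodule ContextLookupModel do
  # Hypothetical model of the failing lookup; NOT Oban Pro's implementation.
  # Job metas are plain maps with string keys, standing in for the jsonb `meta` column.

  # Mirrors the Repo.one/2 contract: nil for no rows, the row for exactly one,
  # and an error for more than one (Ecto raises Ecto.MultipleResultsError;
  # a RuntimeError keeps this sketch dependency-free).
  def one([]), do: nil
  def one([row]), do: row
  def one(rows), do: raise("expected at most one result but got #{length(rows)}")

  # Filters like the logged query: same workflow_id and a "name" of "context".
  def get_context(metas, workflow_id) do
    metas
    |> Enum.filter(&(&1["workflow_id"] == workflow_id and &1["name"] == "context"))
    |> one()
  end
end

# Assumption: both fanned-out Single sub-workflows share the top-level
# workflow_id and each contributes one "context" entry plus its cascade job.
with_context = [
  %{"workflow_id" => "wf-1", "name" => "context", "hello" => "world"},
  %{"workflow_id" => "wf-1", "name" => "context", "hello" => "world"},
  %{"workflow_id" => "wf-1", "name" => "foo"},
  %{"workflow_id" => "wf-1", "name" => "foo"}
]

# Without put_context there is nothing named "context" to find.
without_context = Enum.reject(with_context, &(&1["name"] == "context"))

# Prints nil: zero matches is fine for a "one or none" lookup.
IO.inspect(ContextLookupModel.get_context(without_context, "wf-1"))

# Two matches trip the "at most one" check, as in the reported error.
try do
  ContextLookupModel.get_context(with_context, "wf-1")
rescue
  e in RuntimeError -> IO.puts(e.message)
end
```

Under this model, the lookup would have to tell the sub-workflows’ contexts apart rather than assume a single row per workflow_id; how Oban Pro actually fixes it may differ.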

Hmm, I also ran into the same error while trying to come up with a workaround using normal jobs instead of cascade functions. The whole workflow composition must be problematic. Am I using it wrong?

It’s likely that we can put in a fix to prevent the error, but based on the example, you’re using it slightly wrong. There’s no need to add a workflow to a workflow that has no other jobs in it; just return the workflow directly:

  def workflow do
    Enum.reduce(1..2, Workflow.new(), fn i, wf ->
      Workflow.add_workflow(wf, to_string(i), Single.workflow())
    end)
  end

Fewer nested workflows are better when possible. We’ll work on a fix for the query issue regardless.

Thanks! I purposefully stripped the example down as much as possible. The full version contains more stages and looks more like this:

  def workflow(task_ids) do
    fan_out_wf =
      Enum.reduce(task_ids, Workflow.new(), fn task_id, wf ->
        Workflow.add_workflow(wf, task_id, Single.workflow(task_id))
      end)
  
    Workflow.new()
    |> Workflow.add(:prepare_fan_out, ...)
    |> Workflow.add_workflow(:fan_out, fan_out_wf, deps: :prepare_fan_out)
    |> Workflow.add(:process_results, ..., deps: :fan_out)
  end

The reason I wrap the fan-out in a workflow is so that I can depend on the whole fan-out without explicitly collecting every fan-out job id at workflow creation time, if that makes sense!
