Oban await_signal with a Workflow.add_many?

Does the Worker.await_signal work with Workflow.add_many?

I’d like to wait for every job in the add_many to complete (including awaiting_for_signal) before the next workflow step runs.

I’ve tried it by having each job in the add_many awaiting a signal, and the next workflow step depends on that, but it never runs and is forever stuck as suspended

The add_many function builds a sub-workflow which the next step depends on. The fact that one or more of those jobs are awaiting a signal shouldn’t change dependency resolution for the rest of the workflow. Can you share a minimal example of your workflow and how you’re testing/verifying it?

I’m rewriting a workflow that was previously doing some janky stuff with genservers while waiting for order status webhooks, to try and take advantage of signals instead.

defmodule OrderWorkflow do
  def new_workflow(context) do
    Workflow.new()
    |> Workflow.put_context(context)
    |> Workflow.add_cascade(:orders, &create_orders/1)
    |> Workflow.add_graft(:send, &place_orders/1, deps: [:create]
  end

  def create_orders(context) do
    {:ok, orders} = Foo.create_orders_from_position_changes(context.position_changes)

    {sell_order_ids, buy_order_ids} =
      Enum.reduce(orders, {[], []}, fn
        %{status: :IGNORED}, acc -> acc
        %{side: :SELL, id: id}, {sells, buys} -> {[id | sells], buys}
        %{side: :BUY, id: id}, {sells, buys} -> {sells, [id | buys]}
      end)

    {:ok, {sell_order_ids, buy_order_ids}}
  end

  def place_orders(context) do
    %{orders: {sell_order_ids, buy_order_ids}} = context

    sell_order_jobs =
      Enum.map(sell_order_ids, &PlaceOrderWorker.new(%{order_id: &1}))

    buy_order_jobs =
      Enum.map(buy_order_ids, &PlaceOrderWorker.new(%{order_id: &1}))

    Workflow.new()
    |> Workflow.add_many(:sells, sell_order_jobs)
    |> Workflow.add_many(:buys, buy_order_jobs, deps: :sells)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
defmodule PlaceOrderWorker do
  use Oban.Pro.Worker

  args_schema do
    field :order_id, :binary_id, required: true
  end

  @impl Oban.Pro.Worker
  def process(%{args: %{order_id: order_id}, id: job_id}) do
    place_order(order_id, job_id)
  end

  defp place_order(order, job_id) do
    with {:ok, _body} <- Foo.place_order(order),
         {:ok, order} <- Foo.mark_order_as_sent(order, %{job_id: job_id}) do
      wait_for_status(order, job_id)
    end
  end

  defp wait_for_status(%Order{} = order, job_id) do
    case Oban.Pro.Worker.await_signal(wait_for: {1, :minutes}) do
      {:ok, %{status: :FILLED}} -> {:ok, order.id}
      {:ok, %{status: :FAILED, attempts: 3}} -> {:cancel, "failed"}
      {:ok, %{status: :FAILED} -> place_order(order.id, job_id)
      {:error, :timeout} -> {:cancel, "timeout error [#{order.id}]"}
    end
  end
end

In this example (much simplified from actual code), the buy_order_jobs are suspended, even though the sell_order_jobs complete successfully.

In this case, the sells must complete before the buys to ensure there is enough cash for the buys.

The workflows look straightforward. Before I try to reproduce with a test case, how are you verifying it? In development, with an automated test? If it’s with a test, can you share what the test looks like?

I haven’t written tests yet. I’m running the workflow and looking at Oban web, and iex

This is most likely because you’re scoping the signal to the parent workflow, and add_many creates a sub workflow. If you use the sub_workflow_id instead it should find the correct dependency. However, this seems like it should work intuitively and we’ll fix it in a patch.

I’m saving job_id on the orders table and sending the signal to the job_id, though…

The sells jobs are getting marked as complete in oban web, but the next workflow step jobs (buys) just stay in suspended.

There goes that hypothesis then :slightly_smiling_face:

You said this was modified from an existing workflow; did it have the same shape? From my investigation, the problem appears to be from the competing add_many/3,4 calls as part of a graft. It seems like a workflow bug unrelated to the use of signals.

Will you give this variant a try (using maps instead of lists for the sells/buys):

  sells = Map.new(sell_order_ids, &{"sell_#{&1}", PlaceOrderWorker.new(%{order_id: &1})})
  buys = Map.new(buy_order_ids,  &{"buy_#{&1}", PlaceOrderWorker.new(%{order_id: &1})})

  Workflow.new()
  |> Workflow.add_many(:sells, sells)
  |> Workflow.add_many(:buys, buys, deps: :sells)
  |> Workflow.apply_graft()
  |> Oban.insert_all()

Thanks, I will give this a try.

I was using two reduce functions before, because add_many behaved weirdly when I first wrote the workflow. However, I saw it was potentially fixed in 1.6.5, so I went with that.

Reproduced the issue synthetically and applied a patch for the add_many + apply_graft case. That will be in the next patch, but the alternate version I suggested above should achieve the same result.