Does Worker.await_signal work with Workflow.add_many?
I’d like to wait for every job in the add_many to complete (including any that are awaiting a signal) before the next workflow step runs.
I tried having each job in the add_many await a signal, with the next workflow step depending on them, but that step never runs and stays stuck as suspended…
The add_many function builds a sub-workflow which the next step depends on. The fact that one or more of those jobs are awaiting a signal shouldn’t change dependency resolution for the rest of the workflow. Can you share a minimal example of your workflow and how you’re testing/verifying it?
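As a rough mental model of that rule, here is a toy sketch in plain Elixir. It is not how Oban Pro stores or resolves dependencies; `DepsSketch`, `runnable?/2`, and the state atoms are hypothetical names chosen for illustration. It only shows the idea that a step depending on a group becomes runnable once every job in the group has completed, so a job still waiting on a signal just counts as "not completed yet".

```elixir
defmodule DepsSketch do
  # Toy model only (an assumption for illustration, not Oban Pro internals).
  # `groups` maps a step name to the list of states of the jobs under it.
  def runnable?(groups, deps) do
    Enum.all?(deps, fn dep ->
      groups
      |> Map.fetch!(dep)
      |> Enum.all?(&(&1 == :completed))
    end)
  end
end

# One sell job is still running (for example, waiting on a signal).
waiting = %{sells: [:completed, :executing]}
# Every sell job has finished.
done = %{sells: [:completed, :completed]}

IO.inspect(DepsSketch.runnable?(waiting, [:sells])) # false
IO.inspect(DepsSketch.runnable?(done, [:sells]))    # true
```

Under this model the buys should start as soon as the last sell completes, which is why buys that stay suspended after all sells complete look like a bug rather than expected behaviour.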
I’m rewriting a workflow that previously did some janky stuff with GenServers while waiting for order status webhooks, so that it takes advantage of signals instead.
defmodule OrderWorkflow do
  alias Oban.Pro.Workflow

  def new_workflow(context) do
    Workflow.new()
    |> Workflow.put_context(context)
    |> Workflow.add_cascade(:orders, &create_orders/1)
    # The graft depends on the :orders cascade step defined above.
    |> Workflow.add_graft(:send, &place_orders/1, deps: [:orders])
  end

  def create_orders(context) do
    {:ok, orders} = Foo.create_orders_from_position_changes(context.position_changes)

    # Split order ids into sells and buys, skipping ignored orders.
    {sell_order_ids, buy_order_ids} =
      Enum.reduce(orders, {[], []}, fn
        %{status: :IGNORED}, acc -> acc
        %{side: :SELL, id: id}, {sells, buys} -> {[id | sells], buys}
        %{side: :BUY, id: id}, {sells, buys} -> {sells, [id | buys]}
      end)

    {:ok, {sell_order_ids, buy_order_ids}}
  end

  def place_orders(context) do
    %{orders: {sell_order_ids, buy_order_ids}} = context

    sell_order_jobs =
      Enum.map(sell_order_ids, &PlaceOrderWorker.new(%{order_id: &1}))

    buy_order_jobs =
      Enum.map(buy_order_ids, &PlaceOrderWorker.new(%{order_id: &1}))

    # Buys depend on sells so that every sell completes first.
    Workflow.new()
    |> Workflow.add_many(:sells, sell_order_jobs)
    |> Workflow.add_many(:buys, buy_order_jobs, deps: :sells)
    |> Workflow.apply_graft()
    |> Oban.insert_all()
  end
end
defmodule PlaceOrderWorker do
  use Oban.Pro.Worker

  args_schema do
    field :order_id, :binary_id, required: true
  end

  @impl Oban.Pro.Worker
  def process(%{args: %{order_id: order_id}, id: job_id}) do
    place_order(order_id, job_id)
  end

  defp place_order(order, job_id) do
    with {:ok, _body} <- Foo.place_order(order),
         {:ok, order} <- Foo.mark_order_as_sent(order, %{job_id: job_id}) do
      wait_for_status(order, job_id)
    end
  end

  # Block until the order status signal arrives or the wait times out.
  defp wait_for_status(%Order{} = order, job_id) do
    case Oban.Pro.Worker.await_signal(wait_for: {1, :minutes}) do
      {:ok, %{status: :FILLED}} -> {:ok, order.id}
      {:ok, %{status: :FAILED, attempts: 3}} -> {:cancel, "failed"}
      {:ok, %{status: :FAILED}} -> place_order(order.id, job_id)
      {:error, :timeout} -> {:cancel, "timeout error [#{order.id}]"}
    end
  end
end
In this example (much simplified from actual code), the buy_order_jobs are suspended, even though the sell_order_jobs complete successfully.
In this case, the sells must complete before the buys to ensure there is enough cash for the buys.
The workflows look straightforward. Before I try to reproduce with a test case, how are you verifying it? In development, with an automated test? If it’s with a test, can you share what the test looks like?
I haven’t written tests yet. I’m running the workflow and checking Oban Web and IEx.
This is most likely because you’re scoping the signal to the parent workflow, and add_many creates a sub-workflow. If you use the sub_workflow_id instead, it should find the correct dependency. That said, this should work the way you intuitively expect, so we’ll fix it in a patch.
I’m saving job_id on the orders table and sending the signal to the job_id, though…
The sells jobs are getting marked as complete in oban web, but the next workflow step jobs (buys) just stay in suspended.
There goes that hypothesis then.
You said this was modified from an existing workflow; did it have the same shape? From my investigation, the problem appears to come from the competing add_many/3,4 calls made as part of a graft. It seems like a workflow bug unrelated to the use of signals.
Will you give this variant a try (using maps instead of lists for the sells/buys):
sells = Map.new(sell_order_ids, &{"sell_#{&1}", PlaceOrderWorker.new(%{order_id: &1})})
buys = Map.new(buy_order_ids, &{"buy_#{&1}", PlaceOrderWorker.new(%{order_id: &1})})
Workflow.new()
|> Workflow.add_many(:sells, sells)
|> Workflow.add_many(:buys, buys, deps: :sells)
|> Workflow.apply_graft()
|> Oban.insert_all()
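To make the difference between the two shapes concrete, here is a small plain-Elixir sketch of what the `Map.new/2` calls above produce: a map keyed by a per-job name rather than a bare list. `NamedJobsSketch`, `named/3`, and the `build` function are hypothetical stand-ins (with `build` playing the role of `PlaceOrderWorker.new/1`) so the snippet runs without Oban.

```elixir
defmodule NamedJobsSketch do
  # Hypothetical helper for illustration; `build` stands in for
  # PlaceOrderWorker.new/1 so this runs without Oban installed.
  def named(prefix, ids, build) do
    Map.new(ids, fn id -> {"#{prefix}_#{id}", build.(%{order_id: id})} end)
  end
end

# Stand-in builder that just tags the args instead of building a real job.
build = fn args -> {:job, args} end

sells = NamedJobsSketch.named("sell", ["a1", "b2"], build)

IO.inspect(Map.keys(sells))
# ["sell_a1", "sell_b2"]
IO.inspect(sells["sell_a1"])
# {:job, %{order_id: "a1"}}
```

One thing to keep in mind with this shape: map keys are unique, so if the same order id appeared twice in `sell_order_ids`, the two entries would collapse into a single job.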
Thanks, I will give this a try.
I was using two reduce functions before, because add_many behaved weirdly when I first wrote the workflow. However, I saw it was potentially fixed in 1.6.5, so I went with that.
I reproduced the issue synthetically and applied a fix for the add_many + apply_graft case. It will ship in the next patch release, but the alternate version I suggested above should achieve the same result in the meantime.