My goal is to create a system that behaves similarly to https://temporal.io/, where you can describe your workflow and ensure durable execution.
In the example below, I want to avoid running prepare_shipment(order)
twice if charge/2
returns {:error, :timeout}
and the job retries.
def process(%Job{args: args}) do
with :ok <- check_fraud(order.order_id, order.payment_info),
{:ok, order} <- prepare_shipment(order),
{:ok, order} <- charge(order.order_id, order.payment_info) do
ship!(order)
end
end
The initial idea was to use Oban.Pro.Workflows
, where each step depending on the previous one.
Workflow.new()
|> Workflow.add(:check_fraud, CheckFraudWorker.new(%{order_id: order.id}))
|> Workflow.add(:prepare_shipment, PrepareShipmentWorker.new(%{order_id: order.id}), deps: [:check_fraud])
|> Workflow.add(:charge, ChargeWorker.new(%{order_id: order.id}), deps: [:prepare_shipment])
|> Workflow.add(:ship, ShipWorker.new(%{order_id: order.id}), deps: [:charge])
However, it’s tricky for complex workflows where one steps appends more jobs that should be completed before downstream dependencies. Also some steps depends on the data that can be passed directly or as the output from another worker (I’m aware of recorded jobs). It’s definitely possible with workflows, but a bit complex.
My second attempt is to cache each step to make retries idempotent.
def process(%Job{args: args}) do
with :ok <- check_fraud(order.order_id, order.payment_info, job_id),
{:ok, order} <- prepare_shipment(order, job_id),
{:ok, order} = charge(order.order_id, order.payment_info, job_id) do
ship!(order, job_id)
Cache.remove(tag: job_id)
end
end
def prepare_shipment(order, job_id) do
key = "prepare_shipment_#{job_id}"
Cache.fetch([key: key, tag: job_id, ttl: :timer.hours(72)]) do
Shipment.prepare_shipment(order)
end
end
# ...
And the last requirement is to have a simple way to control concurrency. I was thinking to leverage Oban.Pro.Relay
and decorators
def prepare_shipment(order, job_id) do
key = "prepare_shipment_#{job_id}"
Cache.fetch([key: key, tag: job_id, ttl: :timer.hours(72)]) do
Shipment.relay_prepare_shipment(order, max_attempts: 1, timeout: :timer.hours(1), queue: :prepare_shipment)
end
end
It’s an unusual use of Relay
, but it helps me avoid building my own UI to manage concurrency and implementing rate limiting.
The only problem is that Oban’s inline testing mode doesn’t insert jobs, so job_id is always nil, which causes my tests to fail.