Testing Oban.Pro.Relay in manual mode

I’m using Workflow and Batch, which are easy to test in manual mode using drain_jobs helper. However, the tests hang after adding Relay. It seems that Relay can be tested in inline mode, but that’s not an option for me since I rely on job.id.

Is there a way to set up an integration test that runs Workflow and Relay?

1 Like

Could you please share more details, how is job.id being a blocker?

On the other note, in our application we configured Oban.Rely to run on a separate Oban instance with a different DB schema (prefix). And that instance for Relay in tests is configured with testing: :inline. As a result, the code that exercises Oban.Relay always runs “inline”, without the need for drain_jobs.

I use job.id as part of the cache key to cache requests to external services, preventing them from being called twice on retry.

It would work, but having a separate DB schema and an instance of Oban solely for testing doesn’t feels right.

Running workflows with relay is trickier, but possible. There’s a start_supervised_oban/1 helper to start a test focused, isolated oban isntance.

This snippet will start an instance that runs jobs, but is still within the sandbox and can run async.

name = start_supervised_oban!(repo: MyApp.Repo, stage_interval: 5, queues: [default: 10])

Then use that name to insert jobs using that Oban instance so it’s connected to the same pubsub notifier (essential for Relay):

Relay.async(name, MyWorker.new(args))

Hopefully that helps! We’ll look at making relay work with drain_jobs to simplify this whole setup.

1 Like

I can’t pass the Oban name into Relay because it’s a controller test (unless I’m missing something).
I can use inline mode, but I really want to insert the job before executing it, so I have the job.id filled.

Will you please share a psuedo version of the code that inserts the workflow and the test? It would help to find a way to test with Relay, or figure out what changes need to make it into Pro testing.

My goal is to create a system that behaves similarly to https://temporal.io/, where you can describe your workflow and ensure durable execution.

In the example below, I want to avoid running prepare_shipment(order) twice if charge/2 returns {:error, :timeout} and the job retries.

def process(%Job{args: args}) do
  with :ok <- check_fraud(order.order_id, order.payment_info),
       {:ok, order} <- prepare_shipment(order),
       {:ok, order} <- charge(order.order_id, order.payment_info) do
    ship!(order)
  end
end

The initial idea was to use Oban.Pro.Workflows, where each step depending on the previous one.

Workflow.new()
|> Workflow.add(:check_fraud, CheckFraudWorker.new(%{order_id: order.id}))
|> Workflow.add(:prepare_shipment, PrepareShipmentWorker.new(%{order_id: order.id}), deps: [:check_fraud])
|> Workflow.add(:charge, ChargeWorker.new(%{order_id: order.id}), deps: [:prepare_shipment])
|> Workflow.add(:ship, ShipWorker.new(%{order_id: order.id}), deps: [:charge])

However, it’s tricky for complex workflows where one steps appends more jobs that should be completed before downstream dependencies. Also some steps depends on the data that can be passed directly or as the output from another worker (I’m aware of recorded jobs). It’s definitely possible with workflows, but a bit complex.

My second attempt is to cache each step to make retries idempotent.

def process(%Job{args: args}) do
  with :ok <- check_fraud(order.order_id, order.payment_info, job_id),
       {:ok, order} <- prepare_shipment(order, job_id),
       {:ok, order} = charge(order.order_id, order.payment_info, job_id) do
    ship!(order, job_id)
    Cache.remove(tag: job_id)
  end
end

def prepare_shipment(order, job_id) do
  key = "prepare_shipment_#{job_id}"

  Cache.fetch([key: key, tag: job_id, ttl: :timer.hours(72)]) do
    Shipment.prepare_shipment(order)
  end
end
# ...

And the last requirement is to have a simple way to control concurrency. I was thinking to leverage Oban.Pro.Relay and decorators

def prepare_shipment(order, job_id) do
  key = "prepare_shipment_#{job_id}"

  Cache.fetch([key: key, tag: job_id, ttl: :timer.hours(72)]) do
    Shipment.relay_prepare_shipment(order, max_attempts: 1, timeout: :timer.hours(1), queue: :prepare_shipment)
  end
end

It’s an unusual use of Relay, but it helps me avoid building my own UI to manage concurrency and implementing rate limiting.

The only problem is that Oban’s inline testing mode doesn’t insert jobs, so job_id is always nil, which causes my tests to fail.