Hold/pause an Oban job until param is ready

Is there any way to hold or pause an Oban job until its param is ready?

I have an Oban job A that is inserted as part of a workflow. Immediately afterwards, another Oban job B is inserted from some other part of the system with a param, i.e. an id, that job A obtains from an external API call and stores in the DB. Since job A may not have finished before job B is inserted, there is a chance the id is nil.

How can I ensure that job B does not fail due to a nil argument?

I thought about two different solutions:

  1. With Retry:
use Oban.Worker, max_attempts: 10

def perform(%Oban.Job{args: %{"id" => id}}) do
  case Repo.get(MyApp.MySchema, id) do
    %MySchema{external_data: nil} ->
      {:error, :not_ready}

    %MySchema{external_data: data} ->
      do_something_with(data)
      :ok
  end
end
  2. With re-enqueue after a delay:
def perform(%Oban.Job{args: %{"id" => id}}) do
  case Repo.get(MyApp.MySchema, id) do
    %MySchema{external_data: nil} ->
      # Data not ready, re-enqueue after a delay
      __MODULE__.new(%{"id" => id}, schedule_in: 30)
      |> Oban.insert()
      :ok

    %MySchema{external_data: data} ->
      # Data ready, process it
      do_something(data)
      :ok
  end
end

I am wondering which solution is preferable or if there is any other better solution in this case.

Thanks.

How about only enqueueing job B inside job A after it fetches the necessary data?

That’s beyond my control. The only thing I can control is that Job A will be enqueued before Job B. But I can’t enqueue Job B inside Job A.

Why in the world is that beyond your control? :thinking:

But OK, I suppose in this case you can just look for the piece of data under a specially named key inside a shared ETS table and re-check every second until it becomes available, or time out and fail (this attempt of) the job after several retries and/or after a predefined maximum amount of time that job A should take.

Or you might just use Phoenix.PubSub.
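
The ETS polling idea could look roughly like the sketch below. This is only an illustration under some assumptions: the module name SharedData, the table name :job_a_results and the default timings are made up, and an ETS table is local to one node, so it only helps if job A and job B run on the same node. Keep in mind that job B also occupies a queue slot while it waits.

```elixir
defmodule SharedData do
  # Hypothetical helper: a named ETS table that job A writes to and job B polls.
  @table :job_a_results

  # Create the table once, e.g. from a long-lived process at application start.
  # The table is owned by the calling process and is deleted when that process exits.
  def init do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # Job A calls this once the external API call has returned the value.
  def put(key, value) do
    :ets.insert(@table, {key, value})
    :ok
  end

  # Job B calls this: re-check every `interval_ms` until the key shows up
  # or `timeout_ms` has elapsed.
  def await(key, timeout_ms \\ 10_000, interval_ms \\ 1_000) do
    deadline = System.monotonic_time(:millisecond) + timeout_ms
    poll(key, deadline, interval_ms)
  end

  defp poll(key, deadline, interval_ms) do
    case :ets.lookup(@table, key) do
      [{^key, value}] ->
        {:ok, value}

      [] ->
        if System.monotonic_time(:millisecond) >= deadline do
          {:error, :timeout}
        else
          Process.sleep(interval_ms)
          poll(key, deadline, interval_ms)
        end
    end
  end
end
```

Inside job B's perform/1 you would return :ok after handling {:ok, value}, and pass {:error, :timeout} through as the return value so that Oban retries the job later.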


If the data you’re waiting on isn’t available yet, you can return {:snooze, snooze_seconds} and your job will run again after that delay. See: Oban.Worker — Oban v2.19.4
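
A minimal sketch of what that could look like for job B, reusing the external_data field from the question. The names MyApp.Readiness, MyApp.JobB, MyApp.Repo and do_something/1 are placeholders, and the 30 second delay is arbitrary. The worker is wrapped in a Code.ensure_loaded?/1 check only so the snippet also loads in a plain script without Oban installed.

```elixir
defmodule MyApp.Readiness do
  # Hypothetical helper that decides whether the record is ready to be processed.
  @snooze_seconds 30

  # No record yet, or the external data has not been stored: ask to run again later.
  def check(nil), do: {:snooze, @snooze_seconds}
  def check(%{external_data: nil}), do: {:snooze, @snooze_seconds}

  # Data is present: hand it back to the caller.
  def check(%{external_data: data}), do: {:ready, data}
end

if Code.ensure_loaded?(Oban.Worker) do
  defmodule MyApp.JobB do
    use Oban.Worker, max_attempts: 10

    @impl Oban.Worker
    def perform(%Oban.Job{args: %{"id" => id}}) do
      # MyApp.Repo and MyApp.MySchema stand in for your own Ecto repo and schema.
      case MyApp.Readiness.check(MyApp.Repo.get(MyApp.MySchema, id)) do
        {:ready, data} ->
          do_something(data)
          :ok

        {:snooze, _seconds} = snooze ->
          # Returning this tuple reschedules the job instead of failing it.
          snooze
      end
    end

    # Placeholder for the real processing step.
    defp do_something(_data), do: :ok
  end
end
```

Compared with returning {:error, :not_ready}, this avoids recording an error on the job each time the data is simply not there yet.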


I found an alternative solution that works following documentation here - Oban.Pro.Relay — Oban Pro v1.6.0-rc.5

Basically, doing something like this -

  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Oban.Pro.Relay.async/1)
  |> Oban.Pro.Relay.await_many()

So instead of making any changes in Job B, it waits for completion of Job A. As documented, this works by using PubSub to transmit results.