Oban — Reliable and Observable Job Processing

This can definitely be achieved in Oban. Unlike Exq (or most any of the other libraries) you have complete control over how your jobs are inserted into the database.

First, a quick definition of “unique jobs” because in my experience the meaning can be confusing. Usually the uniqueness only applies to jobs that are in the queue. That means you can prevent putting two jobs with the same arguments in the queue at the same time, but it doesn’t prevent multiple jobs with the same arguments within a window of time. You mentioned this difference below, but I wanted to call it out for anybody that may not be familiar with the feature.

Here are some ways I can think of to implement unique jobs with different types of guarantees.

1. Partial Index

Add an index for the worker that you want to enforce uniqueness for:

create index(
  :oban_jobs,
  [:worker, :args],
  unique: true,
  where: "worker = 'MyApp.Worker' and state in ('available', 'scheduled')"
)

Then, pass on_conflict: :nothing as options to Repo.insert/2 and it will ensure you don’t have duplicate jobs.

2. Insert Helper

Create a helper inside your Repo that will check for existing jobs before trying to insert:

def insert_unique(changeset, opts \\ []) do
  worker = get_change(changeset, :worker)

  case get_by(Oban.Job, worker: worker, state: "scheduled") do
    nil ->
      insert(changeset, opts)

    _job ->
      {:ignored, changeset}
  end
end

That function is a bit rough, but you get the idea.

Neither of these are as convenient as declaring it in the worker, i.e. use Oban.Worker, unique_for: :timer.minutes(1). I’ll think about this some more!

Honestly, I hadn’t considered first class tags before. This can be accomplished by providing additional fields to args.

Here we are adding an additional tag value to the args:

%{id: record.id, tag: "dependent"}
|> Oban.Job.new()
|> MyApp.Repo.insert()

Later, if the record is deleted we can also delete any dependent jobs within the same transaction:

dependents =
  Oban.Job
  |> where([j]. j.state in ["available", "scheduled"])
  |> where([j], fragment("?->>'tag' = 'dependent' and ?->>'id' = ?::text", j.args, ^record.id)

Ecto.Multi.new()
|> Ecto.Multi.delete(:delete, record)
|> Ecto.Multi.delete_all(:jobs, dependents)
|> MyApp.Repo.transaction()

I hope those ideas are somewhat helpful. Thanks for checking out Oban :slight_smile:

7 Likes