Setting up proper uniqueness options for Queue in Oban

Hi everyone,

I’m currently working on a project using Oban Pro 1.4.0 with Smart Engine, and I need some guidance on configuring a queue with multiple workers. Specifically, I have the following requirements:

  1. The queue should execute a maximum of 10 jobs per node.
  2. It should not execute more than one unique worker with the same arguments globally (i.e., no more than one instance of the same worker/arguments combination should run simultaneously across the entire cluster).
  3. If a worker with the same arguments is already executing, any new job with the same worker/arguments combination should be placed in the available or scheduled state, so it can be processed immediately after the current one finishes.

Here’s what I’ve tried so far:

Oban Queue Configuration:

my_queue: [
  local_limit: 10,
  global_limit: [allowed: 1, partition: [fields: [:args, :worker]]]
]

Worker Configuration:

use Oban.Worker,
  queue: :my_queue,
  unique: [
    fields: [:args, :worker],
    states: [:available, :scheduled, :retryable],
    period: :infinity
  ]

However, when running tests, I noticed that if I enqueue the same worker with the same arguments 15 times, it starts executing 10 workers and puts 1 in the available state. I expected it to execute 1 worker and also put 1 in the available state since it’s the same worker and arguments.

In contrast, if I enqueue 15 different worker/arguments combinations, I would expect it to start executing 10 jobs and put the remaining 5 in the available state.

I’m running multiple different workers for the same queue, so I can’t rely only on args or only on the worker; I need to rely on both.

Is it possible to configure Oban in this way? If so, what adjustments do I need to make?

Thanks for your help!

In your worker configuration the states list doesn’t include executing, so once a job starts executing it no longer counts toward uniqueness. A duplicate can then be inserted while the first one is still running, and you’ll end up with multiples.

That’s fine, global partitioning by worker and args is possible. However, I suggest using an explicit list of keys whenever possible for predictability and performance.

[allowed: 1, partition: [:worker, args: :some_key]]
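For context, here is a minimal sketch of how that partition might sit in a full queue config. The app name `:my_app`, the repo `MyApp.Repo`, and the `client_id` key are assumptions for illustration. Depending on your Pro version, the partition may need to be written in the `fields:`/`keys:` form used in the original question instead.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  # :my_app and MyApp.Repo are placeholders for your own application and repo.
  repo: MyApp.Repo,
  engine: Oban.Pro.Engines.Smart,
  queues: [
    my_queue: [
      # At most 10 jobs per node.
      local_limit: 10,
      # At most 1 running job per worker + client_id pair across the cluster.
      # :client_id is an assumed args key; use whichever key identifies "the same job".
      global_limit: [allowed: 1, partition: [:worker, args: :client_id]]
    ]
  ]
```

Partitioning on one explicit key rather than the whole args map means unrelated args (timestamps, request ids) don't split jobs into separate partitions.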

Based on your criteria it sounds like you might want to use the Chain worker rather than uniqueness.

Chain workers link jobs together to ensure they run in a strict sequential order. Downstream jobs won’t execute until the upstream job is completed, cancelled, or discarded. Behaviour in the event of cancellation or discards is customizable to allow for uninterrupted processing, holding for outside intervention, or cascading cancellation.

Jobs in a chain only run after the previous job completes successfully, regardless of snoozing or retries.

Chains use the same partitioning format as queues, so optimally you’ll match the options to ensure only one job in a chain runs at once. There’s more in the Optimizing Chains section of the module docs.
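As a rough sketch of what a chain could look like here, assuming the `Oban.Pro.Workers.Chain` module with its `by:` option and `process/1` callback as described in the Pro docs. The module name and the `client_id` key are hypothetical, and newer Pro releases may configure chains differently, so check the docs for your version.

```elixir
defmodule MyApp.IndexWorker do
  # Assumption: chaining by worker plus a single args key, written in the same
  # shape as the queue partition so both agree on what "the same job" means.
  use Oban.Pro.Workers.Chain,
    queue: :my_queue,
    by: [:worker, args: :client_id]

  @impl true
  def process(%Oban.Job{args: %{"client_id" => _client_id}}) do
    # The indexing work for this client would go here.
    :ok
  end
end
```

With this setup every job is still inserted, and jobs for the same worker and `client_id` run one after another rather than concurrently.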

Thank you for the answer.

I don’t specifically need the jobs to run in a particular order; my main requirement is to ensure that the same worker/args does not run concurrently.

Most of these workers index data in Elasticsearch after specific events occur in the app. Some workers might run for several minutes, and during this time, there could be multiple updates. In such cases, I need to schedule another indexing worker, but only the latest update is relevant. Therefore, I don’t need to run other indexing workers that might have been scheduled in the meantime.

Consider a simple worker:

defmodule SimpleWorker do
  use Oban.Worker,
    queue: :my_queue,
    unique: [
      fields: [:args, :worker],
      states: [:available, :scheduled, :retryable],
      period: :infinity
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"client_id" => _client_id}}) do
    # Simulate a long-running indexing job.
    Process.sleep(10_000)

    :ok
  end
end

This is how I test this scenario:
for cid <- [1, 2, 3], _ <- 1..10 do
  %{client_id: cid} |> SimpleWorker.new() |> Oban.insert()
end

When I added executing to states in the configuration, what happened was that SimpleWorker with client_id 1, 2, 3 started executing. There were no others in the executing state (so 3 concurrent total), but also there were no jobs waiting in the available state. My requirements are to additionally put SimpleWorker with client_id 1, 2, 3 in the available state so that when the current executing ones finish, they will just start working.

If I understand the unique option correctly, it searches for the specific worker/args combination across all of the listed states. So, if there is one in the executing state, it won’t create another one in the available state. Ideally, for my case, Oban would check for uniqueness separately for each state. That way, if a worker is currently executing, a new job with the same args would not start executing but would “wait” in the available state, and any further jobs with the same args would not be added.

Is there a way to configure Oban to handle this scenario?

Thanks again for your help!

It can be used to that effect, but there’s an easier way.

The use case you’re describing can be most easily accomplished by debouncing. Set the unique period to a shorter period, and then insert the jobs in a scheduled state. That will prevent accumulating a string of jobs, so you’ll have one executing and then another one ready to run next.

Here’s a tweak on the worker you shared above:

defmodule SimpleWorker do
  use Oban.Worker,
    queue: :my_queue,
    unique: [states: [:available, :scheduled, :retryable], period: 30]

  @impl Oban.Worker
  def perform(%Job{args: %{"client_id" => _client_id}}) do
    Process.sleep(10_000)

    :ok
  end
end

Then build new jobs with SimpleWorker.new(args, schedule_in: 30). Set the period to prevent overlapping jobs, and you don’t even need the global partitioning because there’s only one of each job anyhow.
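To make the debounce flow concrete, here is a small sketch of the insert side. The `MyApp.Reindex` module and its `request/1` function are hypothetical; `SimpleWorker` is the worker from the snippet above, and it assumes a running Oban instance.

```elixir
defmodule MyApp.Reindex do
  # Hypothetical helper that every "data changed" event would call.
  def request(client_id) do
    %{client_id: client_id}
    # Schedule 30 seconds out so updates arriving in that window collapse
    # into the job that is already waiting in the scheduled state.
    |> SimpleWorker.new(schedule_in: 30)
    |> Oban.insert()
  end
end

# Two requests in quick succession for the same client:
{:ok, first} = MyApp.Reindex.request(1)
{:ok, second} = MyApp.Reindex.request(1)

# Oban sets `conflict?` on the returned struct when an existing job matched,
# which is a handy way to check whether a request was debounced.
first.conflict?
second.conflict?
```

Because executing is left out of the unique states in this variant, a request that arrives while a job is running still gets a fresh scheduled job, which is the "one executing, one waiting" behaviour asked for.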

Thank you very much, I will collect all the info and try to craft the appropriate solution for my needs :)


Sorry to resurrect this, but would your approach not work with a period of :infinity? As in, does the uniqueness period have to be 30, or is the thing that really matters here the fact that the job is scheduled 30 seconds out?

If infinity doesn’t work, why?

It may work, depending on your scenario. For the original problem of debouncing updates you have a good chance of losing updates with uniqueness set to :infinity.

Hm, I have a confusing scenario then. This is my worker config:

use Oban.Worker,
  max_attempts: 1,
  queue: :my_queue,
  unique: [
    period: :infinity, # now changed to 60
    states: [:available, :scheduled, :retryable],
    fields: [:worker, :queue, :args],
    keys: [:team_id, :type]
  ]

It’s what got me to this topic. Enqueuing multiple jobs with the same args in quick succession, each scheduled 60 seconds out, results in multiple jobs running at the same time, even though their args are exactly the same.

I must be missing something obvious, but can’t for the life of me figure out what.

That’s because it doesn’t include the executing state. If another job is already running it will enqueue another.

In contrast to what I shared above, it’s least confusing to use the default states, or at least to include all of the incomplete states (available, scheduled, retryable, executing).
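A minimal sketch of that second option, with the incomplete states listed explicitly. The module name is hypothetical, the keys are taken from the config above, and `period: 60` is only an assumption about what fits this use case.

```elixir
defmodule MyApp.TeamWorker do
  use Oban.Worker,
    queue: :my_queue,
    max_attempts: 1,
    unique: [
      period: 60,
      # Including :executing means a running job still blocks duplicate inserts.
      states: [:available, :scheduled, :executing, :retryable],
      fields: [:worker, :queue, :args],
      # Only these args keys are compared when looking for duplicates.
      keys: [:team_id, :type]
    ]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"team_id" => _team_id, "type" => _type}}) do
    :ok
  end
end
```

The trade-off is the one discussed earlier in the thread: with executing included, an update that arrives while a job is running is treated as a duplicate instead of queuing a follow-up job.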

I’m using schedule_in: 60, though, and seeing multiple scheduled jobs spawning at the same time.

Uniqueness is only applied at insert time, not at runtime. Without chains or partitioned globally limited queues there’s nothing to automatically stop related jobs from running at the same time.

Yeah, I get that.

What I’m saying here is that, because multiple processes try to insert the job at about the same time (single-digit milliseconds apart), uniqueness apparently isn’t guaranteed at insert time either?

I ran an experiment of spawning 1000 parallel tasks, each inserting the same job with the same arguments, and while not all 1000 get inserted, several tens of them definitely do.

I guess I understand why that’s the case, but I’m wondering if I’m missing something.

The chain option might at least ensure they don’t execute at the same time, even if there are duplicate inserts, right?

Uniqueness in OSS, and in Pro prior to v1.5, is best effort. It uses advisory locks and queries, but competing transactions can let duplicates through in a race. That’s changed in Pro v1.5, which uses a unique index for enforcement, so it isn’t subject to race conditions.
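The difference between the two approaches can be illustrated without a database. The sketch below is only an analogy and is not how Oban is implemented: an Agent holding a list stands in for the jobs table, "best effort" is a separate check followed by a write, and "enforced" does the check and write in one atomic step, which is the kind of guarantee a unique index gives. All names are made up for the example.

```elixir
defmodule UniquenessRaceSketch do
  # Check-then-insert: the lookup and the write are two separate steps, so
  # another process can insert the same key in between them.
  def insert_best_effort(store, key) do
    exists? = Agent.get(store, fn keys -> key in keys end)
    if not exists?, do: Agent.update(store, fn keys -> [key | keys] end)
    :ok
  end

  # Insert-if-absent in a single step: no other process can interleave.
  def insert_enforced(store, key) do
    Agent.update(store, fn keys ->
      if key in keys, do: keys, else: [key | keys]
    end)
  end

  # Runs `attempts` concurrent inserts of the same key and returns how many
  # copies ended up in the store.
  def run(insert_fun, attempts \\ 1_000) do
    {:ok, store} = Agent.start_link(fn -> [] end)
    key = {"SimpleWorker", %{"client_id" => 1}}

    1..attempts
    |> Task.async_stream(fn _ -> insert_fun.(store, key) end, max_concurrency: attempts)
    |> Stream.run()

    count = Agent.get(store, &length/1)
    Agent.stop(store)
    count
  end
end
```

Running `UniquenessRaceSketch.run(&UniquenessRaceSketch.insert_enforced/2)` always leaves exactly one entry, while the `insert_best_effort/2` variant can leave more than one depending on scheduling, much like the 1000-task experiment described above.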

Right; it would.