Oban Pro 1.5 migration: unique jobs with period:infinity

According to the 1.5 upgrade guide:

If uniqueness is essential to your system, upgrade to Pro v1.4.13+ and let it run as long as your longest unique period before upgrading to v1.5. For example, if your longest unique period is 1 day, let it run for at least a day. That will ensure the correct unique values are present for index-backed uniqueness when you upgrade.
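(That interim step is just a temporary dependency pin, something like this in mix.exs, assuming the usual private Oban hex repo setup:)

# Hypothetical interim pin: stay on the 1.4 series (>= 1.4.13, < 1.5.0) until
# the longest unique period has elapsed, then bump to ~> 1.5.
defp deps do
  [
    {:oban_pro, "~> 1.4.13", repo: "oban"}
  ]
end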

We have an issue with this: we have jobs with an infinite unique period that MUST be kept in the table.

Is there a way to compute the uniq_bmp in PostgreSQL so we can change the old records?

How long are you retaining those jobs in the table? Meaning, what’s your maximum prune time?

You can inject the correct uniq_bmp with a query, but you’ll have to map the states to the correct integers beforehand because the unique states option isn’t in the database. Here’s an example that sets a uniq_bmp using the standard set of unique states (everything but cancelled and discarded):

update oban_jobs
set meta = meta || '{"uniq_bmp":[0,1,2,3,4]}'
where worker = 'MyApp.SomeWorker' and meta ? 'uniq_key';

If you’ve set uniqueness across all states, then you would use all values, 0..6.
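From the application side you could run the all-states variant the same way, e.g. as a one-off query (the worker name is a placeholder):

# Same update as above, but flagging all seven states (0..6) as unique.
MyApp.Repo.query!("""
update oban_jobs
set meta = meta || '{"uniq_bmp":[0,1,2,3,4,5,6]}'
where worker = 'MyApp.SomeWorker' and meta ? 'uniq_key'
""")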


Hello again

We use the following policy:

     mode: {:max_age, {30, :days}},
     state_overrides: [
       discarded: {:max_age, {180, :days}}
     ],
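For context, that fragment comes from the pruner plugin entry in our Oban config, roughly like so:

# How the policy above is wired up (app name illustrative; the plugin is
# Oban Pro's DynamicPruner).
config :my_app, Oban,
  plugins: [
    {Oban.Pro.Plugins.DynamicPruner,
     mode: {:max_age, {30, :days}},
     state_overrides: [
       discarded: {:max_age, {180, :days}}
     ]}
  ]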

We tried to update the legacy jobs, but the migration failed when we deployed it (Oban Pro 1.5).
The oban_jobs_unique_index failed to create with error 23505 (unique_violation).
We looked in our database with the following:

select count(*) from oban_jobs
where meta ? 'uniq_key'
group by queue, worker, meta->'uniq_key', state
having count(*) > 1;

And found many results.

The old unique keys were calculated with phash2 and were only intended for unique checks on insert. With 180 days’ worth of retained jobs, you’re bound to have conflicts. That’s why the new unique keys are calculated as cryptographic hashes. We posted about the issue a few months ago.
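To make the difference concrete, compare the shape of the two kinds of keys (illustrative only, not the exact terms or algorithm Pro hashes):

# phash2 folds any term into a bounded integer range, so across a large,
# long-retained set of jobs collisions become expected rather than exceptional.
# A cryptographic digest (sha256 here, purely as an example) doesn't have that problem.
key = {"default", "MyApp.SomeWorker", %{"id" => 1}}

:erlang.phash2(key)
#=> a bounded non-negative integer

key
|> :erlang.term_to_binary()
|> then(&:crypto.hash(:sha256, &1))
|> Base.encode16(case: :lower)
#=> a 64-character hex digest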

This is all a big change, but essential for accuracy, performance, and compatibility with distributed databases.

Since you need to retain those old jobs, you’ll need a way to update the unique keys for them, which must be done in Elixir. Here’s an example script that shows how to query for the old unique format and update them as a stream:

import Ecto.Query

# These are the default unique options, tweak as necessary
unique = %{
  fields: ~w(args queue worker)a,
  keys: [],
  period: 60,
  states: ~w(scheduled available executing retryable completed)a
}

stream =
  Oban.Job
  |> where([j], j.state in ~w(completed cancelled discarded))
  |> where([j], fragment(~s|jsonb_path_query_first(?, '$.uniq_key.type()') = '"number"'::jsonb|, j.meta))
  |> order_by(:id)
  |> MyApp.Repo.stream()

MyApp.Repo.transaction(fn ->
  Enum.each(stream, fn job ->
    job
    |> Ecto.Changeset.change(%{unique: unique})
    |> Oban.Pro.Unique.with_uniq_meta()
    |> MyApp.Repo.update!()
  end)
end)

Depending on the number of jobs you have, it may be better to batch than to stream. The important bit is to make a changeset and call Oban.Pro.Unique.with_uniq_meta/1 on it.
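If you do go the batch route, here's a rough sketch of the same backfill done in id-ordered batches (the module name, batch size, and keyset pagination are assumptions, not part of Pro):

defmodule MyApp.UniqBackfill do
  import Ecto.Query

  @batch_size 1_000

  # The essential part is unchanged from the streaming version: build a
  # changeset and pass it through Oban.Pro.Unique.with_uniq_meta/1.
  def run(unique, last_id \\ 0) do
    jobs =
      Oban.Job
      |> where([j], j.id > ^last_id)
      |> where([j], j.state in ~w(completed cancelled discarded))
      |> where([j], fragment(~s|jsonb_path_query_first(?, '$.uniq_key.type()') = '"number"'::jsonb|, j.meta))
      |> order_by(asc: :id)
      |> limit(@batch_size)
      |> MyApp.Repo.all()

    Enum.each(jobs, fn job ->
      job
      |> Ecto.Changeset.change(%{unique: unique})
      |> Oban.Pro.Unique.with_uniq_meta()
      |> MyApp.Repo.update!()
    end)

    case jobs do
      [] -> :ok
      _ -> run(unique, List.last(jobs).id)
    end
  end
end

# Kick it off with the same `unique` map defined above.
MyApp.UniqBackfill.run(unique)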
