Oban — Reliable and Observable Job Processing

Of course, we’re all busy. I’m not concerned about your issue from the Oban side, as I’m confident it is configuration or user error of some kind. Maybe you pruned jobs faster than the unique setting? Maybe you were using insert_all?

I urge people not to create blanket unique constraints for worker/args as it’s inflexible and wasteful.

This is not a good approach, imo. With the information you gave no one can tell whether it’s because of a genuine bug or because of a misstep while using the library. If you want to air something out like this you should be willing to work through the details to get to the truth.

For our use case we absolutely want to be inflexible and not have multiple jobs. Oban did not respect the unique setting one way or another, and the only real way for us to guarantee uniqueness was through this database unique index.

What we are looking at next is trying to catch these attempts at inserting dupes and tracing them back to figure out the cause. @slouchpie is very thorough, and if he doesn’t share additional things it is because we really don’t have anything additional to report. It’s quite a mystery.

As it stands, make sure you add a unique database index or you will somehow get dupe jobs.

But you haven’t shared any additional information to help diagnose the underlying issue. Instead, you’re giving contrary advice based on your assumptions about a bug.

Unique jobs is an older feature and I’ve never heard of this bug before (let alone crashes within a transaction causing writes to the database). If you truly feel that this is because of a bug in Oban, then please share additional information so we can diagnose it and potentially fix it for others.

With the information that I have currently it doesn’t look like an Oban issue at all, and you’re spreading FUD rather than collaborating.

Hi @sorentwo , is it possible to add logic to the job uniqueness feature?

For instance if we want to perform long operations when user data has changed, we may use a job with the following flags:

  • job 1 [flag_1: true, flag_2: false]
  • job 2 [flag_1: false, flag_2: true]

The uniqueness would be used to debounce the long operation.

Now, is it possible to have [flag_1: true, flag_2: true] in the end ?

Thank you.

Unique jobs have a lot of complexity around querying and replacement already, and I’d prefer not to add more.

I’d look at modeling your problem using more nuanced args/meta and a ‘replace’ option.
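One possible reading of that suggestion, as a rough sketch rather than the maintainer's own recipe: key uniqueness on a single arg and let a later insert replace the args of the pending job. The module name, the user_id key, the 300 second period, and the run_long_operation/3 helper are all hypothetical; only the unique and replace options come from Oban.

```elixir
defmodule MyApp.Workers.RefreshUser do
  # Hypothetical worker: uniqueness is keyed on :user_id only, so inserts with
  # different flags for the same user conflict with each other.
  use Oban.Worker,
    queue: :default,
    unique: [keys: [:user_id], period: 300, states: [:available, :scheduled]]

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id} = args}) do
    flag_1? = Map.get(args, "flag_1", false)
    flag_2? = Map.get(args, "flag_2", false)
    run_long_operation(user_id, flag_1?, flag_2?)
  end

  # Builds the job changeset. On a unique conflict the pending job's args are
  # replaced with the args given here.
  def build(user_id, flags) when is_map(flags) do
    flags
    |> Map.put(:user_id, user_id)
    |> new(schedule_in: 60, replace: [:args])
  end

  def enqueue(user_id, flags), do: user_id |> build(flags) |> Oban.insert()

  # Placeholder for the real long-running work.
  defp run_long_operation(_user_id, _flag_1?, _flag_2?), do: :ok
end
```

Note that replacing swaps the args wholesale and does not merge them, so to end up with both flags set the caller would need to pass the combined flags on the later insert.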

Alright, thank you :slight_smile:

Hi, I am a new Oban user. I am running a system with multiple nodes and one database, and I am trying to understand the section on job uniqueness in the docs (Oban — Oban v2.13.4). From what I understand, there is a default 60-second :period during which a job is considered unique.

So say my system has nodes A, B, and C. If A, B, and C all triggered the same scheduled job (all running the same codebase and hence the same Oban config), one of them will hit the DB first and then the other two will be rejected because of a lock for 60 seconds.

Then why would I need to write something like use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]?

I am definitely misunderstanding something. I did

Enum.each(1..10, fn _i ->
  %{id: 1, in_the: "business", of_doing: "business"}
  |> MyWorker.new()
  |> Oban.insert()
end)

All this happens within 60 seconds, so I was expecting to see only 1 job, but I see 10.


Update:

The paragraph quoted below is misleading. It seems Oban does not apply uniqueness by default at all. I needed to specify use Oban.Worker, unique: [period: 60] to get the behaviour I expected:

:period — The number of seconds until a job is no longer considered duplicate. You should always specify a period, otherwise Oban will default to 60 seconds. :infinity can be used to indicate the job be considered a duplicate as long as jobs are retained.

Are you assuming that uniqueness is set by default rather than, if you enable unique jobs, the default period is 60s?

I believe that default 60s would apply if you specified unique: true, or if you specified unique with other options but not period explicitly.

It does not make sense for all jobs to be unique by default.
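To make the distinction concrete, here is a minimal sketch with two hypothetical workers (the module names are made up for illustration): one without any unique option and one that opts in with an explicit period, as the update above found necessary.

```elixir
defmodule MyApp.PlainWorker do
  # No :unique option: every insert creates a new job.
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{}), do: :ok
end

defmodule MyApp.UniqueWorker do
  # Uniqueness is opt-in; here duplicates are detected for 60 seconds.
  use Oban.Worker, queue: :default, unique: [period: 60]

  @impl Oban.Worker
  def perform(%Oban.Job{}), do: :ok
end

defmodule MyApp.UniqueDemo do
  # Inserts the same args ten times through the given worker module.
  # With MyApp.UniqueWorker, repeated inserts within the period resolve to the
  # existing job; with MyApp.PlainWorker, each insert adds a new row.
  def insert_ten(worker) do
    Enum.each(1..10, fn _i ->
      %{id: 1, in_the: "business", of_doing: "business"}
      |> worker.new()
      |> Oban.insert()
    end)
  end
end
```

Calling MyApp.UniqueDemo.insert_ten(MyApp.UniqueWorker) requires a running Oban instance and database, so treat it as an illustration of the call shape.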

Ok, this kinda makes sense, thanks.

Oban v2.14 has been released!

:feather: SQLite3 Support with the Lite Engine

Increasingly, developers are choosing SQLite for small to medium-sized projects, not just in the
embedded space where it’s had utility for many years. Many of Oban’s features, such as isolated
queues, scheduling, cron, unique jobs, and observability, are valuable in smaller or embedded
environments. That’s why we’ve added a new SQLite3 storage engine to bring Oban to smaller,
stand-alone, or embedded environments where PostgreSQL isn’t ideal (or possible).

There’s frighteningly little configuration needed to run with SQLite3. Migrations, queues, and
plugins all “Just Work™”.

To get started, add the ecto_sqlite3 package to your deps and configure Oban to use the
Oban.Engines.Lite engine:

config :my_app, Oban,
  engine: Oban.Engines.Lite,
  queues: [default: 10],
  repo: MyApp.Repo

Presto! Run the migrations, include Oban in your application’s supervision tree, and then start
inserting and executing jobs as normal.
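For readers starting from scratch, the surrounding pieces might look roughly like the fragments below. This is a sketch under assumptions: the version requirements and the database path are illustrative guesses, not values taken from the release notes.

```elixir
# mix.exs (fragment): the version requirements are assumptions
defp deps do
  [
    {:oban, "~> 2.14"},
    {:ecto_sqlite3, "~> 0.9"}
  ]
end

# lib/my_app/repo.ex: a repo backed by the SQLite3 adapter from ecto_sqlite3
defmodule MyApp.Repo do
  use Ecto.Repo, otp_app: :my_app, adapter: Ecto.Adapters.SQLite3
end

# config/config.exs (fragment): the database path is a placeholder
config :my_app, MyApp.Repo, database: "priv/my_app.db"
```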

:warning: SQLite3 support is new, and while not experimental, there may be sharp edges. Please report any
issues or gaps in documentation.

:woman_scientist: Smarter Job Fetching

The most common cause of “jobs not processing” is when PubSub isn’t available. Our troubleshooting
section instructed people to investigate their PubSub and optionally include the Repeater
plugin. That kind of manual remediation isn’t necessary now! Instead, we automatically switch back
to local polling mode when PubSub isn’t available—if it is a temporary glitch, then fetching
returns to the optimized global mode after the next health check.

Along with smarter fetching, Stager is no longer a plugin. It wasn’t ever really a plugin, as
it’s core to Oban’s operation, but it was treated as a plugin to simplify configuration and
testing. If you’re in the minority that tweaked the staging interval, don’t worry, the existing
plugin configuration is automatically translated for backward compatibility. However, if you’re a
stickler for avoiding deprecated options, you can switch to the top-level stage_interval:

config :my_app, Oban,
  queues: [default: 10],
- plugins: [{Stager, interval: 5_000}]
+ stage_interval: 5_000

:satellite: Comprehensive Telemetry Data

Oban has exposed telemetry data that allows you to collect and track metrics about jobs and queues
since the very beginning. Telemetry events followed a job’s lifecycle from insertion through
execution. Still, there were holes in the data—it wasn’t possible to track the exact state of your
entire Oban system through telemetry data.

Now that’s changed. All operations that change job state, whether inserting, deleting, scheduling,
or processing jobs, report complete state-change events for every job, including queue, state,
and worker details. Even bulk operations such as insert_all_jobs, cancel_all_jobs, and
retry_all_jobs return a subset of fields for all modified jobs, rather than a simple count.
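As a small illustration of consuming job lifecycle events, the sketch below attaches one handler to the job start, stop, and exception events. The module name and log format are hypothetical, and the metadata keys are read defensively with Map.get/3 because the exact fields vary by event and version.

```elixir
defmodule MyApp.ObanTelemetry do
  # Hypothetical module; only :telemetry.attach_many/4 and the
  # [:oban, :job, ...] event names come from :telemetry and Oban.
  require Logger

  @events [
    [:oban, :job, :start],
    [:oban, :job, :stop],
    [:oban, :job, :exception]
  ]

  def attach do
    :telemetry.attach_many("my-app-oban-telemetry", @events, &__MODULE__.handle_event/4, nil)
  end

  def handle_event([:oban, :job, event], measurements, metadata, _config) do
    Logger.info(summarize(event, measurements, metadata))
  end

  # Pure helper, so the formatting can be exercised without running Oban.
  def summarize(event, measurements, metadata) do
    job = Map.get(metadata, :job, %{})
    state = Map.get(metadata, :state, :unknown)

    # Start events carry no duration, so default to zero.
    duration_ms =
      measurements
      |> Map.get(:duration, 0)
      |> System.convert_time_unit(:native, :millisecond)

    "job #{event} worker=#{inspect(Map.get(job, :worker))} " <>
      "queue=#{inspect(Map.get(job, :queue))} state=#{inspect(state)} " <>
      "duration_ms=#{duration_ms}"
  end
end
```

Calling MyApp.ObanTelemetry.attach() once during application start would be enough to register the handler.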

See the 2.14 upgrade guide for step-by-step instructions (all two of them).

Enhancements

  • [Oban] Store a {:cancel, :shutdown} error and emit [:oban, :job, :stop] telemetry when jobs
    are manually cancelled with cancel_job/1 or cancel_all_jobs/1.

  • [Oban] Include “did you mean” suggestions for Oban.start_link/1 and all nested plugins when a
    similar option is available.

    Oban.start_link(rep: MyApp.Repo, queues: [default: 10])
    ** (ArgumentError) unknown option :rep, did you mean :repo?
        (oban 2.14.0-dev) lib/oban/validation.ex:46: Oban.Validation.validate!/2
        (oban 2.14.0-dev) lib/oban/config.ex:88: Oban.Config.new/1
        (oban 2.14.0-dev) lib/oban.ex:227: Oban.start_link/1
        iex:1: (file)
    
  • [Oban] Support scoping queue actions to a particular node.

    In addition to scoping to the current node with :local_only, it is now possible to scope
    pause, resume, scale, start, and stop queues on a single node using the :node
    option.

    Oban.scale_queue(queue: :default, node: "worker.123")
    
  • [Oban] Remove retry_job/1 and retry_all_jobs/1 restriction around retrying scheduled jobs.

  • [Job] Restrict replace option to specific states when unique jobs have a conflict.

    # Replace the scheduled time only if the job is still scheduled
    SomeWorker.new(args, replace: [scheduled: [:schedule_in]], schedule_in: 60)
    
    # Change the args only if the job is still available
    SomeWorker.new(args, replace: [available: [:args]])
    
  • [Job] Introduce format_attempt/1 helper to standardize error and attempt formatting
    across engines

  • [Repo] Wrap nearly all Ecto.Repo callbacks.

    Now every Ecto.Repo callback, aside from a handful that are only used to manage a Repo
    instance, is wrapped with code generation that omits any typespecs. Slight inconsistencies
    between the wrapper’s specs and Ecto.Repo’s own specs caused dialyzer failures when nothing
    was genuinely broken. Furthermore, many functions were missing because it was tedious to
    manually define every wrapper function.

  • [Peer] Emit telemetry events for peer leadership elections.

    Both peer modules, Postgres and Global, now emit [:oban, :peer, :election] events during
    leader election. The telemetry meta includes a leader? field for start and stop events to
    indicate if a leadership change took place.

  • [Notifier] Allow passing a single channel to listen/2 rather than a list.

  • [Registry] Add lookup/2 for conveniently fetching registered {pid, value} pairs.

Bug Fixes

  • [Basic] Capture StaleEntryError on unique replace.

    Replacing while a job is updated externally, e.g. it starts executing, could occasionally raise
    an Ecto.StaleEntryError within the Basic engine. Now, that exception is translated into an
    error tuple and bubbles up to the insert call site.

  • [Job] Update t:Oban.Job/0 to indicate timestamp fields are nullable.

Deprecations

  • [Stager] Deprecate the Stager plugin as it’s part of the core supervision tree and may be
    configured with the top-level stage_interval option.

  • [Repeater] Deprecate the Repeater plugin as it’s no longer necessary with hybrid staging.

  • [Migration] Rename Migrations to Migration, but continue delegating functions for backward
    compatibility.

From the CHANGELOG

Thanks, the SQLite support is awesome!

Is there a way to have a particular job run every Monday, every 5 minutes for 6 hours (and not run at any other time)?

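There is: */5 0-5 * * 1 will do it, assuming the six hours start at midnight (hours 0 through 5; a range of 0-6 would cover seven hours, through 06:55). Take a look at a tool like crontab.guru to play around with schedules.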
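As a rough sketch of how such a schedule might be wired up with Oban's Cron plugin: the worker module name is hypothetical, and the hour range 0-5 is chosen here because it spans six hours starting at midnight (a range of 0-6 would add a seventh hour, through 06:55).

```elixir
# config/config.exs (fragment)
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # Every 5 minutes, hours 0 through 5, on Mondays (day-of-week 1).
       # MyApp.Workers.MondayWorker is a placeholder worker module.
       {"*/5 0-5 * * 1", MyApp.Workers.MondayWorker}
     ]}
  ]
```

Cron expressions are evaluated in UTC unless a timezone is configured for the plugin, so the window may need shifting for local time.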

Oban v2.15.0 has been released!

:clamp: Notification Compression

Oban uses notifications across most core functionality, from job staging to cancellation. Some notifications, such as gossip, contain massive redundancy that compresses nicely. For example, this table breaks down the compression ratios for a fairly standard gossip payload containing data from ten queues:

Mode        Bytes   % of Original
Original    4720    100%
Gzip        307     7%
Encode 64   412     9%

Minimizing notification payloads is especially important for Postgres because it applies an 8kb limit to all messages. Now all pub/sub notifications are compressed automatically, with a safety mechanism for compatibility with external notifiers, namely Postgres triggers.
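The effect of gzip followed by Base64 encoding can be tried on any repetitive payload. The snippet below is a standalone illustration using the standard :zlib and Base modules; the payload is made up and the snippet is not a copy of Oban's notifier code.

```elixir
# A deliberately repetitive payload standing in for gossip data.
payload =
  1..10
  |> Enum.map(fn i ->
    ~s({"queue":"queue_#{i}","node":"worker.123","limit":10,"paused":false})
  end)
  |> Enum.join(",")

# Compress first, then encode so the result is safe to send as text.
compressed = :zlib.gzip(payload)
encoded = Base.encode64(compressed)

IO.puts("original: #{byte_size(payload)} bytes")
IO.puts("gzip:     #{byte_size(compressed)} bytes")
IO.puts("base64:   #{byte_size(encoded)} bytes")

# Decoding reverses both steps and yields the original payload.
decoded = encoded |> Base.decode64!() |> :zlib.gunzip()
```

Base64 adds roughly a third to the compressed size, which matches the shape of the table above: the encoded form is larger than the raw gzip output but still far smaller than the original.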

:card_file_box: Query Improvements

There has been an ongoing issue with systems recording a job attempt twice when it only executed once. While that sounds minor, it could break an entire queue when the attempt exceeded max attempts because it would violate a database constraint.

Apparently, the Postgres planner may choose to generate a plan that executes a nested loop over the LIMITing subquery, causing more UPDATEs than LIMIT. That could cause unexpected updates, including attempts > max_attempts in some cases. The solution is to use a CTE as an “optimization fence” that forces Postgres not to optimize the query.

We also worked in a few additional query improvements:

  • Use an index only scan for job staging to safely handle tables with millions of scheduled jobs.
  • Remove unnecessary row locking from staging and pruning queries.

:feather: New Engine Callbacks for SQL Compatibility

We’re pleased to share improvements in Oban’s SQLite integration. A few SQLite pioneers identified pruning and staging compatibility bugs, and instead of simply patching around the issues with conditional logic, we tackled them with new engine callbacks: stage_jobs/3 and prune_jobs/3. The result is safer, optimized queries for each specific database.

Introducing new engine callbacks with database-specific queries paves the way for working with other databases. There’s even an open issue for MySQL support.

v2.15.0 — 2023-04-13

Enhancements

  • [Oban] Use DynamicSupervisor to supervise queues for optimal shutdown

    Standard supervisors shut down in a fixed order, which could make shutting down queues with active jobs and a lengthy grace period very slow. This switches to a DynamicSupervisor for queue supervision so queues can shut down simultaneously while still respecting the grace period.

  • [Executor] Retry acking infinitely after job execution

    After jobs execute, the producer must record their status in the database. Previously, if acking failed due to a connection error after 10 retries, it would orphan the job. Now, acking retries infinitely (with backoff) until the function succeeds. The result is stronger execution guarantees with backpressure during periods of database fragility.

  • [Oban] Accept a Job struct as well as a job id for cancel_job/1 and retry_job/1

    Now it’s possible to write Oban.cancel_job(job) directly, rather than Oban.cancel_job(job.id).

  • [Worker] Allow snoozing jobs for zero seconds.

    Returning {:snooze, 0} immediately reschedules a job without any delay.

  • [Notifier] Accept arbitrary channel names for notifications, e.g. “my-channel”

  • [Telemetry] Add ‘detach_default_logger/0’ to programmatically disable an attached logger.

  • [Testing] Avoid unnecessary query for “happy path” assertion errors in assert_enqueued/2

  • [Testing] Inspect charlists as lists in testing assertions

    Args frequently contain lists of integers like [123], which was curiously displayed as '{'.

Bug Fixes

  • [Executor] Correctly raise “unknown worker” errors.

    Unknown workers triggered an unknown case error rather than the appropriate “unknown worker” runtime error.

  • [Testing] Allow assert_enqueued with a scheduled_at time for available jobs

    The use of Job.new to normalize query fields would change assertions with a “scheduled_at” date to only check “scheduled” jobs, never “available” ones.

  • [Telemetry] Remove :worker from engine and plugin query meta.

    The worker isn’t part of any query indexes and prevents optimal index usage.

  • [Job] Correct priority type to cover default of 0

Is this still the case @sorentwo ?

Sorry to ping you, but I ask because I’m starting a job during application boot (inside start/2 from application.ex) and, no matter what I try with unique, I keep getting 2 jobs when the application starts (deploying to Fly with 2 machines).

So I was wondering if this is what is happening here.

Let me know if you want more context.

Cheers.

(and thanks for the awesome work)

It’s true, cron jobs are unique for 59 seconds by default. However, you can override the unique period and there’s now a leadership mechanism to ensure only one node enqueues cron jobs.

How are you starting a job during boot? Is it an @reboot cron job? What are the unique settings? There isn’t enough information for me to help diagnose your issue.

Thanks for getting back to me and confirming the first question.

Sorry I wasn’t more specific before.

This job does not rely on cron, I’m enqueuing it on start up like so, inside my application.ex:

defmodule MyApp.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application
  alias MyApp.Workers.MyWorker

  @impl true
  def start(_type, _args) do
    children = [
      # Start the Telemetry supervisor
      MyAppWeb.Telemetry,
      # Start the Ecto repository
      MyApp.Repo,
      # Start the PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Start Finch
      {Finch, name: MyApp.Finch},
      # Start the Endpoint (http/https)
      MyAppWeb.Endpoint,
      {Oban, Application.fetch_env!(:my_app, Oban)}
      # Start a worker by calling: MyApp.Worker.start_link(arg)
      # {MyApp.Worker, arg}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    result = Supervisor.start_link(children, opts)
    MyWorker.enqueue()
    result
  end

  # Tell Phoenix to update the endpoint configuration
  # whenever the application is updated.
  @impl true
  def config_change(changed, _new, removed) do
    MyAppWeb.Endpoint.config_change(changed, removed)
    :ok
  end
end

And the worker (simplified version):

defmodule MyApp.Workers.MyWorker do
  use Oban.Worker, unique: [fields: [:queue, :worker], period: :infinity]

  def perform(_) do
    with :ok <- do_some_stuff(),
         :ok <- do_more_stuff() do
      enqueue()
    else
      error -> error
    end

    :ok
  end
  
  def enqueue(args \\ %{}) do
    args
    |> Oban.Job.new(worker: __MODULE__)
    |> Oban.insert!()
  end
end

I also tried to move the unique options to the enqueue function and got the same situation.
As you can see, I’m not really passing any args, but I did try passing a dummy job: :search arg just so I could try and limit by that as well.
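One detail worth checking in the worker above, offered as an observation rather than a confirmed diagnosis: options given to use Oban.Worker are applied by the new/2 function that the macro generates on the worker module, whereas Oban.Job.new/2 only sees the options passed to it directly. A sketch of the same enqueue helper going through the worker's own new/1 (with perform/1 reduced to a stub) could look like this:

```elixir
defmodule MyApp.Workers.MyWorker do
  use Oban.Worker, unique: [fields: [:queue, :worker], period: :infinity]

  @impl Oban.Worker
  def perform(%Oban.Job{}) do
    # The real work is omitted in this sketch.
    :ok
  end

  def enqueue(args \\ %{}) do
    args
    # new/1 is generated by `use Oban.Worker` and merges in the unique options
    # declared above; Oban.Job.new/2 would not pick them up on its own.
    |> new()
    |> Oban.insert!()
  end
end
```

With period: :infinity, a conflict is only found while an earlier job is still retained in one of the checked states, so pruning can also affect the outcome.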

In any case, I managed to make it work using an insert helper function like you suggested here.

But now I’m wondering if my initial issue was due to job pruning, like we can see here.

In any case, thank you again for all your work and help here.

Cheers.