Oban — Reliable and Observable Job Processing

Hi,

If my use case is just to run a function at a specific date, which dependency would be best?

Thank you so much!

If you’re running on a cron-like schedule (e.g. the first of every month) then either one is viable. However, if you’re scheduling functions at a particular date on the fly (e.g. one day from now), then you’d want Oban.
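As a rough sketch of the on-the-fly case, the snippet below schedules a job for an exact moment with the scheduled_at option, or relative to now with schedule_in. The worker module, its args, and the date are made up for illustration, and it assumes Oban is already configured and running in your application.

```elixir
defmodule MyApp.ReminderWorker do
  # Hypothetical worker; the module name and args are illustrative only.
  use Oban.Worker, queue: :default

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"user_id" => user_id}}) do
    # Call whatever function needs to run at the scheduled time.
    IO.puts("Reminding user #{user_id}")
    :ok
  end
end

# Run at an exact (UTC) date and time.
%{user_id: 1}
|> MyApp.ReminderWorker.new(scheduled_at: ~U[2030-01-01 09:00:00Z])
|> Oban.insert()

# Or run one day from now.
%{user_id: 1}
|> MyApp.ReminderWorker.new(schedule_in: {1, :day})
|> Oban.insert()
```

Args are stored as JSON, which is why perform/1 matches on the string key "user_id" even though the job was built with an atom key.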

Announcing releases on the forum slipped a little, but better late than never:

Oban v2.16.0 was released (on 2023-09-22)!

:sheep: Oban Instance Module

New facade modules allow you to call Oban functions on instances with custom names, e.g. not Oban, without passing a t:Oban.name/0 as the first argument.

For example, rather than calling Oban.config/1 you’d call MyOban.config/0:

MyOban.config()

It also makes piping into Oban functions far more convenient:

%{some: :args}
|> MyWorker.new()
|> MyOban.insert()
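For context, a facade module like the one used above is typically defined with use Oban. The fragment below is a minimal sketch: the :my_app application name, the MyApp.Repo repo, and the queue settings are assumptions, and the file paths in the comments are only a suggestion.

```elixir
# lib/my_oban.ex
defmodule MyOban do
  # Generates facade functions such as config/0 and insert/1 for this instance.
  use Oban, otp_app: :my_app
end

# config/config.exs
import Config

# The instance is configured under the facade module's name.
config :my_app, MyOban, repo: MyApp.Repo, queues: [default: 10]
```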

:jigsaw: Partial Matches in Testing Assertions

It’s now possible to match a subset of fields on args or meta with all_enqueued, assert_enqueued, and refute_enqueued. For example, the first two assertions below now pass, while the third fails because the id differs:

# Given a job with these args: %{id: 123, mode: "active"}

assert_enqueued args: %{id: 123} #=> true
assert_enqueued args: %{mode: "active"} #=> true
assert_enqueued args: %{id: 321, mode: "active"} #=> false

The change applies to args and meta queries for all_enqueued/2, assert_enqueued/2 and refute_enqueued/2 helpers.

:timer_clock: Unique Timestamp Option

Jobs are frequently scheduled for a time far in the future, and it’s often desirable to consider scheduled jobs for uniqueness, but unique jobs only checked the :inserted_at timestamp.

Now unique has a timestamp option that allows checking the :scheduled_at timestamp instead:

use Oban.Worker, unique: [period: 120, timestamp: :scheduled_at]

v2.16.0 — 2023-09-22

Bug Fixes

  • [Reindexer] Correct relname match for reindexer plugin

    We can safely assume all indexes start with oban_jobs. The previous pattern was based on an outdated index format from older migrations.

  • [Testing] Support repo, prefix, and log query options in use Oban.Testing

To keep playing catch-up, Oban v2.16.1 and v2.16.2 are also released!

:shushing_face: Notice that v2.16.2 sneaks in a frequently requested wildcard matcher for test assertions

v2.16.2 — 2023-10-03

Bug Fixes

  • [Testing] Match args/meta patterns in Elixir rather than the database

    The containment operators, @> and <@, used for pattern matching in tests are only available in Postgres and have some quirks. Most notably, containment considers matching any value in a list a successful match, which isn’t intuitive or desirable.

    The other issue with using a containment operator in tests is that SQLite doesn’t have those operators available and test helpers are shared between all engines.

Enhancements

  • [Testing] Support wildcard matcher in patterns for args/meta

    Now that we match in Elixir, it’s simple to support wildcard matching with a :_ to assert that a key is present in a json field without specifying an exact value.

    assert_enqueued args: %{batch_id: :_, callback: true}
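To make the idea concrete, here is a minimal sketch of subset matching with a :_ wildcard in plain Elixir. It is not Oban's actual implementation: the PatternSketch module and matches?/2 function are hypothetical, and it assumes the pattern and the job data use the same key type.

```elixir
defmodule PatternSketch do
  # Hypothetical helper, not part of Oban. Every key in the pattern must be
  # present in the actual map; keys missing from the pattern are ignored.
  def matches?(pattern, actual) when is_map(pattern) and is_map(actual) do
    Enum.all?(pattern, fn {key, expected} ->
      case Map.fetch(actual, key) do
        {:ok, value} -> matches?(expected, value)
        :error -> false
      end
    end)
  end

  # The wildcard matches any value, as long as the key exists.
  def matches?(:_, _actual), do: true

  # Everything else, lists included, is compared by strict equality.
  def matches?(expected, actual), do: expected == actual
end

args = %{id: 123, mode: "active", batch_id: 7, tags: ["a", "b"]}

IO.inspect(PatternSketch.matches?(%{id: 123}, args))
IO.inspect(PatternSketch.matches?(%{batch_id: :_, mode: "active"}, args))

# Unlike Postgres containment (@>), a partial list is not a match here.
IO.inspect(PatternSketch.matches?(%{tags: ["a"]}, args))
```

Comparing lists by equality is what avoids the containment quirk described above, where matching any value in a list counts as a match.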
    

v2.16.1 — 2023-09-25

Bug Fixes

  • [Testing] Restore splitting out all config options in helpers.

    Splitting all configuration keys is necessary when using perform_job/3 with non-job options such as :engine.

Oban v2.17 is out!

This release includes an optional database migration to disable triggers and relax priority checks. See the v2.17 upgrade guide for step-by-step instructions.

:pager: Universal Insert Notifications

Historically, Oban used database triggers to emit a notification after a job is inserted. That allowed jobs to execute sooner, without waiting up to a second until the next poll event. Those triggers and subsequent notifications added some overhead to database operations, particularly bulk inserts into the same queue, despite deduplication logic in the trigger. Even worse, trigger notifications didn’t work behind connection poolers and were restricted to the Postgres notifier.

Now insert notifications have moved out of the database and into application code, so it’s possible to disable triggers without running database migrations, and they work for any notifier, not just Postgres.

Disable notifications with the insert_trigger option if sub-second job execution isn’t important or you’d like to reduce PubSub chatter:

config :my_app, Oban,
  insert_trigger: false,
  ...

:factory_worker: Worker Conveniences

Workers received a few quality of life improvements to make defining unique behaviour more expressive and intuitive.

First, it’s now possible to define a job’s unique period with time units like {1, :minute} or {2, :hours}, just like a job’s :schedule_in option:

use Oban.Worker, unique: [period: {5, :minutes}]

Second, you can set the replace option in use Oban.Worker rather than in an overridden new/2 or as a runtime option. For example, to enable updating a job’s scheduled_at value on unique conflict:

use Oban.Worker, unique: [period: 60], replace: [scheduled: [:scheduled_at]]

:bird:‍:fire: Oban Phoenix Notifier

The new oban_notifiers_phoenix package allows Oban to share a Phoenix application’s PubSub for notifications. In addition to centralizing PubSub communications, it opens up the possible transports to all PubSub adapters. As Oban already provides Postgres and PG (Distributed Erlang) notifiers, the new package primarily enables Redis notifications.

config :my_app, Oban,
  notifier: {Oban.Notifiers.Phoenix, pubsub: MyApp.PubSub},
  ...

:level_slider: Ten Levels of Job Priority

Job priority may now be set to values between 0 (highest) and 9 (lowest). This increases the range from 4 to 10 possible priorities, giving applications much finer control over execution order.

args
|> MyApp.PrioritizedWorker.new(priority: 9)
|> Oban.insert()

Enhancements

  • [Oban] Add Oban.pause_all_queues/2 and Oban.resume_all_queues/2.

    Pause and resume all queues with a single function call and a single notification signal, rather than manually looping through all queues and issuing separate calls.

  • [Cron] Add non-raising Expression.parse/2 for use in Cron.parse/2 and shared validations.

    Multiple locations used parse! and converted a raised exception into an error tuple. That was inefficient, repetitive, and violated the common practice of avoiding exceptions for flow control.

  • [Validation] Use schema based validation for workers, plugins, and config.

    Validations are now simpler and more consistent, and behaviour based notifiers such as Engine, Repo, and Peer are more descriptive.

  • [Engine] Expand telemetry meta for all engine callback events.

    All callbacks now include every argument in telemetry event metadata. In some situations, e.g. :init, this simplifies testing and can be used to eliminate the need to poll a supervision tree to see which queues started.

  • [Notifier] Add Isolated notifier for local use and simplified testing.

    Using PG for async tests has occasional flakes due to its eventually consistent nature. In tests and single node systems, we don’t need to broadcast messages between instances or nodes, and a simplified “isolated” mechanism is ideal.

  • [Repo] Add Repo.query!/4 for Ecto.Repo parity

  • [Migration] Configure a third-party engine’s migrator using the repo’s config map.

Bug Fixes

  • [Cron] Guard against invalid cron range expressions where the left side is greater than the right, e.g. SAT-FRI.

  • [Testing] Disable the prefix by default in generated testing helpers.

    A prefix is only necessary when it’s not the standard “public” prefix, which is rarely the case in testing helpers. This makes it easier to use testing helpers with the Lite engine.

  • [Testing] Remove prefix segment from assert_enqueued error messages.

    Not all engines support a prefix and the assert/refute message in testing helpers is confusing when the prefix is nil.
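The non-raising parse and the range guard mentioned above can be illustrated together. The sketch below is not Oban's parser: CronRangeSketch and parse_day_range/1 are hypothetical names, and it only handles day-of-week ranges. It returns an error tuple instead of raising, and rejects ranges where the left side is greater than the right.

```elixir
defmodule CronRangeSketch do
  # Hypothetical helper, not Oban's Expression module.
  @days ~w(SUN MON TUE WED THU FRI SAT)

  # Returns {:ok, indexes} for a valid range such as "MON-FRI", or an
  # error tuple for anything else, including inverted ranges like "SAT-FRI".
  def parse_day_range(expr) do
    with [left, right] <- String.split(expr, "-"),
         {:ok, min} <- day_index(left),
         {:ok, max} <- day_index(right),
         true <- min <= max do
      {:ok, Enum.to_list(min..max)}
    else
      _ -> {:error, "invalid range: #{inspect(expr)}"}
    end
  end

  defp day_index(name) do
    case Enum.find_index(@days, &(&1 == name)) do
      nil -> :error
      index -> {:ok, index}
    end
  end
end

IO.inspect(CronRangeSketch.parse_day_range("MON-FRI"))
IO.inspect(CronRangeSketch.parse_day_range("SAT-FRI"))
```

Returning tagged tuples lets callers pattern match on the result rather than rescuing an exception.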

Deprecations

  • [Gossip] The Gossip plugin is no longer needed, and shouldn’t be used, by applications running Oban Web v2.10 or above.

Oban v2.18 is out!

:telescope: Queue Shutdown Telemetry

A new queue shutdown event, [:oban, :queue, :shutdown], is emitted by each queue when it terminates. The event originates from the watchman process, which tracks the total elapsed time from when termination starts to when all jobs complete or the allotted period is exhausted.

Any jobs that take longer than the :shutdown_grace_period (by default 15 seconds) are brutally killed and left as orphans. The ids of jobs left in an executing state are listed in the event’s orphaned meta.

This also adds queue:shutdown logging to the default logger. Only queues that shut down with orphaned jobs are logged, which makes it easier to detect orphaned jobs and which jobs were affected:

[
  message: "jobs were orphaned because they didn't finish executing in the allotted time",
  queue: "alpha",
  source: "oban",
  event: "queue:shutdown",
  ellapsed: 500,
  orphaned: [101, 102, 103]
]
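If you would rather handle the event yourself, a handler along these lines could work. This is a sketch under assumptions: ShutdownReporter and the handler id are made-up names, the :queue meta key is a guess based on the log output above, and the handler doesn't rely on any particular measurement key.

```elixir
defmodule ShutdownReporter do
  # Hypothetical module, not part of Oban.
  require Logger

  @event [:oban, :queue, :shutdown]

  # Call once at application start. Requires the :telemetry package.
  def attach do
    :telemetry.attach("shutdown-reporter", @event, &__MODULE__.handle_event/4, nil)
  end

  def handle_event(@event, measurements, meta, _config) do
    case summarize(meta) do
      :ok ->
        :ok

      {:orphaned, queue, ids} ->
        Logger.warning(
          "queue #{inspect(queue)} orphaned jobs #{inspect(ids)}, " <>
            "measurements: #{inspect(measurements)}"
        )
    end
  end

  # Pure helper: :ok when nothing was orphaned, otherwise the affected ids.
  # The :queue key is an assumption; the source only documents :orphaned.
  def summarize(meta) do
    case Map.get(meta, :orphaned, []) do
      [] -> :ok
      ids -> {:orphaned, Map.get(meta, :queue), ids}
    end
  end
end
```

Keeping summarize/1 separate from the logging call makes the interesting part easy to test without attaching to telemetry.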

:truck: Distributed PostgreSQL Support

It’s now possible to run Oban in distributed PostgreSQL databases such as Yugabyte. This is made possible by a few simple changes to the Basic engine, and a new unlogged migration option.

Some PostgreSQL compatible databases don’t support unlogged tables. Making oban_peers unlogged isn’t a requirement for Oban to operate, so it can be disabled with a migration flag:

defmodule MyApp.Repo.Migrations.AddObanTables do
  use Ecto.Migration

  def up do
    Oban.Migration.up(version: 12, unlogged: false)
  end
end

:brain: Job Observability

Job stop and exception telemetry now includes the reported memory and total reductions from the job’s process. Values are pulled with Process.info/2 after the job executes and safely fall back to 0 in the event the process has crashed. Reductions are a rough proxy for CPU load, and the new measurements will make it easier to identify computationally expensive or memory hungry jobs.

In addition, thanks to the addition of Process.set_label in recent Elixir versions, the worker name is set as the job’s process label. That makes it possible to identify which job is running in a pid via observer or live dashboard.
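For a feel of what those measurements look like, the sketch below reads memory and reductions with Process.info/2 and falls back to 0 when the process is gone. JobStats is a hypothetical module for illustration, not how Oban itself is structured.

```elixir
defmodule JobStats do
  # Hypothetical helper, not part of Oban.

  # Runs the function in its own process and returns {result, stats},
  # with stats sampled from that process right after the work finishes.
  def measure(fun) when is_function(fun, 0) do
    task =
      Task.async(fn ->
        result = fun.()
        {result, read_stats(self())}
      end)

    Task.await(task)
  end

  # Process.info/2 returns nil for a dead process, so fall back to zeros.
  def read_stats(pid) do
    case Process.info(pid, [:memory, :reductions]) do
      nil -> %{memory: 0, reductions: 0}
      info -> Map.new(info)
    end
  end
end

{result, stats} = JobStats.measure(fn -> Enum.sum(1..1_000) end)
IO.inspect(result)
IO.inspect(stats)
```

Memory is reported in bytes, and reductions are a unitless count, so they are most useful for comparing jobs against each other.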

v2.18.0 — 2024-07-26

Enhancements

  • [Job] Support simple unique: true and unique: false declarations

    Uniqueness can now be enabled with unique: true and disabled with unique: false from job options or a worker definition. The unique: true option uses all the standard defaults, but sets the period to :infinity for compatibility with Oban Pro’s new simple unique mode.

  • [Cron] Remove forced uniqueness when inserting scheduled jobs.

    Using uniqueness by default prevents being able to use the Cron plugin with databases that don’t support uniqueness because of advisory locks. Luckily, uniqueness hasn’t been necessary for safe cron insertion since leadership was introduced and scheduling changed to top-of-the-minute many versions ago.

  • [Engine] Introduce check_available/1 engine callback

    The check_available/1 callback allows engines to customize the query used to find jobs in the available state. That makes it possible for alternative engines, such as Oban Pro’s Smart engine, to check for available jobs in a fraction of the time with large queues.

  • [Peer] Add Oban.Peer.get_leader/2 for checking leadership

    The get_leader/2 function makes it possible to check which node is currently the leader regardless of the Peer implementation, and without having to query the database.

  • [Producer] Log a warning for unhandled producer messages.

    Some messages are falling through to the catch-all handle_info/2 clause. Previously, they were silently ignored and it degraded producer functionality because inactive jobs with dead pids were still tracked as running in the producer.

  • [Oban] Use structured messages for most logger warnings.

    A standard structure for warning logs makes it easier to search for errors or unhandled messages from Oban or a particular module.

Bug Fixes

  • [Job] Include all fields in the unique section of Job.t/0.

    The unique spec lacked types for both keys and timestamp keys.

  • [Basic] Remove materialized option from fetch_jobs/3.

    The MATERIALIZED clause for CTEs didn’t make a meaningful difference in job fetching accuracy. In some situations it caused a performance regression (which is why it was removed from Pro’s Smart engine a while ago).

Where can I learn Oban?

We’ll post this in another thread.
We’re actively working on an Oban 101 training release.

Are there any existing resources? That will be too late.

Hey @sorentwo I would like to know if there is a way to test job cancellation in unit tests.

I want to know if I can trap the {:EXIT, pid, :shutdown} message from a perform/1 callback to execute some code before calling exit(:shutdown).

The problem is that if I keep config :my_app, Oban, testing: :manual in the test config (which is useful for a lot of tests), calling Oban.start_queue/1 and/or Oban.resume_queue/1 in the test setup and Oban.stop_queue/1 in the on_exit callback does not seem to have any effect. The queue does not seem to run. Should it?

I tried to check with Oban.check_queue but I get a :noproc error on gen_server:call.

Is there a way to test trapping the cancellation ?

Thank you.

edit: I guess I’ll just send a fake {:EXIT, pid, :shutdown} message for now.

Sending an exit message manually is what I’d recommend for OSS. If your goal is to trap it to log, or run some side effect, and you have access to Pro then I recommend using a worker hook instead.

We should definitely have Oban Pro but my company is still using OSS for now.

Alright, thank you ! (I’ll actually use Process.exit from the test process to assert we are trapping them.)
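For anyone following along, the approach might look roughly like this in plain Elixir. It is only a sketch: CancelSketch.run/1 is a hypothetical stand-in for the body of a perform/1 callback, and the :ready and :cleaned_up messages exist purely so the test process can observe what happened.

```elixir
defmodule CancelSketch do
  # Hypothetical stand-in for the work done inside perform/1.
  def run(notify_pid) do
    # Convert exit signals into {:EXIT, from, reason} messages.
    Process.flag(:trap_exit, true)
    send(notify_pid, {:ready, self()})

    receive do
      {:EXIT, _from, :shutdown} ->
        # Run the cleanup, then exit with the original reason.
        send(notify_pid, :cleaned_up)
        exit(:shutdown)
    end
  end
end

test_pid = self()
worker = spawn(fn -> CancelSketch.run(test_pid) end)

# Wait until the worker is trapping exits before sending the signal.
receive do
  {:ready, ^worker} -> :ok
after
  1_000 -> raise "worker never became ready"
end

Process.exit(worker, :shutdown)

receive do
  :cleaned_up -> IO.puts("cleanup ran before exit")
after
  1_000 -> raise "cleanup never ran"
end
```

Waiting for the :ready message matters: if the exit signal arrives before the flag is set, the process dies without running any cleanup.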

Hello,

Out of curiosity I was looking at the code of the reindexer plugin. It seems that it is dropping all oban indexes before calling REINDEX on each of them. But how does Postgres know the definition of the index (like the columns for instance) when the index was dropped and the SQL query only contains a name (like REINDEX INDEX CONCURRENTLY "public".oban_jobs_args_index) ?

I can see the indexes definitions in the v10 migration but surely this is not used at runtime.

(I see that the index is dropped only if it is not queryable, but I guess this clause exists because that can actually happen.)

Thank you

It is dropping incomplete indexes that encountered an issue during reindexing. That can happen because the reindex command runs concurrently, without blocking.

It’s a clean up step for previous bad indexes (which aren’t used anyhow). The original, good, indexes aren’t dropped at all.

Ah ok got it!

Thank you :slight_smile:

How do we deal with renaming a worker module while its jobs are still being processed?

Do I have to write migrations to deal with renaming of a worker?

That’s one approach. I find it slightly safer to leave two copies of the module in place under the old and new name until all the jobs referencing the old name complete processing, then delete that module.

That’s one way to do it. The other is with two modules, where the old delegates to the new.

Alternatively, with Pro you can use Worker Aliases to seamlessly transition without writing a migration.
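The delegation approach can be sketched as follows. The module names are made up, and use Oban.Worker is left out so the snippet runs on its own; in a real application both modules would be Oban workers and perform/1 would receive an %Oban.Job{} struct.

```elixir
defmodule MyApp.Workers.NewName do
  # In a real app: use Oban.Worker
  def perform(%{args: args}) do
    # The actual work lives only in the new module.
    {:ok, args}
  end
end

defmodule MyApp.Workers.OldName do
  # In a real app: use Oban.Worker
  # Kept only so jobs already enqueued under the old name still run.
  # Delete this module once those jobs have finished processing.
  defdelegate perform(job), to: MyApp.Workers.NewName
end

IO.inspect(MyApp.Workers.OldName.perform(%{args: %{"id" => 1}}))
```

New jobs should be enqueued with the new module only, so the old name eventually drains out of the jobs table.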