Oban Pro (Extensions, Workers, and Plugins that expand Oban) — Info, News & Updates Thread

Oban.Pro is a collection of extensions, workers, and plugins that expand what Oban is capable of while making complex workflows possible.


Oban Pro v1.2 is out!

This release includes expanded priorities, support for defining replace in workers, notifier changes, and validation improvements. It requires Oban v2.17+.

:card_file_box: DynamicPartitioner

The DynamicPartitioner plugin adds partitioned table support for optimized query performance, minimal database bloat, and efficiently pruned historic jobs. Partitioning can minimize database bloat for tables of any size, but it’s ideally suited for high throughput applications that run millions of jobs a week.
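
For reference, enabling the plugin is a matter of adding it to the plugin list. Here’s a minimal sketch (the MyApp names and config key are placeholders; the DynamicPartitioner docs cover retention and switch-over options):

config :my_app, Oban,
  repo: MyApp.Repo,
  engine: Oban.Pro.Engines.Smart,
  plugins: [
    # Manages the partitioned oban_jobs tables and prunes old partitions
    Oban.Pro.Plugins.DynamicPartitioner
  ]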

A synthetic benchmark of 7m jobs (1m a day for 7 days) showed the following performance improvements:

  • 40% Smaller tables (6,281MB to 4,121MB)
  • 95% Less bloat after vacuum (4,625MB to 230MB)
  • 2.5x Faster vacuuming (28,638ms to 11,529ms)
  • 2.1x Faster reindexing (6,248ms to 2,939ms)
  • 1000x Faster job pruning (51,170ms to 49ms)

Systems with higher throughput may see more dramatic performance and bloat improvements.

See the DynamicPartitioner docs for more details and instructions on switching over.

:alarm_clock: Scheduling Guarantees

The DynamicCron plugin gained an option to improve reliability with scheduling guarantees. Depending on an application’s restart timing, or as the result of unexpected downtime, a job’s scheduled insert period can be missed. To compensate, you can enable guaranteed mode for the entire crontab or on an individual basis.

In guaranteed mode, job inserts are attempted every minute, with uniqueness calculated to prevent any overlaps. Here’s an example of enabling guaranteed insertion for the entire crontab:

[
  guaranteed: true,
  crontab: [
    {"@hourly", MyApp.HourlyJob},
    {"@daily", MyApp.DailyJob},
    {"@monthly", MyApp.DailyJob, guaranteed: false},
  ]
]
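
In context, those options are given to the DynamicCron plugin in the Oban config. A sketch, using the same placeholder workers:

plugins: [
  {Oban.Pro.Plugins.DynamicCron,
   guaranteed: true,
   crontab: [
     {"@hourly", MyApp.HourlyJob},
     {"@daily", MyApp.DailyJob},
     {"@monthly", MyApp.MonthlyJob, guaranteed: false}
   ]}
]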

See DynamicCron’s section on scheduling guarantees for use cases, configuration, and potential caveats.

:identification_card: UUIDv7 for Binary Ids

Now producers, batches, and workflows all use time-ordered UUIDv7 values for binary ids. UUIDv7 has natural ordering while retaining UUID compatibility, making it a direct replacement with sortability and more efficient indexing.

This change uses a pure Elixir implementation and doesn’t introduce any dependencies or new Postgres requirements.

Bug Fixes

  • [DynamicQueues] Drop unsupported :only option before calling Oban.scale_queue

Oban Pro v1.3 is out!

This release is entirely dedicated to Smart engine optimizations, from slashing queue transactions to boosting bulk insert performance.

:postbox: Async Tracking

Rather than synchronously recording updates (acks) in a separate transaction after jobs execute, the Smart engine now bundles acks together to minimize transactions and reduce load on the database.

Async tracking, combined with the other enhancements detailed below, showed the following improvements over the previous Smart engine when executing 1,000 jobs with concurrency set to 20:

  • Transactions reduced by 97% (1,501 to 51)
  • Queries reduced by 94% (3,153 to 203)

That means less Ecto pool contention, fewer transactions, fewer queries, and fewer writes to the oban_producers table! There are similar, albeit less flashy, improvements over the Basic engine as well.

Notes and Implementation Details

  • Acks are stored centrally per queue and flushed with the next transaction using a lock-free mechanism that never drops an operation.

  • Acks are grouped and executed as a single query whenever possible. This is most visible in high throughput queues.

  • Acks are preserved across transactions to guarantee nothing is lost in the event of a rollback or an exception.

  • Acks are flushed on shutdown and when the queue is paused to ensure data remains as consistent as the previous synchronous version.

  • Acking is synchronous in testing mode, when draining jobs, and when explicitly enabled by a flag provided to the queue.

See the Smart engine’s async tracking section for more details and instructions on how to selectively opt out of async mode.
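
As a sketch, opting a single queue back into synchronous acking might look like the following. Note that the ack_async flag name here is an assumption for illustration; the async tracking docs list the exact option.

config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  queues: [
    default: 20,
    # `ack_async: false` is assumed here; check the Smart engine docs
    critical: [limit: 10, ack_async: false]
  ]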

v1.3.0 — 2024-01-16

Enhancements

  • [Smart] Skip extra query to “touch” the producer when acking without global or rate limiting enabled. This change reduces overall producer updates from 1 per job to 2 per minute for standard queues.

  • [Smart] Avoid refetching the local producer’s data when fetching new jobs.

    Async acking is centralized through the producer, which guarantees global and rate tracking data is up-to-date before fetching without an additional read.

  • [Smart] Optimize job insertion with fewer iterations.

    Iterating through job changesets as a map/reduce with fewer conversions improves inserting 1k jobs by 10% while reducing overall memory by 9%.

  • [Smart] Efficiently count changesets during insert_all.

    Prevent duplicate iterations through changesets to count unique jobs. Iterating through them once to accumulate multiple counts improved insertion by 3% and reduced overall memory by 2%.

  • [Smart] Acking cancelled jobs is done with a single operation and limited to queues with global limiting.

Bug Fixes

  • [Smart] Always merge acked meta updates dynamically.

    All meta-updating queries are dynamically merged with existing meta. This prevents recorded jobs from clobbering other meta updates made while the job executed.

  • [Smart] Safely extract producer uuid from attempted_by with more than two elements

  • [DynamicCron] Preserve stored opts such as args, priority, etc., on reboot when no new opts are set.

  • [Relay] Skip attempting relay notifications when the associated Oban pid isn’t alive.


Oban Pro v1.4 is out!

:bullettrain_front: Streamlined Workflows

Workflows now use automatic scheduling to run workflow jobs in order, without any polling or
snoozing. Jobs with upstream dependencies are automatically marked as on_hold and scheduled far
into the future. After the upstream dependency executes they’re made available to run, with
consideration for retries, cancellations, and discards.

  • An optimized, debounced query system uses up to 10x fewer queries to execute slower workflows.

  • Queue concurrency limits don’t impact workflow execution, and even complex workflows can quickly
    run in parallel with global_limit: 1 and zero snoozing.

  • Cancelled, deleted, and discarded dependencies are still handled according to their respective
    ignore_* policies.

All of the previous workflow “tuning” options like waiting_limit and waiting_snooze are gone,
as they’re not needed to optimize workflow execution. Finally, older “in flight” workflows will
still run with the legacy polling mechanism to ensure backwards compatibility.

:timer_clock: Job Execution Deadlines

Jobs that shouldn’t run after some period of time can be marked with a deadline. After the
deadline has passed the job will be pre-emptively cancelled on its next run, or optionally
during its next run if desired.

defmodule DeadlinedWorker do
  use Oban.Pro.Worker, deadline: {1, :hour}

  @impl true
  def process(%Job{args: _args}) do
    # If this doesn't run within an hour, it's cancelled
    :ok
  end
end

Deadlines may be set at runtime as well:

DeadlinedWorker.new(args, deadline: {30, :minutes})

In either case, the deadline is always relative and computed at runtime. That also allows the
deadline to consider scheduling—a job scheduled to run 1 hour from now with a 1 hour deadline will
expire 2 hours in the future.
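
For example, combining scheduling with a deadline (a sketch using the worker above; schedule_in is the standard Oban option, given here in seconds):

# Becomes available in ~1 hour and is cancelled if it hasn't run
# within 1 hour after that, i.e. roughly 2 hours from now.
args
|> DeadlinedWorker.new(schedule_in: 3_600, deadline: {1, :hour})
|> Oban.insert()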

:mage: Automatic Crontab Syncing

Synchronizing persisted entries manually required two deploys: one to flag an entry with deleted: true
and another to clean up the entry entirely. That extra step isn’t ideal for applications that
don’t insert or delete jobs at runtime.

To automatically delete entries that are no longer listed in the crontab, set the sync_mode
option to :automatic:

[
  sync_mode: :automatic,
  crontab: [
    {"0 * * * *", MyApp.BasicJob},
    {"0 0 * * *", MyApp.OtherJob}
  ]
]

To remove unwanted entries, simply delete them from the crontab:

 crontab: [
   {"0 * * * *", MyApp.BasicJob},
-  {"0 0 * * *", MyApp.OtherJob}
 ]

With :automatic sync, the entry for MyApp.OtherJob will be deleted on the next deployment.

:mending_heart::star2: Upgrading to v1.4

Changes to DynamicCron require a migration to add the new insertions column. You must re-run
the Oban.Pro.Migrations.DynamicCron migration when upgrading.

defmodule MyApp.Repo.Migrations.UpdateObanCron do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end

v1.4.0 — 2024-03-21

Enhancements

  • [DynamicCron] Add :sync_mode for automatic entry management.

    Most users expect that when they delete an entry from the crontab it won’t keep running after
    the next deploy. A new sync_mode option allows selecting between automatic and manual
    entry management.

    In addition, this moves cron entry management into the evaluate handler. Inserting and deleting
    at runtime can’t leverage leadership, because in rolling deployments the new nodes are never
    the leader.

  • [DynamicCron] Use recorded job insertion timestamps for guaranteed cron.

    A new field on the oban_crons table records each time a cron job is inserted up to a
    configurable limit. That field is then used for guaranteed cron jobs, and optionally for
    historic inspection beyond a job’s retention period.

  • [DynamicCron] Stop adding a unique option when inserting jobs, regardless of the guaranteed
    option.

    There’s no need for uniqueness checks now that insertions are tracked for each entry. Inserting
    without uniqueness is significantly faster.

  • [DynamicCron] Inject cron information into scheduled job meta.

    This change mirrors the cron: true and cron_expr: expression meta added to Oban’s Cron plugin
    in order to make cron jobs easier to identify and report on through tools like Sentry.
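
    For illustration, an inserted cron job’s meta would then carry the expression (other meta keys omitted):

    job.meta
    #=> %{"cron" => true, "cron_expr" => "0 * * * *"}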

  • [Worker] Add :deadline option for auto cancelling jobs

    Jobs that shouldn’t run after some period of time can be marked with a deadline. After the
    deadline has passed the job will be pre-emptively cancelled on its next run, or optionally
    during its next run if desired.

  • [Workflow] Invert workflow execution to avoid bottlenecks caused by polling and snoozing.

    Workflow jobs no longer poll or wait for upstream dependencies while executing. Instead, jobs with dependencies are “held” until they’re made available by a facilitator function. This inverted flow makes fewer queries, doesn’t clog queues with jobs that aren’t ready, avoids snoozing, and is generally more efficient.

  • [Workflow] Expose functions and direct callback docs to function docs.

    Most workflow functions aren’t true callbacks, and shouldn’t be overridden. Now all callbacks point to the public function they wrap. Exposing workflow functions makes it easier to find and link to documentation.

Bug Fixes

  • [DynamicCron] Don’t consider the node rebooted until it is leader

    With rolling deploys, it’s common for a node not to be the leader the first time it evaluates. However, the :rebooted flag was set to true on the first run, which prevented reboots from being inserted when the node ultimately acquired leadership.

  • [DynamicQueues] Accept streamlined partition syntax for global_limit and rate_limit options.

    DynamicQueues didn’t normalize the newer partition syntax before validation. This was an oversight, and a sign that validation had drifted between the Producer and Queue schemas. Now schemas use the same changesets to ensure compatibility.

  • [Smart] Handle partitioning by :worker and :args regardless of order.

    The docs implied partitioning by worker and args was possible, but there wasn’t a clause that handled any order correctly.

  • [Smart] Explicitly cast transactional advisory lock prefix to integer.

    Postgres 16.1 may throw an error that it’s unable to determine the argument type while taking bulk unique locks.

  • [Smart] Preserve recorded values between retries when sync acking failed.

    Acking a recorded value for a batch or workflow is synchronous, and a crash or timeout failure
    would lose the recorded value on subsequent attempts. Now the value is persisted between retries
    to ensure values are always recorded.

  • [Smart] Revert “Force materialized CTE in smart fetch query”.

    Forcing a materialized CTE in the fetch query was added for reliability, but it can cause performance regressions under heavy workloads.

  • [Testing] Use configured queues when ensuring all started.

    Starting a supervised Oban instance in manual mode with queues specified would fail because in :manual testing mode the queues option is overridden to be empty.


Oban Pro v1.5.0-rc.0 is out!

This release includes the new job decorator, unified migrations, an index-backed simple unique mode, changes for distributed PostgreSQL, improved batches, streamlined chains, worker aliases, hybrid job composition, and other performance improvements.

Elixir Support

This release requires a minimum of Elixir v1.14. We officially support 3 Elixir versions back,
and use of some newer Elixir and Erlang/OTP features bumped the minimum up to v1.14.

:art: Job Decorator

The new Oban.Pro.Decorator module converts functions into Oban jobs with a teeny-tiny @job true annotation. Decorated functions, such as those in contexts or other non-worker modules, can be executed as fully fledged background jobs with retries, priority, scheduling, uniqueness, and all the other guarantees you have come to expect from Oban jobs.

defmodule Business do
  use Oban.Pro.Decorator

  @job max_attempts: 3, queue: :notifications
  def activate(account_id) when is_integer(account_id) do
    case Account.fetch(account_id) do
      {:ok, account} ->
        account
        |> notify_admin()
        |> notify_users()

      :error ->
        {:cancel, :not_found}
    end
  end
end

# Insert a Business.activate/1 job
Business.insert_activate(123)

The @job decorator also supports most standard Job options, validated at compile time. As expected, the options can be overridden at runtime through an additional generated clause. Along with generated insert_ functions, there’s also a new_ variant that can be used to build up job changesets for bulk insert, and a relay_ variant that operates like a distributed async/await.

Finally, the generated functions also respect patterns and guards, so you can write assertive clauses that defend against bad inputs or break logic into multiple clauses.
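
Assuming the generated names follow the insert_ pattern shown above, the other variants would look roughly like this (a sketch, not copied from the docs):

# Build changesets for bulk insertion
account_ids
|> Enum.map(&Business.new_activate/1)
|> Oban.insert_all()

# Distributed async/await style execution (see the Relay docs for the exact return)
Business.relay_activate(123)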

:duck: Unified Migrations

Oban has had centralized, versioned migrations from the beginning. When there’s a new release with database changes, you run the migrations and it figures out what to change on its own. Pro behaved differently for reasons that made sense when there was a single producers table, but those reasons no longer hold with multiple tables and custom indexes.

Now Pro has unified migrations to keep all the necessary tables and indexes updated and fresh, and you’ll be warned at runtime if the migrations aren’t current.

See the Oban.Pro.Migration module for more details, or check the v1.5 Upgrade Guide for instructions on putting it to use.

:unicorn: Enhanced Unique

Oban’s standard unique options are robust, but they require multiple queries and centralized locks to function. Now Pro supports a simplified, opt-in unique mode designed for speed, correctness, scalability, and simplicity.

The enhanced hybrid and simple modes allow slightly fewer options while boosting insert performance 1.5x-3.5x by reducing database load with fewer queries, improving memory usage, and staying correct across multiple processes and nodes.

Here’s a comparison between inserting various batches with legacy and simple modes:

jobs     legacy      simple     boost
100      45.08       33.93      1.36
1000     140.64      81.452     1.72
10000    3149.71     979.47     3.22
20000    OOM error   1741.67

See more in the Enhanced Unique section.

:truck: Distributed PostgreSQL

There were a handful of PostgreSQL features used in Oban and Pro that prevented them from running on distributed PostgreSQL databases such as Yugabyte.

A few table creation options prevented even running the migrations due to unsupported database features. Then there were advisory locks, which are part of how Oban normally handles unique jobs, and how Pro coordinates queues globally.

We’ve worked around all of these limitations, and it’s possible to run Oban and Pro on Yugabyte with most of the same functionality as regular PostgreSQL (global limits, rate limits, and queue partitioning).

:cookie: Improved Batches

One of Pro’s original three features, batches link the execution of many jobs as a group and run optional callback jobs after the jobs are processed.

Composing batches used to rely on a dedicated worker, one that couldn’t be composed with other worker types. Now there’s a standalone Oban.Pro.Batch module that’s used to dynamically build, append to, and manipulate batches from any type of job, with much more functionality.

Batches also gain support for streams (for both creating and appending), clearer callbacks, and the ability to set any Oban.Job option on callback jobs.

alias Oban.Pro.Batch

mail_jobs = Enum.map(mail_args, &MyApp.MailWorker.new/1)
push_jobs = Enum.map(push_args, &MyApp.PushWorker.new/1)

[callback_opts: [priority: 9], callback_worker: CallbackWorker]
|> Batch.new()
|> Batch.add(mail_jobs)
|> Batch.add(push_jobs)

See more in the Batch docs.

:link: Streamlined Chains

Chains now operate like workflows, where jobs are scheduled until they’re ready to run and then descheduled after the previous link in the chain completes. Preemptive chaining doesn’t clog queues with waiting jobs, and it chews through a backlog without any polling.

Chains are also a standard Oban.Pro.Worker option now. There’s no need to define a chain-specific worker; in fact, doing so is deprecated. Just add the chain option and you’re guaranteed a FIFO chain of jobs:

-  use Oban.Pro.Workers.Chain, by: :worker
+  use Oban.Pro.Worker, chain: [by: :worker]

See more in the Chained Jobs section.

:paperclips: Improved Workflows

Workflows began the transition from a dedicated worker to a stand-alone module several versions ago. Now that transition is complete, and workflows can be composed from any type of job.

All workflow management functions have moved to a centralized Oban.Pro.Workflow module. An expanded set of functions, including the ability to cancel an entire workflow, conveniently work with either a workflow job or id, so it’s possible to maneuver workflows from anywhere.

Perhaps the most exciting addition, because it’s visual and we like shiny things, is Mermaid output
for visualization. Mermaid has become a de facto graphing standard, and it’s an excellent way to
visualize workflows in tools like Livebook.

alias Oban.Pro.Workflow

workflow =
  Workflow.new()
  |> Workflow.add(:a, EchoWorker.new(%{id: 1}))
  |> Workflow.add(:b, EchoWorker.new(%{id: 2}), deps: [:a])
  |> Workflow.add(:c, EchoWorker.new(%{id: 3}), deps: [:b])
  |> Workflow.add(:d, EchoWorker.new(%{id: 4}), deps: [:c])

Oban.insert_all(workflow)

Workflow.cancel_jobs(workflow.id)
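
Rendering that same workflow as a Mermaid graph, e.g. for a Livebook cell, is then a single call (the exact output format is shown in the Workflow docs):

mermaid = Workflow.to_mermaid(workflow)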

:people_holding_hands: Worker Aliases

Worker aliases solve a perennial production issue—how to rename workers without breaking existing jobs. Aliasing allows jobs enqueued with the original worker name to continue executing without exceptions using the new worker code.

-defmodule MyApp.UserPurge do
+defmodule MyApp.DataPurge do
-  use Oban.Pro.Worker
+  use Oban.Pro.Worker, aliases: [MyApp.UserPurge]

See more in the Worker Aliases section.

v1.5.0-rc.0 — 2024-07-26

Enhancements

  • [Smart] Implement check_available/1 engine callback for faster staging queries.

    The smart engine defines a custom check_available/1 callback that vastly outperforms the Basic implementation on large tables. This table illustrates the difference in a benchmark of 24 queues with an even split of available and completed jobs on a local database with no additional load:

    jobs    original    optimized    boost
    2.4m    107.79ms    0.72ms       149x
    4.8m    172.10ms    1.15ms       149x
    7.2m    242.32ms    4.28ms       56x
    9.6m    309.46ms    7.89ms       39x

    The difference in production may be much greater.

Worker

  • [Worker] Add before_process/1 callback.

    The new callback is applied before process/1 is called, and is able to modify the job or stop processing entirely. Like after_process, it executes in the job’s process after all internal processing (encryption, structuring) is applied.
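
    A hedged sketch of the callback; the {:ok, job} return and any halting tuples are assumed to mirror process/1, so consult the docs for the exact contract:

    defmodule MyApp.AuditedWorker do
      use Oban.Pro.Worker

      @impl true
      def before_process(%Oban.Job{} = job) do
        # Runs in the job's process before process/1 and may modify the
        # job (or halt processing entirely) before execution begins.
        {:ok, %{job | meta: Map.put(job.meta, "audited", true)}}
      end

      @impl true
      def process(%Oban.Job{args: args}) do
        IO.inspect(args)
        :ok
      end
    end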

  • [Worker] Avoid re-running stages during after_process/1 hooks.

    Stages such as structuring and encryption only run once during execution, and the output is reused in hooks.

  • [Worker] Prevent compile time dependencies in worker hooks.

    Explicit worker hooks are now aliased to prevent a compile time dependency. Initial validation for explicit hooks is reduced as a result, but it retains checks for global hooks.

Batch

  • [Batch] Add new callbacks with clearer, batch-specific names.

    The old handle_ names aren’t clear in the context of hybrid job compositions. This introduces new batch_ prefixed callbacks for the Oban.Pro.Batch module. The legacy callbacks are still handled for backward compatibility.

  • [Batch] Add from_workflow/2 for simplified Batch/Worker composition.

    Create a batch from a workflow by augmenting all jobs to also be part of a batch.

  • [Batch] Add cancel_jobs/2 for easy cancellation of an entire batch.

  • [Batch] Add append/2 for extending batches with new jobs.

  • [Batch] Uniformly accept job or batch_id in all functions.

    Now it’s possible to fetch batch jobs from within a process/1 block via a Job struct, or from anywhere in code with the batch_id alone.

Workflow

  • [Workflow] Add get_job/3 for fetching a single dependent job.

    Internally it uses all_jobs/3, but the name and arguments make it clear that this is the way to get a single workflow job.
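
    A minimal usage sketch (the exact return shape is an assumption; see the docs):

    # From inside process/1, fetch the upstream :a dependency by name
    dep_job = Oban.Pro.Workflow.get_job(job, :a)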

  • [Workflow] Add cancel_jobs/3 for easy cancellation of an entire workflow.

    The new helper function simplifies cancelling full or partial workflows using the same query logic behind all_jobs and stream_jobs.

  • [Workflow] Drop optional dependency on libgraph

    The libgraph dependency hasn’t had a new release in years and is no longer necessary; we can build the same simple graph with the :digraph module available in OTP.

  • [Workflow] Add to_mermaid/1 for mermaid graph output.

    Now that we’re using digraph we can also take control of rendering our own output, such as mermaid.

  • [Workflow] Uniformly accept job or workflow_id in all functions.

    Now it’s possible to fetch workflow jobs from within a process/1 block via a Job struct, or from anywhere in code with the workflow_id alone.

Bug Fixes

  • [Smart] Require a global queue lock with any flush handlers.

    Flush handlers for batches, chains, and workflows must be staggered without overlapping transactions. Without a mutex to stagger fetching, transactions on different nodes can’t see all completed jobs and handlers can misfire.

    This is most apparent with batches, as they don’t have a lifeline to clean up when there’s an issue, as workflows do.

  • [DynamicPruner] Use state-specific timestamps when pruning.

    Previously, the most recent timestamp was used rather than the timestamp for the job’s current state.

  • [Workflow] Prevent incorrect workflow_id types by validating options passed to new/1.

Deprecations

  • [Oban.Pro.Workers.Batch] The worker is deprecated in favor of composing with the new Oban.Pro.Batch module.

  • [Oban.Pro.Workers.Chain] The worker is deprecated in favor of composing with the new chain option in Oban.Pro.Worker.

  • [Oban.Pro.Workers.Workflow] The worker is deprecated in favor of composing with the new Oban.Pro.Workflow module.
