Oban Pro (Extensions, Workers, and Plugins that expand Oban) — Info, News & Updates Thread

Oban.Pro is a collection of extensions, workers, and plugins that expand what Oban is capable of while making complex workflows possible.


Oban Pro v1.2 is out!

This release includes expanded priorities, defining replace in workers, notifier changes, and validation improvements, all of which require Oban v2.17+.

:card_file_box: DynamicPartitioner

The DynamicPartitioner plugin adds partitioned table support for optimized query performance, minimal database bloat, and efficiently pruned historic jobs. Partitioning can minimize database bloat for tables of any size, but it’s ideally suited for high throughput applications that run millions of jobs a week.

A synthetic benchmark of 7M jobs (1M a day for 7 days) showed the following performance improvements:

  • 40% Smaller tables (6,281MB to 4,121MB)
  • 95% Less bloat after vacuum (4,625MB to 230MB)
  • 2.5x Faster vacuuming (28,638ms to 11,529ms)
  • 2.1x Faster reindexing (6,248ms to 2,939ms)
  • 1000x Faster job pruning (51,170ms to 49ms)

Systems with higher throughput may see more dramatic performance and bloat improvements.

See the DynamicPartitioner docs for more details and instructions on switching over.
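For orientation, enabling the plugin should mostly be a matter of listing it in your Oban config. The sketch below assumes a typical setup where `:my_app`, `MyApp.Repo`, and the queue list are placeholders. It doesn't cover the migration to partitioned tables, which is what the DynamicPartitioner docs walk through, and any plugin options are left out because they aren't described here.

```elixir
# config/config.exs (sketch; app name, repo, and queues are placeholders)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  plugins: [
    # Plugin module as named in this post; see its docs for available options
    Oban.Pro.Plugins.DynamicPartitioner
  ]
```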

:alarm_clock: Scheduling Guarantees

The DynamicCron plugin gained an option to improve reliability with scheduling guarantees. Depending on an application’s restart timing, or as the result of unexpected downtime, a job’s scheduled insert period can be missed. To compensate, you can enable guaranteed mode for the entire crontab or on an individual basis.

In guaranteed mode, job inserts are attempted every minute, with uniqueness calculated to prevent any overlaps. Here’s an example of enabling guaranteed insertion for the entire crontab:

[
  guaranteed: true,
  crontab: [
    {"@hourly", MyApp.HourlyJob},
    {"@daily", MyApp.DailyJob},
    {"@monthly", MyApp.MonthlyJob, guaranteed: false}
  ]
]

See DynamicCron’s section on scheduling guarantees for use cases, configuration, and potential caveats.
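To picture how per-minute attempts avoid overlaps, here’s a hypothetical model (not Oban Pro’s implementation). It only handles `@hourly`, and it tracks inserted slots in an in-memory `MapSet` rather than in the database. Each attempt is keyed by the most recent scheduled slot, so repeated attempts within an hour collapse into a single insert, while a slot missed during downtime is still inserted on the first attempt after a restart.

```elixir
defmodule GuaranteedCronSketch do
  # Hypothetical model of guaranteed "@hourly" scheduling; not Oban Pro's code.

  # Most recent "@hourly" slot at or before `now` (the top of the hour)
  def last_hourly_slot(%DateTime{} = now) do
    %{now | minute: 0, second: 0, microsecond: {0, 0}}
  end

  # Attempt an insert at `now`. `inserted` is a MapSet of {worker, slot} keys
  # standing in for the uniqueness check. Returns {:inserted | :skipped, set}.
  def attempt(inserted, worker, %DateTime{} = now) do
    key = {worker, last_hourly_slot(now)}

    if MapSet.member?(inserted, key) do
      {:skipped, inserted}
    else
      {:inserted, MapSet.put(inserted, key)}
    end
  end
end
```

Attempts at 10:05 and 10:30 share the 10:00 slot, so only the first inserts; if the node is down at 11:00, an attempt at 11:07 still inserts the 11:00 run.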

:identification_card: UUIDv7 for Binary Ids

Now producers, batches, and workflows all use time-ordered UUIDv7 values for binary ids. UUIDv7 has natural ordering while retaining UUID compatibility, making it a direct replacement with sortability and more efficient indexing.

This change uses a pure Elixir implementation and doesn’t introduce any dependencies or new Postgres requirements.
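To illustrate why UUIDv7 values sort by creation time, here’s a minimal generator following the RFC 9562 layout: a 48-bit Unix millisecond timestamp, a 4-bit version, random bits, and the 2-bit variant. The `UUIDv7Sketch` module is hypothetical and isn’t Oban Pro’s implementation; it only uses `System` and `:crypto` from the standard distribution.

```elixir
defmodule UUIDv7Sketch do
  # Hypothetical illustration of the UUIDv7 layout (RFC 9562); not Oban Pro's code.

  @doc "Returns a 16-byte UUIDv7 binary."
  def bingenerate do
    # The millisecond timestamp occupies the most significant 48 bits,
    # so ids created later compare greater
    ms = System.system_time(:millisecond)

    # 80 random bits, of which 12 + 62 are kept
    <<rand_a::12, _::4, rand_b::62, _::2>> = :crypto.strong_rand_bytes(10)

    # Layout: timestamp | version 7 | rand_a | variant 0b10 | rand_b
    <<ms::48, 7::4, rand_a::12, 2::2, rand_b::62>>
  end

  @doc "Returns the canonical 8-4-4-4-12 lowercase hex string."
  def generate do
    <<a::binary-size(4), b::binary-size(2), c::binary-size(2), d::binary-size(2),
      e::binary-size(6)>> = bingenerate()

    Enum.map_join([a, b, c, d, e], "-", &Base.encode16(&1, case: :lower))
  end
end
```

Because the timestamp leads, comparing the strings (or raw binaries) orders ids by creation time at millisecond granularity. Ids created within the same millisecond are ordered randomly in this sketch.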

Bug Fixes

  • [DynamicQueues] Drop unsupported :only option before calling Oban.scale_queue

Oban Pro v1.3 is out!

This release is entirely dedicated to Smart engine optimizations, from slashing queue transactions to boosting bulk insert performance.

:postbox: Async Tracking

Rather than synchronously recording updates (acks) in a separate transaction after jobs execute, the Smart engine now bundles acks together to minimize transactions and reduce load on the database.

Async tracking, combined with the other enhancements detailed below, showed the following improvements over the previous Smart engine when executing 1,000 jobs with concurrency set to 20:

  • Transactions reduced by 97% (1,501 to 51)
  • Queries reduced by 94% (3,153 to 203)

That means less Ecto pool contention, fewer transactions, fewer queries, and fewer writes to the oban_producers table! There are similar, albeit less flashy, improvements over the Basic engine as well.

Notes and Implementation Details

  • Acks are stored centrally per queue and flushed with the next transaction using a lock-free mechanism that never drops an operation.

  • Acks are grouped and executed as a single query whenever possible. This is most visible in high throughput queues.

  • Acks are preserved across transactions to guarantee nothing is lost in the event of a rollback or an exception.

  • Acks are flushed on shutdown and when the queue is paused to ensure data remains as consistent as the previous synchronous version.

  • Acking is synchronous in testing mode, when draining jobs, and when explicitly enabled by a flag provided to the queue.

See the Smart engine’s async tracking section for more details and instructions on how to selectively opt out of async mode.
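The core idea of bundling acks can be modeled with a plain map. This is a hypothetical sketch, not the Smart engine’s code: it buffers job ids by their final state and flushes them as one grouped entry per state, each standing in for a single query. Unlike the real engine, it ignores concurrency, rollbacks, and shutdown flushing.

```elixir
defmodule AckBufferSketch do
  # Hypothetical model of bundled acks; not Oban Pro's implementation.

  def new, do: %{}

  # Record that `job_id` finished in `state` (e.g. :completed, :discarded)
  def push(buffer, job_id, state) do
    Map.update(buffer, state, [job_id], &[job_id | &1])
  end

  # Group the buffer into one {state, ids} entry per state. Each entry stands in
  # for a single grouped UPDATE. Returns {operations, empty_buffer}.
  def flush(buffer) do
    ops =
      buffer
      |> Enum.map(fn {state, ids} -> {state, Enum.reverse(ids)} end)
      |> Enum.sort()

    {ops, new()}
  end
end
```

Pushing 1,000 acks where every hundredth job is discarded yields two grouped operations (`:completed` and `:discarded`) rather than 1,000 separate writes.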

v1.3.0 — 2024-01-16

Enhancements

  • [Smart] Skip extra query to “touch” the producer when acking without global or rate limiting enabled. This change reduces overall producer updates from 1 per job to 2 per minute for standard queues.

  • [Smart] Avoid refetching the local producer’s data when fetching new jobs.

    Async acking is centralized through the producer, which guarantees global and rate tracking data is up-to-date before fetching without an additional read.

  • [Smart] Optimize job insertion with fewer iterations.

    Iterating through job changesets as a map/reduce with fewer conversions improves inserting 1k jobs by 10% while reducing overall memory by 9%.

  • [Smart] Efficiently count changesets during insert_all.

    Prevent duplicate iterations through changesets to count unique jobs. Iterating through them once to accumulate multiple counts improved insertion by 3% and reduced overall memory by 2%.

  • [Smart] Acking cancelled jobs is done with a single operation and limited to queues with global limiting.
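The single-pass counting described in the insert_all entries can be shown with plain maps standing in for changesets. This is a hypothetical sketch: treating a `:unique` key as the marker for unique jobs is an assumption for illustration, not the shape of Oban’s changesets.

```elixir
defmodule InsertCountSketch do
  # Hypothetical stand-ins for job changesets: maps with an optional :unique key.

  # Two passes: one to count everything, another to count unique jobs
  def two_pass(changesets) do
    {length(changesets), Enum.count(changesets, &Map.has_key?(&1, :unique))}
  end

  # One pass: accumulate both counts at once with Enum.reduce/3
  def one_pass(changesets) do
    Enum.reduce(changesets, {0, 0}, fn changeset, {total, unique} ->
      unique = if Map.has_key?(changeset, :unique), do: unique + 1, else: unique
      {total + 1, unique}
    end)
  end
end
```

Both functions return `{total, unique}`; the single reduce just avoids walking the list twice.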

Bug Fixes

  • [Smart] Always merge acked meta updates dynamically.

    All meta-updating queries are dynamically merged with existing meta. This prevents recorded jobs from clobbering other meta updates made while the job executed.

  • [Smart] Safely extract producer uuid from attempted_by with more than two elements

  • [DynamicCron] Preserve stored opts such as args, priority, etc., on reboot when no new opts are set.

  • [Relay] Skip attempting relay notifications when the associated Oban pid isn’t alive.
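The meta fix is easiest to see with plain maps. In this hypothetical sketch, a job’s meta changes while it runs (say, another process records progress), and then the ack writes a recorded value. Writing back the stale snapshot drops the concurrent change, while merging only the new keys into the current value keeps it. In Postgres, a shallow merge like this can be expressed with jsonb concatenation (`meta || $1`); the code only models the behavior and isn’t Oban Pro’s query.

```elixir
defmodule MetaMergeSketch do
  # Hypothetical model of acking meta updates; not Oban Pro's code.
  # `snapshot` is the meta as it was when the job started executing.

  # Buggy approach: write the whole snapshot back, clobbering concurrent changes
  def overwrite(_current_meta, snapshot, updates), do: Map.merge(snapshot, updates)

  # Fixed approach: merge only the updates into whatever is stored now
  def merge(current_meta, _snapshot, updates), do: Map.merge(current_meta, updates)
end
```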


Oban Pro v1.4 is out!

:bullettrain_front: Streamlined Workflows

Workflows now use automatic scheduling to run workflow jobs in order, without any polling or snoozing. Jobs with upstream dependencies are automatically marked as on_hold and scheduled far into the future. After the upstream dependency executes, they’re made available to run, with consideration for retries, cancellations, and discards.

  • An optimized, debounced query system uses up to 10x fewer queries to execute slower workflows.

  • Queue concurrency limits don’t impact workflow execution, and even complex workflows can quickly
    run in parallel with global_limit: 1 and zero snoozing.

  • Cancelled, deleted, and discarded dependencies are still handled according to their respective
    ignore_* policies.

All of the previous workflow “tuning” options, like waiting_limit and waiting_snooze, are gone because they’re no longer needed to optimize workflow execution. Finally, older “in flight” workflows will still run with the legacy polling mechanism to ensure backwards compatibility.
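The hold-and-release flow can be modeled without any polling: when a job completes, only its dependents are checked, and any whose upstream jobs are all done become available. This is a hypothetical sketch using plain maps of job names and states. It isn’t Oban Pro’s facilitator and it ignores retries and the ignore_* policies.

```elixir
defmodule WorkflowHoldSketch do
  # Hypothetical model of held workflow jobs; not Oban Pro's implementation.
  # deps: %{job_name => [upstream job names]}

  # Jobs without dependencies start available; everything else is held
  def initial_states(deps) do
    Map.new(deps, fn {name, upstream} ->
      {name, if(upstream == [], do: :available, else: :on_hold)}
    end)
  end

  # Mark `name` completed, then release held jobs that depend on it and
  # whose upstream jobs have all completed
  def complete(states, deps, name) do
    states = Map.put(states, name, :completed)

    Enum.reduce(deps, states, fn {job, upstream}, acc ->
      ready? = name in upstream and Enum.all?(upstream, &(acc[&1] == :completed))

      if acc[job] == :on_hold and ready?, do: Map.put(acc, job, :available), else: acc
    end)
  end
end
```

For a diamond-shaped workflow where `:d` depends on `:b` and `:c`, completing `:a` releases `:b` and `:c`, and `:d` stays held until both of those finish.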

:timer_clock: Job Execution Deadlines

Jobs that shouldn’t run after some period of time can be marked with a deadline. After the deadline has passed, the job will be pre-emptively cancelled on its next run, or optionally during its next run if desired.

defmodule DeadlinedWorker do
  use Oban.Pro.Worker, deadline: {1, :hour}

  alias Oban.Job

  @impl true
  def process(%Job{args: _args}) do
    # If this doesn't run within an hour, it's cancelled
    :ok
  end
end

Deadlines may be set at runtime as well:

DeadlinedWorker.new(args, deadline: {30, :minutes})

In either case, the deadline is always relative and computed at runtime. That also allows the deadline to account for scheduling: a job scheduled to run 1 hour from now with a 1 hour deadline will expire 2 hours in the future.
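The scheduling arithmetic works out like this. `DeadlineSketch` is a hypothetical helper, not part of Oban Pro: it converts `{amount, unit}` tuples shaped like the ones above into seconds and adds them to the scheduled time. The exact set of units Oban Pro accepts may differ from the ones listed here.

```elixir
defmodule DeadlineSketch do
  # Hypothetical helper; the unit names accepted here are assumptions.
  @seconds %{
    second: 1,
    seconds: 1,
    minute: 60,
    minutes: 60,
    hour: 3_600,
    hours: 3_600,
    day: 86_400,
    days: 86_400
  }

  # Convert a relative deadline such as {30, :minutes} into seconds
  def to_seconds({amount, unit}) when is_integer(amount) and amount >= 0 do
    amount * Map.fetch!(@seconds, unit)
  end

  # The deadline counts from when the job is scheduled to run, not when it was inserted
  def expires_at(%DateTime{} = scheduled_at, deadline) do
    DateTime.add(scheduled_at, to_seconds(deadline), :second)
  end
end
```

A job inserted at midnight and scheduled one hour later with `{1, :hour}` expires at 02:00.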

:mage: Automatic Crontab Syncing

Synchronizing persisted entries manually required two deploys: one to flag the entry with deleted: true and another to clean up the entry entirely. That extra step isn’t ideal for applications that don’t insert or delete jobs at runtime.

To automatically delete entries that are no longer listed in the crontab, set the sync_mode option to :automatic:

[
  sync_mode: :automatic,
  crontab: [
    {"0 * * * *", MyApp.BasicJob},
    {"0 0 * * *", MyApp.OtherJob}
  ]
]

To remove unwanted entries, simply delete them from the crontab:

 crontab: [
   {"0 * * * *", MyApp.BasicJob},
-  {"0 0 * * *", MyApp.OtherJob}
 ]

With :automatic sync, the entry for MyApp.OtherJob will be deleted on the next deployment.
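Conceptually, automatic sync boils down to a set difference between what’s persisted and what’s configured. In this hypothetical sketch, entries are identified by worker name and the crontab may contain 2- or 3-element tuples; Oban Pro’s actual entry identity and deletion mechanics may differ.

```elixir
defmodule CronSyncSketch do
  # Hypothetical model of :automatic sync; not Oban Pro's implementation.

  # Persisted entries whose worker no longer appears in the crontab are stale
  def stale_entries(persisted_workers, crontab) do
    # The worker is the second element of {expr, worker} or {expr, worker, opts}
    configured = MapSet.new(crontab, &elem(&1, 1))

    persisted_workers
    |> MapSet.new()
    |> MapSet.difference(configured)
    |> MapSet.to_list()
  end
end
```

With the diff above applied, `MyApp.OtherJob` is the only stale entry and would be removed on the next deployment.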

:mending_heart::star2: Upgrading to v1.4

Changes to DynamicCron require a migration to add the new insertions column. You must re-run the Oban.Pro.Migrations.DynamicCron migration when upgrading.

defmodule MyApp.Repo.Migrations.UpdateObanCron do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end

v1.4.0 — 2024-03-21

Enhancements

  • [DynamicCron] Add :sync_mode for automatic entry management.

    Most users expect that when they delete an entry from the crontab it won’t keep running after
    the next deploy. A new sync_mode option allows selecting between automatic and manual
    entry management.

    In addition, this moves cron entry management into the evaluate handler. Inserting and deleting
    at runtime can’t leverage leadership, because in rolling deployments the new nodes are never
    the leader.

  • [DynamicCron] Use recorded job insertion timestamps for guaranteed cron.

    A new field on the oban_crons table records each time a cron job is inserted up to a
    configurable limit. That field is then used for guaranteed cron jobs, and optionally for
    historic inspection beyond a job’s retention period.

  • [DynamicCron] Stop adding a unique option when inserting jobs, regardless of the guaranteed
    option.

    There’s no need for uniqueness checks now that insertions are tracked for each entry. Inserting
    without uniqueness is significantly faster.

  • [DynamicCron] Inject cron information into scheduled job meta.

    This change mirrors the cron: true and cron_expr: expression meta that Oban’s Cron plugin now
    adds, making cron jobs easier to identify and report on through tools like Sentry.

  • [Worker] Add :deadline option for auto cancelling jobs

    Jobs that shouldn’t run after some period of time can be marked with a deadline. After the
    deadline has passed, the job will be pre-emptively cancelled on its next run, or optionally
    during its next run if desired.

  • [Workflow] Invert workflow execution to avoid bottlenecks caused by polling and snoozing.

    Workflow jobs no longer poll or wait for upstream dependencies while executing. Instead, jobs with dependencies are “held” until they’re made available by a facilitator function. This inverted flow makes fewer queries, doesn’t clog queues with jobs that aren’t ready, avoids snoozing, and is generally more efficient.

  • [Workflow] Expose functions and direct callback docs to function docs.

    Most workflow functions aren’t true callbacks and shouldn’t be overridden. Now all callbacks point to the public function they wrap. Exposing workflow functions makes it easier to find and link to documentation.
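One way to picture how recorded insertions can stand in for uniqueness checks in guaranteed mode: compare an entry’s most recent scheduled slot with its latest recorded insertion, and insert only when the slot is newer. This is a hypothetical sketch; the cap of three timestamps and the newest-first list are assumptions for illustration, not Oban Pro’s schema or limit.

```elixir
defmodule CronInsertionsSketch do
  # Hypothetical model of per-entry insertion tracking; not Oban Pro's code.
  # Insertions are slot timestamps, newest first, capped at @limit.
  @limit 3

  def record(insertions, %DateTime{} = slot), do: Enum.take([slot | insertions], @limit)

  # A guaranteed entry is owed a run when its latest scheduled slot is newer
  # than the most recent recorded insertion, or nothing was ever inserted
  def owed?([], _last_slot), do: true

  def owed?([latest | _], %DateTime{} = last_slot) do
    DateTime.compare(last_slot, latest) == :gt
  end
end
```

Because the check only reads the entry’s own history, inserting the job itself no longer needs a unique option.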

Bug Fixes

  • [DynamicCron] Don’t consider the node rebooted until it is leader

    With rolling deploys, it’s common for a node not to be the leader the first time it evaluates. However, the :rebooted flag was set to true on that first run, which prevented reboot jobs from being inserted when the node ultimately acquired leadership.

  • [DynamicQueues] Accept streamlined partition syntax for global_limit and rate_limit options.

    DynamicQueues didn’t normalize the newer partition syntax before validation. This was an oversight, and a sign that validation had drifted between the Producer and Queue schemas. Now schemas use the same changesets to ensure compatibility.

  • [Smart] Handle partitioning by :worker and :args regardless of order.

    The docs implied partitioning by worker and args was possible, but there wasn’t a clause that handled any order correctly.

  • [Smart] Explicitly cast transactional advisory lock prefix to integer.

    Postgres 16.1 may throw an error that it’s unable to determine the argument type while taking bulk unique locks.

  • [Smart] Preserve recorded values between retries when sync acking failed.

    Acking a recorded value for a batch or workflow is synchronous, and a crash or timeout failure
    would lose the recorded value on subsequent attempts. Now the value is persisted between retries
    to ensure values are always recorded.

  • [Smart] Revert “Force materialized CTE in smart fetch query”.

    Forcing a materialized CTE in the fetch query was added for reliability, but it can cause performance regressions under heavy workloads.

  • [Testing] Use configured queues when ensuring all started.

    Starting a supervised Oban in manual mode with tests specified would fail because in :manual testing mode the queues option is overridden to be empty.
