Oban — Reliable and Observable Job Processing

Hey @sorentwo!
I have one striking issue regarding job execution in a multi-node environment.
Please correct me if I'm wrong, but I assumed that Oban guarantees a single job will not be executed by multiple nodes, i.e. every job will run only once.
In our case, each node inserts the same unique Oban job during cluster startup.
We observe that only one job is inserted into the oban_jobs table (as expected), but we also observe that sometimes multiple nodes execute that job. Hence, we have a job that runs multiple times!
Isn't it Oban's responsibility to prevent such scenarios with a locking mechanism?


That shouldn’t be possible. There are various locking mechanisms in play to enforce uniqueness. It handles all of that within the database queries. If there is some situation that makes it possible I definitely want to track it down. Will you open an issue and include some more details (oban version, pg version, worker options, etc)?


Sure! I suspect it may be a misunderstanding on my side, so I've opened an issue with the technical details and configuration: https://github.com/sorentwo/oban/issues/250

Thanks!


Oban 2.0.0-rc.1 is out today, along with some big news about the introduction of Oban Web+Pro!

Highlights from the Oban CHANGELOG are in the post along with a breakdown of what’s gone into the new Oban Pro. Oban 2.0.0-rc.0 was silently released last week as part of the Web+Pro development process and didn’t get any fanfare :tada:

The changeset for 2.0.0 is massive and seems like too much to drop in this thread, so I’ll leave it at a link to the CHANGELOG.

Please leave any questions about the blog post, changes in Oban 2.0, or the Web+Pro package here. Alternatively, you can find me in #oban on elixir-slack.


Congratulations! :tada:

One question about Batch. Let’s say we want to run a batch where we retry each subtask N times, then give up if it still fails. At the end of the batch, we’d send an email report showing which tasks succeeded and which failed.

Would it make sense to use the discarded state and Batch callbacks to implement this? It sounds like we would need an equivalent to handle_completed that fires once all tasks are either completed or permanently discarded; I get the impression it can't be done with the current callbacks.

Or would it be better to have a catch-all within the task (or do the actual work in a separate process), so that it “completes” (from Oban’s POV) no matter what, and store the fact that it failed ourselves in DB?


You’re correct, that can’t be done with the current callbacks. The best way to accomplish what you’re requesting would be a different callback, say handle_exhausted, which fires when all jobs are either completed or discarded. I can see the utility in an additional callback here because there isn’t anything else to hook into at that point. I’ll add it as a feature request.

Note that depending on how aggressively you are pruning discarded jobs, or how long you’re waiting before discarding, that callback may never fire.


Oban 2.0.0-rc.2 is now available. It has some important bug fixes related to stability and a few breaking changes / features that enable local-only pause/resume/scale operations.

Here is the complete CHANGELOG on GitHub, or on HexDocs if you prefer.


That CHANGELOG was a pretty interesting read, I liked it!

I’d like to suggest an idea for a blog article: how do you monetize a project like this? And how do you make sure the paid licenses don’t get pirated? I am behind on this topic (the last time I read about it was about 10 years ago) and I’d be very curious to read about it.

Do you plan on writing an article like that?


I suppose that’s the trick, isn’t it? I’m trying to figure it out as we go while drawing inspiration from others that have gone before. It’s hard to write a blog post when you don’t feel like you have the answer :slightly_smiling_face:

I hadn’t, but I’ll certainly keep it in mind :+1:


Hi,

I am evaluating Oban for job processing that would publish messages to a Kafka broker. I have a question, and it would be great if any of you could answer it.

In production my application will be deployed on 3 nodes where the Oban workers run, but there is a single Postgres DB that the workers read from and write to through Oban’s Repo.

How does Oban handle this multi-node scenario? Is there any chance of a conflict where workers on different nodes read the same job, since the DB and the Repo are shared?

If so, is there any way to handle or configure it so that there’s no clash among the workers?

Oban guarantees that a single job will only ever be run by a single worker at a time.


Oban 2.0 final is out, along with ObanWeb 2.0 and ObanPro 0.3. I haven’t been especially vigilant about announcing releases in this thread because so much has changed since Oban 1.2.

Check out the full CHANGELOG, take a look at the official docs on hex, or browse the broadly expanded guides.

Thanks for all of the support and feedback along this journey. Please ask any questions you may have about the changes, or share your experiences upgrading :rocket: :tada: :yellow_heart:


Hey @sorentwo, congrats on the release. Is there any chance you could compile a list of 1.0 vs 2.0 changes from the changelog? The reason I ask is that some of the items there seem to track diffs between 2.0 pre-release versions, and it isn’t clear whether all of them are still applicable. One that comes to mind specifically is the pruner changes: one set of changes was introduced in rc.0 and then some were rolled back in rc.3, so it’s no longer clear what differences exist between 1.0 and 2.0 final.


Absolutely! Here’s a condensed and comprehensive list of the breaking changes, changes, additions, fixes and removals between 1.2 and 2.0:

Breaking Changes

  • [Oban.Worker] The perform/2 callback is replaced with perform/1, where the only argument is an Oban.Job struct. This unifies the interface for all Oban.Worker callbacks and helps to eliminate confusion around pattern matching on arguments.

    To migrate, change all worker definitions from accepting an args map and a job struct:

    def perform(%{"id" => id}, _job), do: IO.inspect(id)
    

    To accept a single job struct and match on the args key directly:

    def perform(%Job{args: %{"id" => id}}), do: IO.inspect(id)
    
  • [Oban.Worker] The backoff/1 callback now expects a job struct instead of an integer. That allows applications to finely control backoff based on more than just the current attempt number. Use of backoff/1 with an integer is no longer supported.

    To migrate, change any worker definitions that used a raw attempt like this:

    def backoff(attempt), do: attempt * 60
    

    To match on a job struct instead, like this:

    def backoff(%Job{attempt: attempt}), do: attempt * 60
    
  • [Oban.Config] The :verbose setting is renamed to :log. The setting started off as a simple boolean, but it has morphed to align with the log values accepted by calls to Ecto.Repo.

    To migrate, replace any :verbose declarations:

    config :my_app, Oban,
      verbose: false,
      ...
    

    With use of :log instead:

    config :my_app, Oban,
      log: false,
      ...
    
  • [Oban] The interface for start_queue/3 is replaced with start_queue/2 and stop_queue/2 no longer accepts a queue name as the second argument. Instead, both functions now accept a keyword list of options. This enables the new local_only flag, which allows you to dynamically start and stop queues only for the local node.

    Where you previously called start_queue/2,3 or stop_queue/2 like this:

    :ok = Oban.start_queue(:myqueue, 10)
    :ok = Oban.stop_queue(:myqueue)
    

    You’ll now call them with options, like this:

    :ok = Oban.start_queue(queue: :myqueue, limit: 10)
    :ok = Oban.stop_queue(queue: :myqueue)
    

    Or, to only control the queue locally:

    :ok = Oban.start_queue(queue: :myqueue, limit: 10, local_only: true)
    :ok = Oban.stop_queue(queue: :myqueue, local_only: true)
    
  • [Oban] Replace drain_queue/3 with drain_queue/2, which now has an interface consistent with the other *_queue/2 operations.

    Where you previously called drain_queue/2,3 like this:

    Oban.drain_queue(:myqueue, with_safety: false)
    

    You’ll now call it with options, like this:

    Oban.drain_queue(queue: :myqueue, with_safety: false)
    
  • [Oban] The interface for pause_queue/2, resume_queue/2 and scale_queue/3 now matches the recently changed start_queue/2 and stop_queue/2. All queue manipulation functions now have a consistent interface, including the ability to work in :local_only mode.

  • [Oban.Telemetry] The format for telemetry events has changed to match the new telemetry span convention. This listing maps the old event to the new one:

    • [:oban, :started] -> [:oban, :job, :start]
    • [:oban, :success] -> [:oban, :job, :stop]
    • [:oban, :failure] -> [:oban, :job, :exception]
    • [:oban, :trip_circuit] -> [:oban, :circuit, :trip]
    • [:oban, :open_circuit] -> [:oban, :circuit, :open]

    In addition, for exceptions the stacktrace meta key has changed from :stack to the standardized :stacktrace.

  • [Oban.Beat] Pulse tracking and periodic job rescue are no longer available. Pulse tracking and rescuing will be handled by an external plugin. This is primarily an implementation detail, but it means that jobs may be left in the executing state after a crash or forced shutdown.

    Remove any :beats_maxage, :rescue_after or :rescue_interval settings from your config.

  • [Oban.Plugins.Pruner] Built in pruning is handled by the new plugin system. A fixed period pruning module is enabled as a default plugin. The plugin allows light configuration through a max_age value. For customizable per-queue, per-worker or per-state pruning see the DynamicPruner available in Oban Pro.

    Remove any :prune, :prune_interval or :prune_limit settings from your config and pass a max_age value to the plugin instead. To disable the pruning plugin in test mode, set plugins: false.

    config :my_app, Oban,
      plugins: [{Oban.Plugins.Pruner, max_age: 60}],
      ...
    
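If you have handlers attached to the 1.x event names, a small lookup table can rename them in one place while upgrading. The module below is a hypothetical helper (TelemetryEventMigration is not part of Oban), built only from the telemetry renames listed above:

```elixir
defmodule TelemetryEventMigration do
  # Hypothetical migration helper: maps Oban 1.x telemetry event names to their 2.0 names.
  @renames %{
    [:oban, :started] => [:oban, :job, :start],
    [:oban, :success] => [:oban, :job, :stop],
    [:oban, :failure] => [:oban, :job, :exception],
    [:oban, :trip_circuit] => [:oban, :circuit, :trip],
    [:oban, :open_circuit] => [:oban, :circuit, :open]
  }

  # Returns the 2.0 name for a renamed 1.x event; any other event passes through unchanged.
  def translate(event) when is_list(event), do: Map.get(@renames, event, event)

  # Translates a whole list, e.g. the event names previously passed to :telemetry.attach_many/4.
  def translate_all(events) when is_list(events), do: Enum.map(events, &translate/1)
end
```

The translated list can be passed to :telemetry.attach_many/4 as before. Handlers that read the :stack metadata key also need to switch to :stacktrace.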

Fixed

  • [Oban.Scheduler] Ensure isolation between transaction locks in different prefixes. A node with multiple prefix-isolated instances (i.e. “public” and “private”) would always attempt to schedule cron jobs at the same moment. The first scheduler would acquire a lock and block out the second, preventing the second scheduler from ever scheduling jobs.

  • [Oban.Query] Correctly prefix unprepared unique queries. Unique queries always targeted the “public” prefix, which caused incorrect results when there were both a “public” and an alternate prefix. In situations where there wasn’t a public oban_jobs table at all, it would cause cryptic transaction errors.

  • [Oban.Query] Wrap all job fetching in an explicit transaction to enforce FOR UPDATE SKIP LOCKED semantics. Prior to this it was possible to run the same job at the same time on multiple nodes.

  • [Oban.Crontab] Fix weekday matching for Sunday, which is represented as 0 in crontabs.

  • [Oban.Crontab.Cron] Do not raise an ArgumentError exception when the crontab configuration includes a step of 1, which is a valid step value.

  • [Oban.Breaker] Prevent connection bomb when the Notifier experiences repeated disconnections.

  • [Oban.Telemetry] Correctly record timings using native time units, but log them using microseconds. Previously they used a mixture of native and microseconds, which yielded inconsistent values.

  • [Oban.Telemetry] Stop logging the :error value for circuit trip events. The error is a struct that isn’t JSON encodable. We include the normalized Postgrex / DBConnection message already, so the error is redundant.
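
The Sunday fix comes down to two numbering schemes disagreeing. Elixir's Date.day_of_week/1 counts Monday as 1 through Sunday as 7, while crontabs count Sunday as 0. A minimal sketch of the conversion (CronWeekday is a hypothetical module, not Oban's Crontab code):

```elixir
defmodule CronWeekday do
  # Date.day_of_week/1 uses ISO numbering: Monday = 1 ... Sunday = 7.
  # Crontabs use Sunday = 0, Monday = 1 ... Saturday = 6, so fold 7 down to 0.
  def cron_weekday(%Date{} = date), do: rem(Date.day_of_week(date), 7)

  # True when the date's weekday appears in a list of crontab weekday values.
  def matches?(%Date{} = date, cron_days) when is_list(cron_days) do
    cron_weekday(date) in cron_days
  end
end
```

For example, CronWeekday.matches?(~D[2020-06-07], [0, 6]) is true because June 7, 2020 was a Sunday.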

Changed

  • [Oban.Notifier] Make the module public and clean up the primary function interfaces. Listening for and delivering notifications is simplified and no longer requires macros for pattern matching.

    Notifier dispatching performance is slightly improved as well. It is now a no-op if no processes are listening to a notification’s channel.

  • [Oban.Query] The completed_at timestamp is no longer set for failed jobs, whether they are put in the discarded or retryable state. However, the information is still available and is recorded in the errors array as the at value with the error for that attempt.

    This corrects a long standing inconsistency between discarding a job manually or automatically when it exhausts retries.

  • [Oban.Producer] Stop dispatching jobs immediately on queue startup. Instead, only dispatch on the first poll. This makes it possible to send the producer a message or allow sandboxed connection access before the initial dispatch.

  • [Oban.Worker] Limit default backoff calculations to 20 attempts, or roughly 24 days. The change addresses an issue with snoozing, which can increase a job’s attempts into the hundreds or thousands. In this situation the algorithm calculates the backoff using a ratio of attempts to max attempts, but is still limited to roughly 24 days.
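
To make the capping concrete, here is a sketch that assumes each retry waits 15 seconds plus 2^attempt seconds. CappedBackoff and its constants are illustrative assumptions rather than Oban's source, but they show how a ratio of attempt to max attempts keeps snoozed jobs from producing enormous delays:

```elixir
defmodule CappedBackoff do
  # Assumed shape: a fixed base plus an exponential term. Oban's exact default may differ.
  @base_seconds 15
  @max_exponent 20

  # Backoff in seconds for a job on `attempt` out of `max_attempts`.
  def backoff(attempt, max_attempts)
      when is_integer(attempt) and attempt >= 1 and is_integer(max_attempts) and max_attempts >= 1 do
    exponent =
      if max_attempts <= @max_exponent do
        attempt
      else
        # Scale the attempt onto 0..20 using the ratio of attempt to max attempts,
        # so jobs with hundreds or thousands of attempts still get bounded delays.
        round(attempt / max_attempts * @max_exponent)
      end

    # Cap the exponent in every case (snoozing can push attempt past max_attempts).
    @base_seconds + Integer.pow(2, min(exponent, @max_exponent))
  end
end
```

With these assumed constants a single delay never exceeds 2^20 + 15 seconds (about 12 days), and the delays for attempts 1 through 20 add up to 2,097,450 seconds, roughly 24 days.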

Added

  • [Oban.Worker] Support returning {:snooze, seconds} from perform/1 to re-schedule a job some number of seconds in the future. This is useful for recycling jobs that aren’t ready to run yet, e.g. because of rate limiting.

  • [Oban.Worker] Support returning :discard from perform/1 to immediately discard a job. This is useful when a job encounters an error that won’t resolve with time, e.g. invalid arguments or a missing record.

  • [Oban.Job] Introduce a virtual unsaved_error field, which is populated with an error map after failed execution. The unsaved_error field is set before any calls to the worker’s backoff/1 callback, allowing workers to calculate a custom backoff depending on the error that failed the job.

  • [Oban.Worker] Add :infinity option for unique period.

  • [Oban] Bubble up errors and exits when draining queues by passing with_safety: false as an option to Oban.drain_queue/2.

  • [Oban] Add Oban.cancel_job/2 for safely discarding scheduled jobs or killing executing jobs. This deprecates kill_job/2, which isn’t as flexible.

  • [Oban.Telemetry] Add span/3 for reporting normalized :start, :stop and :exception events with timing information.

  • [Oban.Telemetry] Include the configured prefix in all event metadata. This makes it possible to identify which schema prefix a job ran with, which is useful for differentiating errors in a multi-tenant system.

  • [Oban.Telemetry] Include queue_time as a measurement with stop and exception events. This is a measurement in milliseconds of the amount of time between when a job was scheduled to run and when it was last attempted.

  • [Oban.Testing] Add perform_job/2,3 helper to automate validating, normalizing and performing jobs while unit testing. This is now the preferred way to unit test workers.

    To update your tests replace any calls to perform/1,2 with the new Oban.Testing.perform_job/2,3 helper:

    defmodule MyApp.WorkerTest do
      use MyApp.DataCase, async: true
    
      use Oban.Testing, repo: MyApp.Repo
    
      alias MyApp.Worker
    
      test "doing business in my worker" do
        assert :ok = perform_job(Worker, %{id: 1})
      end
    end
    

    The perform_job/2,3 helper will verify the worker, the arguments and any provided options. It will then verify that your worker returns a valid result and return the value for you to assert on.

  • [Oban.Crontab] Add support for non-standard expressions such as @daily, @hourly, @midnight, and @reboot.

  • [Oban.Crontab] Add support for using step values in conjunction with ranges, enabling expressions like 10-30/2, 15-45/3, etc.

  • [Oban.Telemetry] Include job queue_time in the default logger output.

  • [Oban.Telemetry] Add new :producer events for descheduling and dispatching jobs from queue producers.
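
A range with a step selects every nth value inside the range, so 10-30/2 matches 10, 12, ..., 30. The hypothetical CronStep module below (not Oban's parser) sketches that expansion for a single crontab field. It also treats a step of 1 as valid, in line with the Oban.Crontab.Cron fix listed earlier:

```elixir
defmodule CronStep do
  # Expands one crontab field into the list of values it matches.
  # Supports "*", "a-b", a single number, and a "/step" suffix on "*" or "a-b"
  # (e.g. "10-30/2", "*/15"). `lo` and `hi` are the field's bounds, e.g. 0 and 59
  # for minutes. Lists ("1,2"), names ("MON") and "a/step" forms are out of scope.
  def expand(field, lo, hi) when is_binary(field) and lo <= hi do
    case String.split(field, "/") do
      [range, step] ->
        step = String.to_integer(step)
        if step < 1, do: raise(ArgumentError, "step must be at least 1, got: #{step}")
        range |> values(lo, hi) |> Enum.take_every(step)

      [range] ->
        values(range, lo, hi)
    end
  end

  # "*" covers the whole field; "a-b" covers an inclusive range; anything else is one value.
  defp values("*", lo, hi), do: Enum.to_list(lo..hi)

  defp values(range, _lo, _hi) do
    case String.split(range, "-") do
      [first, last] -> Enum.to_list(String.to_integer(first)..String.to_integer(last))
      [single] -> [String.to_integer(single)]
    end
  end
end
```

For a minutes field, CronStep.expand("15-45/3", 0, 59) yields the eleven values from 15 to 45 in steps of 3.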

Removed

  • [Oban] Removed Oban.kill_job/2, which wasn’t as flexible as Oban.cancel_job/2. Use Oban.cancel_job/2 instead to safely discard scheduled jobs or kill executing jobs.

In light of how expansive the 2.0 CHANGELOG is I’ve published an upgrade guide to walk you through the important / breaking changes. If you find any issues or anything that’s missing please let me know.


Oban v2.1.0 is out with a few small bug fixes and some great ease-of-use improvements :tada:

As always the full CHANGELOG is available, and here are some highlights:

Changed

  • Wrap {:error, reason}, {:discard, reason}, :timeout and crash errors in proper exception structs (Oban.PerformError, Oban.TimeoutError and Oban.CrashError respectively). This provides better error grouping and, most importantly, fixes error reporting for Sentry >= 8.0. Thanks to @anthonator for the issue.

  • No more misleading stacktraces when perform/1 returns an error, discard, or timeout. The stacktrace always came from the executor module and didn’t contain any information from the worker module itself. To get a useful stacktrace you must raise or crash the job process.

Added

  • Discard jobs with a reason using a {:discard, reason} tuple. This behaves similarly to an error tuple, except that the job is discarded and a [:oban, :job, :stop] telemetry event is emitted rather than an :exception event. Thanks to @jc00ke for the issue.

  • It is now possible to override settings like :poll_interval at the queue level. This allows finer grained control for busy queues and paves the way for some upcoming custom producers through Oban Pro.

  • Add support for sub-arg uniqueness with a new :keys option. This allows you to only consider some of the keys in args when comparing them to historic jobs for uniqueness. It is valuable in situations where you add other metadata to job args but still need the job to be considered unique. For example:

    # Args from an existing job
    %{from: "somebody", url: "https://a.co"}
    
    # Args from a new job that should be considered duplicate because they share a `url`.
    %{from: "somebody-else", url: "https://a.co"}
    

    For more details see the original issue.
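
The comparison behind :keys can be modelled in plain Elixir. This is an illustrative sketch only: UniqueKeys is hypothetical, Oban performs the real check in its database query, and args stored in the database have string keys rather than atoms.

```elixir
defmodule UniqueKeys do
  # Returns true when the new args should be treated as a duplicate of the existing args.
  # With an empty key list the whole map is compared (an assumption for this sketch);
  # otherwise only the listed keys are compared and all other keys are ignored.
  def duplicate?(existing_args, new_args, []), do: existing_args == new_args

  def duplicate?(existing_args, new_args, keys) when is_list(keys) do
    Map.take(existing_args, keys) == Map.take(new_args, keys)
  end
end
```

For the two args maps above, comparing on [:url] reports a duplicate, while comparing on [:from, :url] (or on the whole map) does not.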


Oban v2.2.0 is now available! It includes some minor bug fixes and several wonderful features from contributors :yellow_heart:. There are some large invisible architectural changes in this release that will require subscribers to upgrade to Oban Web v2.2.2 and Oban Pro v0.4.1.

Here are some highlights and attributions from the CHANGELOG:

Added

  • Replace local dynamically composed names with a registry. This dramatically simplifies locating nested children, avoids unnecessary atom creation at runtime and improves the performance of config lookups. Thanks to @sasajuric for all the hard work that went into making this change.

  • The new Oban.Repo module wraps interactions with underlying Ecto repos. This ensures consistent prefix and log level handling, while also adding full dynamic repo support. Also contributed by @sasajuric.

  • Augment the unique keys option with replace_args, which allows enqueuing a unique job and replacing the args subsequently. For example, given a job with these args:

    %{some_value: 1, id: 123}
    

    Attempting to insert a new job:

    %{some_value: 2, id: 123}
    |> MyJob.new(schedule_in: 10, replace_args: true, unique: [keys: [:id]])
    |> Oban.insert()
    

    Will result in a single job with the args:

    %{some_value: 2, id: 123}
    

    Thanks to @Gazler for the feature suggestion and the PR itself.

Changed

  • The default backoff algorithm now includes a small amount of jitter. The jitter helps prevent jobs that fail simultaneously from repeatedly retrying together. Thanks to @coladarci for the feature request.
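
Jitter is a bounded random offset added to the computed delay. A minimal sketch, assuming an upward-only offset of up to 10% of the base delay; JitteredBackoff and the ratio are hypothetical, and Oban's exact jitter may differ:

```elixir
defmodule JitteredBackoff do
  # Adds between 0 and trunc(base_seconds * ratio) seconds of random jitter, so jobs
  # that failed together spread their retries out instead of retrying in lockstep.
  # The 10% default ratio is an assumption for illustration.
  def with_jitter(base_seconds, ratio \\ 0.1)
      when is_integer(base_seconds) and base_seconds >= 0 and ratio >= 0 do
    max_jitter = trunc(base_seconds * ratio)

    # :rand.uniform(n) returns 1..n, so subtract 1 to get an offset in 0..max_jitter.
    base_seconds + :rand.uniform(max_jitter + 1) - 1
  end
end
```

Because the offset is random per job, two jobs that fail in the same instant with the same attempt count will usually be scheduled a few seconds apart.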

First, I would like to thank all the contributors to this project. I have been using it for a while and it’s working great.

I have a couple of quick questions:

  1. I see that the primary key of the jobs table is a bigint. What if I created so many jobs that I eventually ran out of the bigint’s valid range? Does Oban provide a mechanism to resolve this yet? I am already using a custom pruner to remove jobs from the DB, but I would like to know what would happen if I ran out of valid bigint values for the primary key.
  2. I am currently using Oban on distributed nodes. I am having trouble setting up cron to run a single long-running task uniquely across nodes, and I want to make sure a new job isn’t scheduled until the current one has completed. I am using Oban’s cron mechanism to do this.

This is what my job module looks like:

# My Job Module
defmodule MyApp.Job do
  use Oban.Worker,
    queue: :worker1,
    unique: [
      fields: [:args],
      states: [:available, :scheduled, :executing],
      period: 60
    ]
 
  def perform(_job) do
    # Long-running work goes here; returning :ok marks the job as completed.
    :ok
  end
end

# My cron configuration in config
config :my_app, Oban,
  repo: My.Repo,
  queues: [worker1: 1],
  crontab: [
    {"* * * * *", MyApp.Job}
  ]

With this configuration, I can see that a new job is never scheduled until the current job is finished. However, I am seeing new jobs created less than 1 minute apart. One more thing to note: this job could run for a short time or for several minutes. I want to guarantee two things: 1. only one job can run at a time across the distributed nodes, and 2. a new job is not created within 1 minute of the previous one.

Thanks
Suracheth

The upper bound of a PG bigint/bigserial is 9,223,372,036,854,775,807. That will last a really long time—if you are running 5 million jobs a day it would take 5,053,902,485 years to exhaust the limit. If, somehow, you’re running enough jobs that this is a practical concern you can reset the pkey back to 1:

ALTER SEQUENCE oban_jobs_id_seq RESTART WITH 1

As long as you are pruning jobs and the rate of inserting new jobs won’t collide with existing jobs, this is entirely safe to do. Practically, you’ll never need to worry about it.
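
The estimate above is a single integer division. Here it is as a small, hypothetical helper, assuming a constant insert rate and 365-day years:

```elixir
defmodule BigintHeadroom do
  # Upper bound of a Postgres bigint / bigserial column.
  @max_bigint 9_223_372_036_854_775_807

  # Whole years until the id sequence runs out at a steady number of inserts per day
  # (365-day years, ignoring leap days).
  def years_to_exhaust(jobs_per_day) when is_integer(jobs_per_day) and jobs_per_day > 0 do
    div(@max_bigint, jobs_per_day * 365)
  end
end
```

BigintHeadroom.years_to_exhaust(5_000_000) returns 5_053_902_485, matching the figure quoted above.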

So long as you don’t override the unique :states, this is the default behavior. If the job takes more than a minute and is still executing, a new job won’t be inserted. The cron scheduler does not execute jobs inline; instead, it inserts a single job across all distributed nodes, and that job will only run on a single node.


@sorentwo thanks for your reply. So for my unique job case above, if I don’t override :states, I should get what I need.

My intention in overriding :states above was that I don’t want discarded and completed jobs to count when checking uniqueness.