Oban — Reliable and Observable Job Processing

Fair enough, and a good thing to remember! I was very sad when my Redis bandwidth maxed out once due to an application-specific issue.

1 Like

Oban v0.12.1 is out with a couple of helpful bug fixes around running cronjobs, some testing improvements, and a bit of documentation about what oban_beats is for. Thanks to everybody that contributed :yellow_heart:

From the CHANGELOG

Fixed

  • [Oban.Worker] Deep merge unique options between the worker and custom params. Previously the unique options passed to Worker.new/2 would completely override the options stored by use Oban.Worker. This was primarily an issue for crontab jobs, where the period is always passed by the scheduler. @BrightEyesDavid

Changed

  • [Oban] Allow setting crontab: false or crontab: nil to disable scheduling. This matches the queues behavior and makes it possible to override the default configuration within each environment, e.g. when testing.

  • [Oban.Testing] Better error message for Oban.Testing.assert_enqueued/2 @axelson
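As an illustration of the crontab change above, a test environment could switch off both scheduling and queues. This is a minimal sketch: the :my_app application name and the file path are assumptions, and it presumes the base config defines a crontab that the test config should override.

```elixir
# config/test.exs (hypothetical application name :my_app)
import Config

# Disable cron scheduling and queue processing while testing, so that
# jobs are only inserted and never picked up in the background.
config :my_app, Oban,
  crontab: false,
  queues: false
```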

6 Likes

I haven’t tried v0.12.1 yet (I’m on v0.11.1) but just wanted to say this is a great lib and it has made some workflows really smooth and straightforward. Thanks for sharing!

I would be happy to contribute, but I didn’t see any issues on the repo (a good sign?).
I’ve needed some “locks”, and I solved it with an Ecto transaction that first tries to insert a “lock” record (where the primary key is the lock term, a string identifier) and then inserts the actual job. The lock record is deleted after the job has run (either successfully or after maxing out the retries), or optionally deleted only if the job has failed (like saying: this has already been run for this specific lock).

It’s similar to the unique option, but by my reading unique always requires a period. Sometimes, though, you simply want to be sure that if a given type of job is already queued, another one can’t be queued until the lock is free (and using a string as the lock key lets you interpolate, for instance, a date or whatever into it). I think it can help ensure that no other job is scheduled after a failure and restart of the process responsible for enqueuing, a node start/restart, etc. Or perhaps not. If you think this could be useful I can try to work on a PR and discuss it (on GitHub).

1 Like

It should be a good sign! There are some features and changes that I have in a local TODO which aren’t listed as open issues. I’ll write them up and see if anybody wants to work on them to contribute.

If I understand your use case correctly then you can do this currently with unique options. The period option is required, but you can set it to an arbitrarily high number. Additionally, by changing the states to only include available, scheduled, retryable and executing you will prevent running a duplicate job but can freely run the same job repeatedly provided that it completes.

defmodule MyApp.Worker do
  @one_week 60 * 60 * 24 * 7

  use Oban.Worker, unique: [
    period: @one_week,
    states: ~w(available scheduled executing retryable)a,
    fields: ~w(queue worker)a
  ]

  # ...
end

The upcoming release also has the addition of xact lock during insert, which makes uniqueness guaranteed even during concurrent inserts.

4 Likes

So my use case was basically scheduling emails for the 1st day of every month. I wanted the application to be able to create the job whenever it starts (imagine you just deployed for the first time), with a scheduled at for the actual delivery to SendGrid. The problem was that the SendGrid email has to be sent close to the sending deadline, because users might have signed up at any time before the scheduled send and you would still want them to receive the email no matter what. So fetching the list of users to deliver to has to be postponed until an hour or two before the email goes out.

The SendGrid request also had to account for an arbitrary number of delivery targets. To avoid a gigantic HTTP request, the delivery has to be batched: the template data is partially unique and partially shared, but the shared parts still have to be included in each data_for_template item.

Then months have the peculiarity of having different lengths. If a job was scheduled on the first of Jan to be delivered on the first of Feb, the uniqueness period would be 31 days, but for Feb the period would be different, and I guess I would need to convolute the logic to guarantee periods that work? It seemed complex to do (or more complex than I wanted).

So I made that transaction thing, creating a table for the locks with just a primary key which is the lock identifier itself.
Basically:

Multi.new()
|> Multi.insert(:job_lock, %Lock{id: "lock_some_identifier_01_01_2020"})
|> Multi.run(:job, fn(_, _) -> do_oban_insert(params) end)
|> Multi.insert(:job_lock_to_keep, %Lock{id: "scheduled_some_identifier_01_01_2020"})
|> Multi.delete(:delete_job_lock.....)
|> Repo.transaction()

Probably I could rely on just leaving the first “job_lock” and not doing the second one afterwards.
The do_oban_insert schedules the job to run at whatever time is needed.
Then the job worker picks it up and tries to deliver in batches. For each batch it creates another two jobs in a transaction: one to fetch the next batch and another to deliver the current batch. That way a mid-batch failure doesn’t restart the whole thing and send duplicate emails, and if inserting the job for a batch fails, only that insertion is repeated.

I might be wrong, but this ended up looking like the best way to guarantee no duplicate email sending without having to fiddle with time periods, since the lock identifiers are reproducible at any point in time.

I’ve read the xact lock link, but how does that work? Is it a lock that is not temporary?

2 Likes

This sounds like a perfect use case for a CRON job. You can schedule the job to run at the first of the month at a particular time using a specific timezone (on master):

config :my_app, Oban,
  repo: MyApp.Repo,
  timezone: "Europe/Lisbon",
  crontab: [
    {"0 8 1 * *", MyApp.MailingWorker}
  ]

That will kick off the email at 8am on the 1st of each month.

I’d make the scheduled job only responsible for generating smaller batch jobs which do the actual sending. You get isolation for failure as well as horizontal scaling:

defmodule MyApp.MailingWorker do
  use Oban.Worker, queue: :mailer

  import Ecto.Query

  @batch_size 100

  @impl Oban.Worker
  def perform(%{"emails" => emails}, _job) do
    # do the batch email delivery
  end

  def perform(_args, _job) do
    MyApp.User
    |> select([u], u.email)
    |> MyApp.Repo.all()
    |> Enum.chunk_every(@batch_size)
    |> Enum.map(fn emails -> new(%{emails: emails}) end)
    |> Oban.insert_all()
  end
end

The second perform/2 clause is called by the scheduled job and enqueues all of the batches in one go. I made up some schema names and chose an arbitrary batch size of 100 purely for illustration purposes. Anyhow, the scheduled job will succeed or fail atomically.

Each of the batches will hit the first perform/2 clause and do the actual work of delivering your emails. If there is a problem with one of the batches it won’t impact any of the others and it should retry without sending duplicates.

In this case I don’t think you need to specify uniqueness at all.

The xact lock is an advisory lock that only lives as long as the transaction. It is highly temporary and is used to prevent inserting jobs from simultaneous transactions (like when a user clicks a button twice and it hits the server with two separate requests).
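To make the idea of a transaction-scoped advisory lock concrete, here is a minimal sketch. It is not how Oban implements it internally: the XactLockSketch module and with_xact_lock/3 are hypothetical names, and the code assumes a PostgreSQL-backed Ecto repo passed in as the first argument. The only database function used is PostgreSQL's pg_advisory_xact_lock.

```elixir
defmodule XactLockSketch do
  @moduledoc "Hypothetical helper for illustration only, not part of Oban."

  # Runs `fun` inside a transaction while holding a transaction-scoped
  # advisory lock derived from `term`. `repo` is assumed to be an Ecto
  # repo module that uses the PostgreSQL adapter.
  def with_xact_lock(repo, term, fun) when is_function(fun, 0) do
    # phash2/1 maps any term to a non-negative integer that fits in a bigint.
    key = :erlang.phash2(term)

    repo.transaction(fn ->
      # Waits until the lock is free. PostgreSQL releases the lock
      # automatically when the transaction commits or rolls back.
      repo.query!("SELECT pg_advisory_xact_lock($1)", [key])
      fun.()
    end)
  end
end
```

A caller would pass something that identifies the job, for example `XactLockSketch.with_xact_lock(MyApp.Repo, {"mailer", "MyApp.MailingWorker"}, fn -> ... end)`, and check for an existing job before inserting inside the function. Two simultaneous requests with the same term then run one after the other instead of both inserting.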

2 Likes

Nice addition - does it run an actual crontab or is it a cron-like interface inside Oban itself?
I didn’t use crontabs because they need some babysitting/manual scheduling in some edge cases, unless you can repeat them every day (or at whatever interval) in an idempotent way.

I hadn’t thought about the Enum chunk and inserting them with insert_all. Was insert_all always available? I think I missed it (I started using Oban in the earlier versions and have only just moved to the newer ones). That looks cleaner than my solution.

Ah, yes, for the xact lock it makes sense, but that’s why I wasn’t understanding how it tied back if it was a temporary transaction lock. Thanks for the explanation and samples!

1 Like

It uses a cron-like interface inside of Oban itself.

No, it wasn’t added until a little while ago in v0.9.0. It is a huge improvement over inserting individual jobs one at a time, but it doesn’t support uniqueness.

My pleasure! I hope it makes your situation a bit smoother.

1 Like

Looks like the cron settings are defined as a config option, parsed at boot time.

Is that right?

Is there a way to update cron settings in a running Oban application?

1 Like

The short answer is no, there isn’t any way to do that. You could fetch the current config (Oban.config/0) and then update it to put in the new crontab, but that wouldn’t make its way into the scheduler itself.

The only way to do this currently would be to shut down the existing Oban supervisor and then start a new one with updated config.
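A rough sketch of that restart approach, using only the standard Supervisor functions. The MyApp.CronReload module is hypothetical, and the code assumes Oban was started as `{Oban, opts}` directly under the given supervisor, so that its child id is `Oban`.

```elixir
defmodule MyApp.CronReload do
  # Hypothetical helper: replaces the running Oban child of `supervisor`
  # with a fresh one started from `new_opts`.
  def restart_oban(supervisor, new_opts) do
    # Assumes the child id is `Oban`, which holds when the child was
    # declared as `{Oban, opts}` in the supervision tree.
    :ok = Supervisor.terminate_child(supervisor, Oban)
    :ok = Supervisor.delete_child(supervisor, Oban)

    # Start a new Oban supervisor with the updated options (for example,
    # the old options with a different :crontab value).
    Supervisor.start_child(supervisor, {Oban, new_opts})
  end
end
```

For example, `MyApp.CronReload.restart_oban(MyApp.Supervisor, Keyword.put(old_opts, :crontab, new_crontab))`, where MyApp.Supervisor and old_opts are placeholders. Queues stop while Oban restarts, and the change only applies to the node where this runs.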

What is your use case?

@sorentwo thanks for the helpful info.

Use case: I have a ‘job runner’ Phoenix app similar in concept to Apache Airflow. The app allows end-users to schedule their own jobs, add and remove jobs, and tweak the job execution frequency. I’m currently doing this with Quantum, and would like to consolidate dependencies. To make it work on Oban I would probably use a single job-runner module that takes user-specified arguments.

I’m just reading the Crontab.Scheduler source now. To make this work I might need a custom version of the enqueue_jobs function, or maybe I would tweak the Oban.Config process to add an update_crontab function.

If people would like an ‘updatable cronjobs’ feature, give a like and if there is critical mass I’ll submit a PR. Otherwise I can hack something together that solves my issue. :wink:

5 Likes

I see. That use case makes a lot of sense. I’m fully in favor of helping consolidate dependencies :slightly_smiling_face:

There are a few barriers in place that make it impossible to modify the scheduler state without some major hacks:

  1. There aren’t any functions to modify the Config after the system starts
  2. If you do modify the config through Agent.update/3, that won’t get into the scheduler instance
  3. If you use :sys.replace_state/2 to reach in and change the config stored in the scheduler process it will only apply on the current node, it won’t scale horizontally

The way to do this properly would be to add top level functions such as Oban.put_cron, Oban.delete_cron, and Oban.update_cron. Those would use notifications to broadcast cron changes and each node would then handle the update locally. That is how other functions like start_queue and stop_queue work.

3 Likes

@sorentwo thanks for the hints. I’ll add a new function, Oban.update_cron, and support for dynamically updating the Config state. I’ll post updates to issue #146 on the Oban repo.

1 Like

Today I’m very excited to announce the release of Oban v1.0.0-rc.1. This release has been a long time coming and it packs a lot of new features, performance improvements, stability fixes and updated defaults for pruning and heartbeats. If there aren’t any regressions or breaking changes then I’ll release 1.0.0 in one week.

This release does require a migration, but I crammed as many features as I could into it: priority jobs, tags, a discarded_at timestamp and a massively improved notifications trigger.

Thanks to everybody that contributed and reported issues. :yellow_heart:


From the CHANGELOG:

Migration Required (V8)

This is the first required migration since 0.8.0, released in 09/2019. It brings with it a new column, discarded_at, a streamlined notifications trigger, job priority and job tags.
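For reference, an upgrade migration for V8 would look roughly like the sketch below. The module name is an assumption, and the down version follows the pattern used in the Oban upgrade notes as far as I recall them, so check it against the CHANGELOG before relying on it.

```elixir
defmodule MyApp.Repo.Migrations.UpgradeObanJobsToV8 do
  use Ecto.Migration

  # Migrate the oban_jobs table up to version 8.
  def up, do: Oban.Migrations.up(version: 8)

  # Roll back only the changes introduced by version 8.
  def down, do: Oban.Migrations.down(version: 8)
end
```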

Added

  • [Oban] Add timezone support for scheduling cronjobs using timezones other than “Etc/UTC”. Using a custom timezone requires a timezone database such as tzdata.

  • [Oban] Add dispatch_cooldown option to configure the minimum time between a producer fetching more jobs to execute.

  • [Oban] Add beats_maxage option to configure how long heartbeat rows are retained in the oban_beats table. Each queue generates one row per second, so rows may accumulate quickly. The default value is now five minutes, down from one hour previously.

  • [Oban.Job] Add discarded_at timestamp to help identify jobs that were discarded and not completed. The timestamp is added by the V8 migration and it is also included in the original create table from V1 as a minor space saving optimization (packing datetime columns together because they use a predictable 4 bytes of space).

  • [Oban.Job] Add numerical priority value to control the order of execution for jobs within a queue. The priority can be set between 0 and 3, with 0 being the default and the highest priority.

  • [Oban.Job] Add tags field for arbitrarily organizing associated tags. Tags are a list of strings stored as an array in the database, making them easy to search and filter by.
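As a small illustration of the priority and tags options described above, a job could be built like this. It reuses the MyApp.MailingWorker name from earlier in the thread; the args, the tag values and the chosen priority are made up for the example.

```elixir
# Hypothetical args and tags, for illustration only.
%{emails: ["user@example.com"]}
|> MyApp.MailingWorker.new(
  # 0 is the default and the highest priority; 3 is the lowest.
  priority: 1,
  # Tags are stored as an array of strings on the job.
  tags: ["mailing", "monthly"]
)
|> Oban.insert()
```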

Changed

  • [Oban] Change the default prune value from :disabled to {:maxlen, 1_000}. Many people don’t change the default until they realize that a lot of jobs are lingering in the database. It is rare that anybody would want to keep all of their jobs forever, so a conservative default is better than :disabled.

  • [Oban] Change oban_beats retention from one hour to five minutes. The value is now configurable, with a default of 300s. The lower bound is 60s because we require one minute of heartbeat activity to rescue orphaned jobs.

  • [Oban.Queue.Producer] Introduce “dispatch cooldown” as a way to debounce repeatedly fetching new jobs. Repeated fetching floods the producer’s message queue and forces the producer to repeatedly fetch one job at a time, which is not especially efficient. Debounced fetching is much more efficient for the producer and the database, increasing maximum jobs/sec throughput so that it scales linearly with a queue’s concurrency settings (up to what the database can handle).

  • [Oban.Query] Discard jobs that have exhausted the maximum attempts rather than rescuing them. This prevents repeatedly attempting a job that has consistently crashed the BEAM.

  • [Oban.Query] Use transactional locks to prevent duplicate inserts without relying on unique constraints in the database. This provides strong unique guarantees without requiring migrations.
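Put together, the options touched by these changes could be set explicitly as in the sketch below. The application name, repo and queue are placeholders. The prune and beats_maxage values are the defaults stated above; the dispatch_cooldown value is only illustrative, and to my understanding it is expressed in milliseconds, so verify the unit in the docs.

```elixir
# config/config.exs (hypothetical application name :my_app)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10],
  # New default: keep at most 1_000 finished jobs.
  prune: {:maxlen, 1_000},
  # Retain heartbeat rows for 300 seconds (the lower bound is 60).
  beats_maxage: 300,
  # Illustrative value: minimum time between producer fetches.
  dispatch_cooldown: 5
```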

Removed

  • [Oban.Notifications] An overhauled and simplified insert trigger no longer emits update notifications. This was largely an internal implementation detail and wasn’t publicly documented, but it does affect the UI.

25 Likes

Incredible, very impressed with the pace of features and general stability of the project, great stuff @sorentwo, thank you!

14 Likes

I added Oban to our product a couple weeks ago and it’s been fantastic. Great job @sorentwo!

5 Likes

Quick question, can the migration be run before upgrading? Basically I want to deploy the update with minimal downtime.

2 Likes

Is there a way to configure Oban for an umbrella app which has three apps: app_db, app_jobs, app_api where app_db has AppDb.Repo, app_jobs is the app dedicated to timed execution and app_api is the app with the Endpoint? The way the example docs describe setting up the supervision tree, this appears impossible.

1 Like

You can deploy the update with no downtime. There are a few features and scenarios to consider:

  • If you use a migrator in your supervision tree you can be sure that the migrations run before Oban starts. Be sure that the migrator module runs after the Ecto repo is started and before Oban is started.
  • The migration is backward compatible, meaning that running instances of older Oban versions can keep running while you run migrations.
  • If there are any issues, circuit breakers will repeatedly trip while Oban waits for the migration to complete. That means you may not process any jobs for 30+ seconds, but it won’t bring down your application.
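One way to get a migrator into the supervision tree is sketched below. The MyApp.Migrator module is hypothetical, and the sketch assumes that running every pending migration at boot is acceptable for your deployment. Supervisors start children in order, so placing the migrator between the repo and Oban gives the ordering described above.

```elixir
defmodule MyApp.Migrator do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl GenServer
  def init(_opts) do
    # Runs all pending migrations synchronously, so the supervisor does
    # not start the next child (Oban) until they have finished.
    Ecto.Migrator.run(MyApp.Repo, :up, all: true)

    # There is nothing to keep running afterwards.
    :ignore
  end
end

defmodule MyApp.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # The repo must be up before migrations can run.
      MyApp.Repo,
      MyApp.Migrator,
      {Oban, Application.get_env(:my_app, Oban)}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```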

4 Likes

Sure, that is entirely possible. They are separate apps but they run in the same instances and I imagine there are dependencies within the umbrella (in_umbrella: true). As long as app_jobs depends on app_db you can access the app_db Ecto repo:

config :app_jobs, repo: AppDb.Repo, queues: [default: 10] # other config
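The dependency between the two umbrella apps would be declared in the mix.exs of app_jobs, roughly as sketched here. The project settings and the Oban version requirement are assumptions; only the in_umbrella dependency on app_db comes from the description above.

```elixir
defmodule AppJobs.MixProject do
  use Mix.Project

  def project do
    [app: :app_jobs, version: "0.1.0", deps: deps()]
  end

  defp deps do
    [
      # Ensures app_db (and therefore AppDb.Repo) is started before app_jobs.
      {:app_db, in_umbrella: true},
      # Illustrative version requirement.
      {:oban, "~> 1.0"}
    ]
  end
end
```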