Oban — Reliable and Observable Job Processing

As of yesterday there is: Jobs • Oban

It is definitely a work in progress with a lengthy list of things to fix and add. Now that the underlying DB structures are stable and Phoenix LiveView has been officially released, I can make progress!

Note that this is something I’m hoping to control access to, so the GH repo isn’t public.

8 Likes

This question inspired me to look at using batches with Oban. I’ve finally posted a recipe on implementing batches with Oban, without any built-in support.

Oban Recipes Part 5: Batch Jobs

6 Likes

Thanks for the article! You have a small typo in the Batching Jobs for Monitoring section:

At lease one popular background job processor calls these groups “batches”, and so we’ll adopt that term here as we build it out with Oban.

1 Like

@sorentwo Is there a way to define priority within a queue? For instance I’d like to be able to create large amounts of batch jobs without delaying jobs created by an end-user. So my thought was to have a numerical priority indicator I could set which would allow user jobs to jump to the front of the queue if it’s filled with batch jobs with a lower priority.

1 Like

[moving my comment from the other thread]

In the “batch jobs” recipe, if say the final couple jobs go through final_batch_job? at the same time, wouldn’t they both get false and complete without inserting the callback job?

1 Like

Nicely spotted, thanks! I’ve fixed it.

No, there isn’t a way to define priorities without making changes to Oban itself. Execution order is based on when a job was inserted/scheduled, which would conflict with priorities. It may be possible, but I haven’t put any effort into it.

I recommend defining separate queues for various priorities. The queues are entirely isolated, so a backup or slowdown in one queue won’t affect the others.
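As a rough sketch of the separate-queues approach, the config below declares one queue for user-triggered jobs and one for bulk batch jobs, each with its own concurrency limit. The app name `:my_app`, the repo `MyApp.Repo`, the queue names, and the limits are all placeholders chosen for illustration, not anything from this thread.

```elixir
# config/config.exs (hypothetical app and repo names)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # Each queue runs independently with its own concurrency limit, so a
  # flood of :batch jobs cannot hold up jobs waiting in :user.
  queues: [user: 10, batch: 2]
```

A worker then picks its queue through the `queue:` option of `use Oban.Worker`, so batch workers would point at the batch queue and user-facing workers at the user queue.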

Quite right! The post has been updated with a few improvements to prevent those types of race conditions:

  1. Put the final batch job check in a task with a slight delay
  2. Make the job unique for a short period to prevent duplicate callbacks (not 100%, but good enough for most usage)
  3. Include a check for an existing status callback within the final_batch_job? query
  4. Recommend a unique constraint on args for the same batch_id and status if absolute uniqueness is required
1 Like

Hi, thanks for Oban! I want to build a worker that removes expired sessions from the database.

I’ve created a worker which already works fine, but I’m not sure what’s the best place to start the initial job? Also, my job schedules the next run and if I stop the application with an already scheduled job and start my workers again, I end up with multiple workers.

I’ve tried the “unique” configuration, but it doesn’t allow me to schedule a new run when the worker is already running, how can I do this?

Here’s the worker with all irrelevant code removed:

defmodule App.Workers.ClearExpiredSessions do
  use Oban.Worker, queue: "system", max_attempts: 10
  # I've tried `unique: [fields: [:queue, :worker]]` here, but cannot schedule jobs then

  @one_minute 60

  # What's the best place to call this?
  def start do
    job = new(%{})
    Oban.insert(job)
  end

  @impl Oban.Worker
  def perform(_params, %{attempt: 1} = _job) do
    delete_expired_sessions()

    job = new(%{}, schedule_in: @one_minute)
    Oban.insert(job)
  end

  def perform(_params, _job) do
    delete_expired_sessions()
  end
end
1 Like

This sort of sounds like something better suited for Quantum. Quantum is built around executing things at regular intervals, where each execution should be run basically once. You don’t need a queue or bounded worker pool for that.

1 Like

Yes, I also have a lot of other regular workers using Oban, so I thought that introducing another library for just one task might be overkill.

A simple GenServer with a 60-second Process.send_after would work too if you want to avoid another library.
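For readers who have not written one, a minimal sketch of that GenServer might look like the following. The module name `SessionSweeper` and the `:cleanup` and `:interval` options are made up for this example; the cleanup function stands in for whatever deletes the expired sessions.

```elixir
defmodule SessionSweeper do
  # Hypothetical module: runs a cleanup function on a fixed interval.
  use GenServer

  @default_interval :timer.seconds(60)

  # Options (both invented for this sketch):
  #   :cleanup  - zero-arity function to run on every tick (required)
  #   :interval - milliseconds between runs (defaults to 60 seconds)
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @impl GenServer
  def init(opts) do
    state = %{
      cleanup: Keyword.fetch!(opts, :cleanup),
      interval: Keyword.get(opts, :interval, @default_interval)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl GenServer
  def handle_info(:sweep, state) do
    state.cleanup.()
    # Re-arm the timer only after the cleanup has finished.
    schedule(state.interval)
    {:noreply, state}
  end

  defp schedule(interval), do: Process.send_after(self(), :sweep, interval)
end
```

Placed in a supervision tree, this runs once per interval on every node that starts it, and the timer resets whenever the process restarts.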

That’s my current implementation. I like Oban’s logging and retry functionalities, so I’m just playing around with it. You might be right that it’s not the best solution, but I wanted to know if it’s possible :slight_smile:

I would have a “Setup / Start” task in my supervision tree that just queries the database to see whether there is a clear_session job, and queues a new one if there is none.

But in this particular case I would say, keep it in a GenServer… much less complexity.

Regardless of whether you should do it this way, you should be able to do it this way. In this case you need to use a unique argument, say the nearest minute as unix time. Something like this would round off the edges:

DateTime.utc_now()
|> DateTime.add(60)
|> Map.put(:second, 0)
|> DateTime.to_unix()

Until the next minute you’ll keep getting the same value, so uniqueness is enforced within the current minute.
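To see the rounding in action, here is the same pipeline wrapped in a small helper and fed fixed timestamps. The `MinuteKey` module and its function name are invented for this example; only the pipeline itself comes from the post above.

```elixir
defmodule MinuteKey do
  # Hypothetical helper around the pipeline above. The current time is an
  # argument so the result can be checked against fixed timestamps.
  def next_minute_unix(now \\ DateTime.utc_now()) do
    now
    |> DateTime.add(60)
    |> Map.put(:second, 0)
    |> DateTime.to_unix()
  end
end

early = MinuteKey.next_minute_unix(~U[2019-10-01 12:00:05Z])
late = MinuteKey.next_minute_unix(~U[2019-10-01 12:00:55Z])
next = MinuteKey.next_minute_unix(~U[2019-10-01 12:01:05Z])

# Two calls within the same minute yield the same key.
IO.inspect(early == late, label: "same minute, same key")
# A call in the following minute yields a key 60 seconds later.
IO.inspect(next - early, label: "seconds between consecutive keys")
```

Passing that value in the job args is what lets a uniqueness check on args treat two inserts within the same minute as duplicates.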

I tend to agree with you on that. Quantum in particular is a large library.

Yes, I would do exactly that.

For something that is clearing expired sessions and doesn’t need to be coordinated to ensure it only runs on a single host/node, putting it in a GenServer is fine. If you are running multiple nodes and you only want to execute a job once within a window of time it is pretty sensible to use background jobs to do it.

  1. No matter how many nodes you are running the job will only execute on a single node
  2. With a GenServer, if your node crashes or you restart after a deploy, the task will run again off-schedule. With unique scheduled jobs you can maintain uniqueness between nodes and restarts.
1 Like

Oban v0.9.0 has been released with a few important bug fixes and a couple of minor feature additions. Definitely upgrade if you’ve run into trouble in CI with multiple unscoped Oban.Migrations calls.

Thanks to everybody that has been using the library and reporting issues! :yellow_heart:

From the CHANGELOG

Added

  • [Oban] Add insert_all/2 and insert_all/4, corresponding to Ecto.Repo.insert_all/3 and Ecto.Multi.insert_all/5, respectively. @halostatue

  • [Oban.Job] Add to_map/1 for converting a changeset into a map suitable for database insertion. This is used by Oban.insert_all/2,4 internally and is exposed for convenience.

Changed

  • [Oban.Config] Remove the default queue value of [default: 10], which was overridden by Oban.start_link/1 anyhow.

  • [Oban.Telemetry] Allow the log level to be customized when attaching the default logger. The default level is :info, the same as it was before.

Fixed

  • [Oban.Migrations] Prevent invalid up and down targets when attempting to run migrations that have already been run. This was primarily an issue in CI, where the initial migration was unscoped and would migrate to the current version while a subsequent migration would attempt to migrate to a lower version. @jc00ke

  • [Oban.Job] Prevent a queue comparison with nil by retaining the default queue (default) when building uniqueness checks.

  • [Oban.Job] Set state to scheduled for jobs created with a scheduled_at timestamp. Previously the state was only set when schedule_in was used.

7 Likes

:heart: Thanks for the quick fix + release of the migration issue! :heart:

1 Like

Oban v0.10.0 is published with a huge performance improvement, a change to the default logging behavior and a major restructuring of notifications. The result is much faster (up to roughly 258,000x faster with extremely large queues holding millions of unprocessed jobs) and more fault tolerant (able to run without a database connection and recover safely when the database comes back up).

This release does involve a migration and I highly recommend it.

From the CHANGELOG

Migration Optional (V5)

Tables with a lot of available jobs (hundreds of thousands to several million) are prone to time outs when fetching new jobs. The planner fails to optimize using the index available on queue, state and scheduled_at, forcing both a slow sort pass and an expensive bitmap heap scan.

This migration drops the separate indexes in favor of a single composite index. The resulting query is up to 258,757x faster on large tables while still being usable for all of the other maintenance queries.

History of the EXPLAIN ANALYZE output as the query was optimized is available here: https://explain.depesz.com/s/9Vh7

Changed

  • [Oban.Config] Change the default for verbose from true to false. Also, :verbose now accepts only false and standard logger levels. This change aims to prevent crashes due to conflicting levels when the repo’s log level is set to false.

Fixed

  • [Oban.Notifier] Restructure the notifier in order to isolate producers from connection failures. Errors or loss of connectivity in the notification connection no longer kill the notifier and have no effect on the producers. @axelson
10 Likes

Awesome! Huge thanks for such an outstanding package and the continuing effort to improve it!

6 Likes

I took a look through the README on GitHub and didn’t see this, but is it possible to have scheduled tasks that get repeated on a set interval? For example, every Monday at 2pm run X job (or any cron-like interval variant)?

1 Like

I’ve used a combination of Quantum and Oban for this. Quantum takes cron config, and if the action is super fast, I just do it. If it may fail or can take a while, I have Quantum simply enqueue a job.
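A sketch of what that combination could look like for the “every Monday at 2pm” case is below. It assumes a Quantum scheduler module named `MyApp.Scheduler` is already defined and supervised; that name, the app name `:my_app`, and `MyApp.Reports.enqueue_weekly/0` are placeholders invented for this example.

```elixir
# config/config.exs (hypothetical app, scheduler, and module names)
import Config

config :my_app, MyApp.Scheduler,
  jobs: [
    # Cron fields: minute hour day-of-month month day-of-week.
    # "0 14 * * 1" fires at 14:00 every Monday. The target function is
    # expected to do nothing more than insert an Oban job.
    {"0 14 * * 1", {MyApp.Reports, :enqueue_weekly, []}}
  ]
```

Keeping the scheduled function down to a single Oban insert means the slow or failure-prone work still gets Oban's retries and logging, while Quantum only decides when it starts.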

4 Likes

Thanks. I am using Quantum too. I guess repeated scheduling is better off being kept as a third party dependency? Oban is so well put together that not supporting repeated scheduling seems like it’s probably by design?

1 Like