Oban — Reliable and Observable Job Processing

That makes a lot of sense. Thanks for the detailed answer!

I’ve posted a “recipe” on enforcing unique jobs with Oban. It is the first in a series of recipe posts, and was largely drawn from the discussion in this thread. (This is also linked in the monster “Blog Posts” thread)

Oban Recipes Part 1: Unique Jobs

8 Likes

Thanks @sorentwo, been using Oban for a few weeks now, very happy with it. In the initial post you teased a UI for visualizing the workers and job queues, is there a timeline or roadmap with respect to that UI?

7 Likes

@sorentwo,

Would you be interested in vue implementation of the ui with some filtering using absinthe?

Personally, no, I’m trying to avoid heavy JS where possible.

The current live view powered UI works wonderfully and I’m really pleased to be writing Elixir for everything. The remaining blockers are:

  1. Some ugly bugs (literally)
  2. A few missing core features
  3. Install and usage documentation
  4. The dependency on phoenix_live_view means I can’t release it on Hex
8 Likes

So you need someone to write docs and create a tutorial on how to use your package?

Also can oban be used for email marketing self solution as a newsletter sender?

Thanks in advanced

Not exactly. The writing effort is around integration, a changelog, subtleties of behavior, and establishing a roadmap. The process of explaining those things to somebody (or multiple somebodies) would be equivalent to writing the docs :slightly_smiling_face:

Certainly. It’s up to you how you make use of it!

2 Likes

I need check it later after you create the docs.

T

Thanks

awesome! Is there (in the code) a pretty hard dependency on PG? I have a use case for a similar front end but with a distributed DB, I’d be interested in extending it at some point.

1 Like

Yes, there are a number of hard dependencies on PG features. A few off the top of my head:

  1. Pub sub through pg_notify
  2. Software advisory locks
  3. Use of SKIP LOCKED
  4. Partial indexes

There are probably others that I’m missing, but those are fundamental enough to how Oban operates that I can’t foresee porting it.

Dropping Ecto as a requirement or writing a port in another language is more likely that working with a different database.

2 Likes

I am looking into using Oban in our production project. Some of the background tasks are run using Task.Supervisor.async_stream_nolink and, while it works for now, could be improved a lot and made more reliable by using a proper background library.

I have a branch with Oban in which I am testing it. I have converted a pair of those tasks and now they run into an Oban worker, so far they are running amazingly well :raised_hands: . The only remaining doubt is how to report errors that happen into a worker.

Our project uses AppSignal for performance and error metrics, but I found that errors inside Oban workers won’t get reported to AppSignal. Looking at the code it looks like Oban rescues errors to handle them for retries and such, so they never get to AppSignal.

Has anybody faced a similar situation? Any guidance on how should I proceed? I’ve seen that Oban reports certain information using :telemetry, would that be the key point that I am missing?

1 Like

That is exactly what I would recommend to proceed. There is some documentation around error reporting in the Oban.Telemetry documentation:

https://hexdocs.pm/oban/Oban.Telemetry.html#module-examples

It seems like that information, or even that example, should be elevated in the documentation :slight_smile:

2 Likes

Oban v0.6.0 has been released!

There are a couple of convenient additions, some changes to how the perform/1 function is defined, and an important fix for queue polling. As usual, from the CHANGELOG with attribution where possible:

Added

  • [Oban.Query] Added :verbose option to control general query logging. This allows debug query activity within Oban to be silenced during testing and development.

  • [Oban.Testing] Added all_enqueued/1 helper for testing. The helper returns a list of jobs matching the provided criteria. This makes it possible to test using pattern matching, which is more flexible than a literal match within the database. @luizpvasc

Changed

  • [Oban.Config] All passed options are validated. Any unknown options will raise an ArgumentError and halt startup. This prevents misconfiguration through typos and passing unsupported options.

  • [Oban.Worker] The perform/1 function now receives an Oban.Job struct as the sole argument, calling perform/1 again with only the args map if no clause catches the struct. This allows workers to use any attribute of the job to customize behaviour, e.g. the number of attempts or when a job was inserted into the database.

    The implementation is entirely backward compatible, provided workers are defined with the use macro. Workers that implement the Oban.Worker behaviour manually will need to change the signature of perform/1 to accept a job struct. @andykent

  • [Oban] Child process names are generated from the top level supervisor’s name, i.e. setting the name to “MyOban” on start_link/1 will set the notifier’s name to MyOban.Notifier. This improves isolation and allows multiple supervisors to be ran on the same node.

Fixed

  • [Oban.Producer] Remove duplicate polling timers. As part of a botched merge conflict resolution two timers were started for each producer. (Tom Taylor)

v0.6.0 Docs

11 Likes

Well that example worked wonderfully and errors in background jobs are being reported to Appsignal.

This was the last step into using Oban in production. Thanks! :raised_hands:

3 Likes

Do you have any example on how you equeue jobs from python?
Did you write some helper modulen in python/ruby for that? or are you just inserting the jobs directly to the db?

1 Like

I haven’t done this with Python yet. All you need to do is insert jobs directly into the oban_jobs table with the proper values:

  • worker — The name of your worker in Elixir, i.e. MyApp.Worker
  • queue — The queue for the job, which must be running in Oban for the job to be ran, i.e. priority
  • args — A jsonb map, i.e. {"email":"somebody@example.com"}

All of the other fields have database defaults, so you don’t need to set anything.

3 Likes

Oban v0.7.0 has been released!

This release includes a few big features and some breaking changes to workers. The somewhat annotated CHANGELOG follows.

Thanks to everybody who contributed to docs and helped drive these features :yellow_heart:

Added

  • [Oban] Added insert/2, insert!/2 and insert/4 as a convenient and more powerful way to insert jobs. Features such as unique jobs and prefix support only work with insert.

  • [Oban] Add prefix support. This allows entirely isolated job queues within the same database. @anthonator

  • [Oban.Worker] Compile time validation of all passed options. Catch typos and other invalid options when a worker is compiled rather than when a job is inserted for the first time.

  • [Oban.Worker] Unique job support through the unique option. Set a unique period, and optionally fields and states, to enforce uniqueness within a window of time. For example, to make a job unique by args, queue and worker for 2 minutes:

    use Oban.Worker, unique: [period: 120, fields: [:args, :queue, :worker]]

    Note, unique support relies on the use of Oban.insert/2,4.

Changed

  • [Oban.Worker] Remove the perform/1 callback in favor of perform/2. The new perform/2 function receives the job’s args, followed by the complete job struct. This new function signature makes it clear that the args are always available, and the job struct is also there when it is needed. A default perform/2 function is not generated automatically by the use macro and must be defined manually.

    This is a breaking change and all worker modules will need to be updated. Thankfully, due to the behaviour change, warnings will be emitted when you compile after the upgrade.

    If your perform functions weren’t matching on the Oban.Job struct then you can migrate your workers by adding a second _job argument:

    def perform(%{"some" => args}, _job)

    If you were making use of Oban.Job metadata in perform/1 then you can move the job matching to the second argument:

    def perform(_args, %{attempt: attempt})

    See the issue that suggested this change for more details and discussion.

  • [Oban.Producer] Use send_after/3 instead of :timer.send_interval/2 to maintain scheduled dispatch. This mechanism is more accurate under system load and it prevents :poll messages from backing up for each producer.

  • [Oban.Migration] Accept a keyword list with :prefix and :version as options rather than a single version string. When a prefix is supplied the migration will create all tables, indexes, functions and triggers within that namespace. For example, to create the jobs table within a “private” prefix:

    Oban.Migrate.up(prefix: "private")t

v0.7.0 Docs

12 Likes

I’ve released Oban v0.7.1 with some important bug fixes. This is a minor release, but I recommend everybody upgrade (especially if you’re using a hosted database that doesn’t allow

From the CHANGELOG:

Fixed

  • [Oban.Query] Release advisory locks in batches rather than individually after a job finishes execution. By tracking unlockable jobs and repeatedly attempting to unlock them for each connection we ensure that eventually all advisory locks are released.

    The previous unlocking system leaked advisory locks at a rate proportional to the number of connections in the db pool. The more connections, the more locks that wouldn’t release. With a default value of 64 for max_locks_per_transaction the database would raise “ERROR: 53200: out of shared memory” after it hit a threshold (11,937 exactly, in my testing). @speeddragon

  • [Oban.Worker] Allow max_attempts to be 1 or more. This used to be possible and was broken unintentionally by compile time validations. @antonielcm

5 Likes

Any interest in revisiting orphaned job tracking methods that don’t use advisory locks?