Oban — Reliable and Observable Job Processing

This week the course Sidekiq in Practice is available for $5 (it normally costs $50).

Looks like the sections about scaling and idempotency may be interesting regardless of the background job framework you are using. Does anyone have an opinion on this course or know if some of the lessons can be applied to Oban as well?

EDIT: I don’t have any personal or professional affiliation with the course or with Sidekiq. I just used it in a Rails project and found it to be an incredibly stable and well-written piece of software.

1 Like

I would imagine that the sections about scaling are probably not relevant since the runtime characteristics of Ruby and Elixir are so different. The idempotency stuff may be relevant.

1 Like

Hi all, I’m getting stuck into using Oban in my project and would like a bit of advice on cancelling jobs according to tags when the related entity is deleted. I saw this touched upon in a much earlier post in this thread but am wondering if there’s an easier way to query jobs by tag.

E.g. I have a worker for scheduling “reminders” to be sent in the future: MyApp.SendReminder. When these are inserted, they’re tagged with the related entity (e.g. entity:123).

Say the entity with ID 123 is deleted; I obviously no longer want these reminder jobs to run. I’ve found cancel_all_jobs/2 but I’m not entirely sure how to query for jobs with the appropriate tag. It looks like the tags are stored as a string in the DB. Are there any helpers to get jobs by a tag?

Tags are stored as a text array in the database. There are two approaches you can take:

A) Use a query to cancel the jobs and stop them from running, as you mentioned. That query could look like this:

Oban.Job
|> where([j], "entity:123" in j.tags)
|> Oban.cancel_all_jobs()

B) Check the tags as the job executes and make it a no-op. You could check within the perform/1 function like this:

def perform(%Job{tags: ["entity:" <> entity_id]}) do
  case MyApp.Repo.get(Entity, entity_id) do
    %Entity{} -> ...
    nil -> ...
  end
end

The direction you take is up to you and probably depends on the volume of jobs you have. With a handful of extraneous jobs, I’d favor option B.

4 Likes

Many thanks. My use case is actually a little more complex than I described, so I went with option A, but the noop pattern is useful for some other cases I have.

Thanks for the very handy library!

Oban v2.11, Pro v0.10, and Web v2.9 are out and we’ve written a proper announcement post—Oban v2.11, Pro v0.10, and Web v2.9 Released.

Take a look, or browse the CHANGELOG to see what’s new.

8 Likes

Hi, is there still a free dashboard available, or is it just the paid Web version? I might not be your target demographic, but do you have any startup pricing plans? My total monthly server cost is less than $20, while Oban Web is $39 a month.

1 Like

Oban v2.12 and Oban Pro v0.11 are out!

These releases were dedicated to enriching the testing experience and expanding config, plugin, and queue validation across all environments. They’re so focused on testing that we’ve dubbed them the “don’t forget to floss” and “eat your veggies” editions, respectively :laughing:

:gem: Oban v2.12

Testing Modes

Testing modes bring a new, vastly improved way to configure Oban for testing. The new testing option explicitly states that Oban should operate in a restricted mode for the given environment.

Behind the scenes, the new testing modes rely on validation layers within Oban’s Config module. Now production configuration is validated automatically during test runs. Even though queues and plugins aren’t started in the test environment, their configuration is still validated.

To switch, stop overriding plugins and queues and enable a testing mode in your test.exs config:

config :my_app, Oban, testing: :manual

Testing in :manual mode is identical to testing in older versions of Oban: jobs won’t run automatically, so you can use helpers like assert_enqueued and execute them manually with Oban.drain_queue/2.
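A minimal sketch of what that looks like in practice, using the SendReminder worker from earlier in the thread (the worker name, args, and queue are assumptions for illustration):

```elixir
defmodule MyApp.ReminderTest do
  use ExUnit.Case, async: true
  use Oban.Testing, repo: MyApp.Repo

  test "enqueues a reminder without executing it" do
    Oban.insert(MyApp.SendReminder.new(%{entity_id: 123}))

    # In :manual mode the job is recorded but never executed automatically.
    assert_enqueued worker: MyApp.SendReminder, args: %{entity_id: 123}

    # Drain the queue to execute the enqueued jobs synchronously.
    assert %{success: 1, failure: 0} = Oban.drain_queue(queue: :default)
  end
end
```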

The alternate :inline mode allows Oban to bypass all database interaction and run jobs immediately in the process that enqueued them.

config :my_app, Oban, testing: :inline

Finally, new testing guides cover test setup, unit testing workers, integration testing queues, and testing dynamic configuration.

Global Peer Module

Oban v2.11 introduced centralized leadership via Postgres tables. However,
Postgres-based leadership isn’t always a good fit. For example, an ephemeral leadership mechanism is preferred for integration testing.

In that case, you can make use of the new :global-powered peer module for leadership:

config :my_app, Oban,
  peer: Oban.Peers.Global,
  ...

For other improvements and minor bug fixes, view the complete Oban CHANGELOG.

:sparkles: Oban Pro v0.11

In league with Oban v2.12, Pro v0.11 focused extensively on testing improvements.

Pro Testing Module

The centerpiece of those improvements is the Oban.Pro.Testing module, a drop-in replacement for Oban.Testing with considerations for unit, integration, and acceptance testing Pro workers.

The helpers provided by Oban.Pro.Testing are dogfooded—that is, they are what Pro uses for testing internally!

Here’s a taste of how one of the new testing helpers, run_workflow/2, makes it simple to run a workflow optimally, inline, without any extra configuration:

workflow =
  MyWorkflow.new_workflow()
  |> MyWorkflow.add(:com, MyWorkflow.new(%{text: text, mode: :complexity}))
  |> MyWorkflow.add(:sen, MyWorkflow.new(%{text: text, mode: :sentiment}))
  |> MyWorkflow.add(:syn, MyWorkflow.new(%{text: text, mode: :syntax}))
  |> MyWorkflow.add(:exp, MyWorkflow.new(%{}), deps: [:com, :sen, :syn])

# Using with_summary: false gives us a list of executed jobs, including the
# job's recorded result.
assert [_com, _sen, _syn, exp_job] = run_workflow(workflow, with_summary: false)

assert {:ok, 0.8} = MyWorkflow.fetch_result(exp_job)

Furthermore, there are new guides to introduce Pro testing, and walk you through testing Pro workers.

Fewer Plugins, Same Functionality

In an effort to simplify testing scenarios, we’ve pulled some functionality out of plugins and made that functionality available out of the box, without any extra configuration.

The BatchManager and Relay plugins are deprecated and it’s no longer necessary to run them at all! Each deprecated plugin will emit a warning when your app starts, so you’ll probably want to remove them:

config :my_app, Oban, plugins: [
- Oban.Pro.Plugins.BatchManager,
- Oban.Pro.Plugins.Relay
]

Enhanced Workflow Tools

Workflow workers now expose all_workflow_jobs/2 for fetching other jobs in a workflow without streaming. It operates in three modes: fetch all jobs in the workflow, only the current job’s dependencies, or only specific dependencies by name. The same options are now supported by stream_workflow_jobs/2 as well, so you can switch to streaming for large workflows.
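A rough sketch of the three modes from inside a workflow worker’s process/1 callback. The option names used here (:only and :names) are assumptions inferred from the description; check the Pro docs for the exact signature:

```elixir
defmodule MyApp.ExportWorker do
  use Oban.Pro.Workers.Workflow

  @impl true
  def process(%Job{} = job) do
    # 1. Fetch all jobs in the workflow
    _all = all_workflow_jobs(job)

    # 2. Fetch only the current job's dependencies
    _deps = all_workflow_jobs(job, only: :deps)

    # 3. Fetch only specific dependencies, by name
    _named = all_workflow_jobs(job, names: [:com, :sen])

    :ok
  end
end
```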

Check the Pro CHANGELOG for a complete list of enhancements and bug fixes.

Find us in #oban on Elixir Slack or ask here if you need any help!

6 Likes

Hi @sorentwo, a quick question: the backoff callback must return a number of seconds to wait after the previous attempt, right? Or the seconds relative to the job’s first enqueuing date?

Thank you.

I am trying to compute a simple backoff that would make 20 attempts in about 48 hours. I found this site: Exponential Backoff Calculator, but all the amounts there are calculated from the first date.

If it is from the previous attempt then this works:

    Stream.cycle([15])
    |> Stream.scan(fn a, acc -> floor(a + acc * 1.4865) end)
    |> Enum.at(attempt - 1)
    |> Kernel.+(Enum.random(1..5))

1 Like

It’s the number of seconds to wait after the current attempt finishes. Think of it as “now + backoff”.
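For reference, the formula from the post above can be wired into a worker’s backoff/1 callback like this (the worker name and queue are hypothetical; the formula is taken verbatim from the earlier post):

```elixir
defmodule MyApp.FlakyWorker do
  use Oban.Worker, queue: :default, max_attempts: 20

  # Returns the number of seconds to wait after the current attempt,
  # i.e. the next run is scheduled at "now + backoff".
  @impl Oban.Worker
  def backoff(%Oban.Job{attempt: attempt}) do
    Stream.cycle([15])
    |> Stream.scan(fn a, acc -> floor(a + acc * 1.4865) end)
    |> Enum.at(attempt - 1)
    |> Kernel.+(Enum.random(1..5))
  end

  @impl Oban.Worker
  def perform(%Oban.Job{args: args}), do: {:ok, args}
end
```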

1 Like

Great, thank you.

Oban v2.13 is out!

Cancel Directly from Job Execution

Discard was initially intended to mean “a job exhausted all retries.” Later, it
was added as a return type for perform/1, and it came to mean either “stop
retrying” or “exhausted retries” ambiguously, with no clear way to
differentiate. Even later, we introduced cancel with a cancelled state as a
way to stop jobs at runtime.

To repair this dichotomy, we’re introducing a new {:cancel, reason} return
type that transitions jobs to the cancelled state:

case do_some_work(job) do
  {:ok, _result} = ok ->
    ok

  {:error, :invalid} ->
-   {:discard, :invalid}
+   {:cancel, :invalid}

  {:error, _reason} = error ->
    error
end

With this change we’re also deprecating the use of discard from perform/1
entirely! The meaning of each action/state is now:

  • cancel—this job was purposefully stopped from retrying, either from a return
    value or the cancel command triggered by a human

  • discard—this job has exhausted all retries and transitioned by the system

You’re encouraged to replace usage of :discard with :cancel throughout your
application’s workers, but :discard is only soft-deprecated and undocumented
now.

Public Engine Behaviour

Engines are responsible for all non-plugin database interaction, from inserting
through executing jobs. They’re also the intermediate layer that makes Pro’s
SmartEngine possible.

Along with documenting the Engine behaviour, this release also flattens its name
for parity with other “extension” modules. For consistency with notifiers and
peers, the Basic and Inline engines are now Oban.Engines.Basic and
Oban.Engines.Inline, respectively.

v2.13.0 — 2022-07-22

Enhancements

  • [Telemetry] Add encode option to control JSON encoding for attach_default_logger/1.

    Now it’s possible to use the default logger in applications that prefer
    structured logging or use a standard JSON log formatter.

  • [Oban] Accept a DateTime for the :with_scheduled option when draining.

    When a DateTime is provided, drains all jobs scheduled up to, and
    including, that point in time.

  • [Oban] Accept extra options for insert/2,4 and insert_all/2,4.

    These are typically Ecto’s standard “Shared Options”, such as log and
    timeout. Other engines, such as Pro’s SmartEngine, may support additional
    options.

  • [Repo] Add aggregate/4 wrapper to facilitate aggregates from plugins or
    other extensions that use Oban.Repo.
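The :with_scheduled enhancement above can be sketched as follows (the queue name is an assumption):

```elixir
# Drain the queue, executing available jobs plus any jobs scheduled
# to run within the next hour.
one_hour_ahead = DateTime.add(DateTime.utc_now(), 3600, :second)

Oban.drain_queue(queue: :default, with_scheduled: one_hour_ahead)
```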

Bug Fixes

  • [Oban] Prevent empty maps from matching non-empty maps during uniqueness checks.

  • [Oban] Handle discarded and exhausted states for inline testing mode.

    Previously, returning a :discard tuple or exhausting attempts would cause an
    error.

  • [Peer] Default leader? check to false on peer timeout.

    Timeouts should be rare, as they’re symptoms of application/database overload.
    Occasionally a peer module may also hang while establishing leadership. In
    either case the peer isn’t yet the leader, so it’s safe to assume the
    instance isn’t the leader, fall back to false, and log a warning.

  • [Peer] Use node-specific lock requester id for Global peers.

  • [Config] Validate options only after applying normalizations.

  • [Migrations] Allow any viable prefix in migrations.

  • [Reindexer] Drop invalid Oban indexes before reindexing again.

    Table contention that occurs during concurrent reindexing may leave indexes in
    an invalid, and unusable state. Those indexes aren’t used by Postgres and they
    take up disk space. Now the Reindexer will drop any invalid indexes before
    attempting to reindex.

  • [Reindexer] Only rebuild args and meta GIN indexes concurrently.

    The new indexes option can be used to override the reindexed indexes rather
    than the defaults.

    The other two standard indexes (primary key and compound fields) are BTREE
    based and not as subject to bloat.

  • [Testing] Fix testing mode for perform_job and alt engines, e.g. Inline

    A couple of changes enabled this compound fix:

    1. Removing the engine override within config and exposing a centralized
      engine lookup instead.
    2. Controlling post-execution db interaction with a new ack option for
      the Executor module.

Deprecations

  • [Oban] Soft replace discard with cancel return value

4 Likes

Hello Oban people,

Recently I discovered “duplicate” Oban jobs being created. Some jobs had 19 copies, all with the exact same details. This meant the same job was handled in 19 processes at the exact same time :laughing:

It was happening when a database transaction updating an Oban job crashed. For some reason this left an extra copy of the job in the table every time.

Today I am going to write some migrations with uniqueness constraints so this cannot happen again.

Something like:

create unique_index(:oban_jobs, [:worker, :args], name: :oban_jobs_unique_worker_args_index)

The docs told me I would not need unique constraints for the oban jobs table:

Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.

but reality was different. I saw duplicate jobs being created because a Sage transaction crashed. The transaction crashed because of a bug in an Appsignal lib.

In the future, I will be advising anyone I work with to use database constraints on their oban jobs table. The “unique jobs” guarantee is not really a guarantee. It’s a good effort, but it doesn’t have the watertight safety of a database constraint.

Has anyone else experienced this?

1 Like

How is a crashed transaction writing results to the database? That seems like a bigger problem than the specifics of unique indexes.

I’ve seen similar behavior when using dynamic repos - in that case, the job was written to the database on a different Connection, so the job appeared in the DB despite the main transaction rolling back.

2 Likes

I agree. To be honest, I don’t know exactly where in the process it crashed. I only know the process was crashing at the same time as the duplicate oban job was inserted. The setup was simple too - single node server, single repo.

I wish I could provide better info on the circumstances but it has me stumped. All I know for certain is I had many duplicate jobs and each one was inserted at the same time that the Appsignal lib caused runtime error. The jobs are only ever inserted in transactions, so I have to assume the transaction crashed in some catastrophic way.

We have many tests for the normal “unique” approach to Oban jobs, and we have been unable to recreate what we saw happen.

I’m not looking forward to fielding the comments I get off this one, but I feel like it’s my duty to give an honest account of what happened, even though I cannot provide satisfactory information about the cause of the problem.

Inserting jobs through Oban.insert or Oban.insert_all without unique options is exactly equivalent to Repo.insert or Repo.insert_all. With unique options, it’s equivalent to Repo.one followed by Repo.insert with an upsert. The only way duplicate jobs, let alone 19 of them, could have been inserted simultaneously would be from a loop in your application (which is exactly what it sounds like happened).
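The two paths described above can be sketched like this, reusing the hypothetical SendReminder worker from earlier in the thread:

```elixir
# Without unique options: a plain insert, equivalent to Repo.insert/2.
Oban.insert(MyApp.SendReminder.new(%{entity_id: 123}))

# With unique options: Oban first queries for a matching job inserted
# within the period, then inserts (as an upsert) only if none exists.
%{entity_id: 123}
|> MyApp.SendReminder.new(unique: [period: 60, fields: [:worker, :args]])
|> Oban.insert()
```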

You don’t need a unique index on worker and args, but you’re welcome to add one—it’s your database! The primary downside is a lack of flexibility when mixing unique and non-unique jobs.

The jobs were not inserted in a loop.
Some of the inserted_at values were days apart.

Oh, I misunderstood then. That seems even more peculiar! Were you using unique options? What is your pruning set to? Feel free to DM me or ask on Slack if you’d like to keep a debug conversation out of this thread.

Yes, we are using unique options. They work perfectly most of the time. But at the times when we saw the Appsignal lib crashing, the unique options were not respected.

I very much appreciate the offer to DM/Slack but I am extremely busy and have higher priority things to tackle.

Oban is a fantastic library and we will keep using it.

I only wanted to record my honest experience. If anyone else encounters duplicate jobs violating the unique guarantee, I would urge them to create a database migration and sleep well at night.