Oban — Reliable and Observable Job Processing

Thanks for the quick response and the great work!

Perhaps it would be enough to mention it in docs.

I do tend to think that if something comes up multiple times it would probably benefit from a blessed solution.

Oban v1.1.0 is out with some minor features and a ton of reliability improvements prompted by edge cases from real world usage. Thanks to everybody that reported the issues and helped in proving the solutions :yellow_heart:

From the CHANGELOG:

Fixed

  • [Oban.Crontab] Allow any number of spaces and/or tabs in cron expressions.

  • [Oban.Pruner] Prevent deadlocks while pruning by ensuring that only a single node can prune at any given time. (Thanks @bgentry)

  • [Oban.Queue.Executor] Improve handling of nested/linked process failures. Previously if a job spawned a process that crashed (i.e. through Task.async) then the job was left stuck in an executing state.

Added

  • [Oban.Worker] Add timeout/1 callback for limiting how long a job may execute. The default value is :infinity, which allows a job to run indefinitely and mirrors the previous behavior. (Thanks to @benwilson512 who did the initial implementation)

  • [Oban.Telemetry] Add :error and :stack to trip_circuit event metadata.

  • [Oban.Queue.Executor] Retry success/failure database updates after a job finishes.

    On occasion something may happen to the database connection and updating a job’s state would fail. Now we retry writing with a linear backoff to prevent the job from getting stuck in an executing state. (Thanks to @coladarci)
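
As a rough illustration of what retrying with a linear backoff looks like, here is a minimal sketch. It is not Oban's internal code: the module name RetrySketch, the default of 3 attempts, and the 100 ms base delay are all assumptions made for the example.

```elixir
defmodule RetrySketch do
  # Runs `fun`, retrying when it raises. The wait before each retry grows
  # linearly: delay, 2 * delay, 3 * delay, ...
  def with_retry(fun, opts \\ []) do
    retries = Keyword.get(opts, :retries, 3)
    delay = Keyword.get(opts, :delay, 100)

    do_retry(fun, 1, retries, delay)
  end

  defp do_retry(fun, attempt, max_attempts, delay) do
    fun.()
  rescue
    error ->
      if attempt < max_attempts do
        # Linear backoff: the sleep is proportional to the attempt number.
        Process.sleep(attempt * delay)
        do_retry(fun, attempt + 1, max_attempts, delay)
      else
        # Out of attempts, so surface the original error.
        reraise error, __STACKTRACE__
      end
  end
end
```

A call such as RetrySketch.with_retry(fn -> write_job_state() end), where write_job_state is a hypothetical function that raises on a dropped connection, would try up to three times before giving up.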

Changed

  • [Oban.Worker] Tighten the spec for perform/2. Now workers are expected to return only :ok, {:ok, result} or {:error, reason}. A warning is logged if any other value is returned—for high throughput systems this may cause performance issues and you should update your worker’s return values.

    Returning a success tuple is supported for testing purposes and backward compatibility, while returning :ok on success is preferred. (Thanks to @sasajuric)

Hi,
I have a question. I use an API that bans or rate limits its users if they make more than one request every 4 seconds. Is there any way to enforce an interval, so that if I have some jobs that call this API and some that do not, there will always be at least 4 seconds between jobs that call the API, while the worker runs other jobs in the meantime? If there are no other jobs, the worker could just wait until 4 seconds have passed. Any thoughts on this?

Thanks for your time! :slight_smile:

The simplest solution could be to define a queue with concurrency set to 1 for those jobs and sleep for the necessary time at the end of each job.

If it’s not possible, I’d use GenServer to throttle the requests.

“The simplest solution could be to define a queue with concurrency set to 1 for those jobs and sleep for the necessary time at the end of each job.”

This is what I lean towards right now but I find it ugly.

You can model basic rate limiting by conditionally returning an error tuple:

defmodule MyApp.LimitedWorker do
  # You may want to use a longer period
  use Oban.Worker, unique: [period: 4]

  @impl Oban.Worker
  def perform(%{"user_id" => user_id}, _job) do
    if MyApp.Accounts.over_rate_limit?(user_id) do
      {:error, :over_rate_limit}
    else
      do_whatever_this_worker_does()

      :ok
    end
  end

  # Retry every four seconds, you'd probably want this to be smarter.
  @impl Oban.Worker
  def backoff(_attempt), do: 4
end

That version will retry the job later after the rate limit has cooled down. Note that you’ll get error reports for each retry, so you may want to ignore the :over_rate_limit error in your error reporter.

Thanks. To me Oban seems really great but also really complex and I have a hard time grasping it. Is there some example codebase to look at to understand better?

You could use jobs for that; it supports rate limited queues. Note that jobs is not about persistence, so you might still combine it with Oban. Basically, in each worker’s perform you ask your jobs-based rate limited queue for permission. Since jobs is in-memory, a system restart might cause an occasional rate offense (e.g. you execute the job, then the system restarts, and you end up executing another job within the 4 second interval).

Are there specific things you find confusing that we could help you, and others, understand? We could add these topics to a knowledge base/wiki like Sidekiq’s wiki.

In terms of the rate limiter part, I’ve used ExRated in the past (though I wish it were named something different) and ExHammer also looks very nice. You could use either of those for the rate limit check. You would still likely have to “fail” the job so Oban will retry later.

Is there any sort of way to make assert_enqueued act like assert_receive? Or is there some sort of monitor I can subscribe a test process to so I can get notified when a job gets enqueued before using assert_enqueued? There are times I want to test a job gets enqueued as part of an async process.

Absolutely, if you are confused or need clearer examples let us/me know! The project always needs doc improvements and I’m working on proper guides.

Rate limiting keeps coming up, it’s definitely on the feature roadmap.

There isn’t any monitoring/pubsub that makes this possible currently. There are PostgreSQL pubsub notifications for inserted jobs, but they don’t work in the sandbox environment. The sandbox uses transactions for isolation and pubsub notifications aren’t emitted until a transaction commits.

Would you mind opening an issue/feature request? This is definitely something I’ve encountered before and I’ve resorted to polling. Additional telemetry events (which has an open issue) would make this easy to support without polling as well.
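
For anyone who needs the polling workaround in the meantime, a minimal sketch of a generic polling helper follows. It is plain Elixir and not part of Oban or Oban.Testing: the module name PollingSketch, the function name wait_until, and the default timeout and interval are all assumptions made for the example.

```elixir
defmodule PollingSketch do
  # Calls `fun` every `interval` ms until it returns a truthy value or
  # `timeout` ms have elapsed. Returns :ok or {:error, :timeout}.
  def wait_until(fun, timeout \\ 1_000, interval \\ 20) do
    deadline = System.monotonic_time(:millisecond) + timeout
    poll(fun, deadline, interval)
  end

  defp poll(fun, deadline, interval) do
    cond do
      # The condition holds, so stop polling.
      fun.() ->
        :ok

      # The deadline has passed without the condition holding.
      System.monotonic_time(:millisecond) >= deadline ->
        {:error, :timeout}

      # Otherwise wait a little and check again.
      true ->
        Process.sleep(interval)
        poll(fun, deadline, interval)
    end
  end
end
```

In a test, fun would be some check of your own that returns true once the job row exists; assert_enqueued can then run after wait_until returns :ok.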

Hi, thanks for your reply.

Something I would like to do is process some entries in a files table in the database. Some types of work depend on other types of work. If something fails for one file, I would like to be able to say “oh, then don’t start the other work just yet. Instead, retry this 2 times at an hour interval or so first. If those fail too, then don’t continue doing work with this file.” However, the work should still be done for the other files. All jobs should be logged and accessible from a web admin UI.

What I had trouble understanding was mostly the terms used in the docs, e.g. queues and workers. I made a simple environment and used this guide to set up some test work. I think I understand better now. https://elixircasts.io/job-processing-with-oban

Thanks for your time

Not quite what you wanted in the last point (it blocks the worker while waiting) but one option would be a separate “rate limiter” GenServer. It exposes one public function, RateLimiter.get_token with the following behavior:

  • if get_token has never been called before, record the current time in the state and reply with :ok immediately

  • if get_token has been called but the last time was more than 4 seconds ago, record the current time in the state and reply with :ok immediately

  • if get_token was called more recently than that, use Process.send_after to schedule a future message. Don’t reply at all, leaving the caller of get_token blocked waiting for a reply. When the future message arrives, reply to the original caller with :ok.

The overall effect of this is to create a function that returns :ok immediately unless it’s called more often than once every 4s; in that case, it makes the caller wait.

This approach can be extended to handling separate per-user rate limits by keeping a map user_id => last_call_time in the GenServer instead of a single timestamp.
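
A minimal sketch of the single-limit version described above is shown here. A few details are assumptions of the sketch and not part of the description: the state tracks the next free time slot instead of the last call time (so that several blocked callers are spaced 4 seconds apart from each other), the interval is configurable through a hypothetical :interval option, and the call uses an :infinity timeout because the default GenServer.call timeout of 5 seconds would otherwise crash callers that wait longer.

```elixir
defmodule RateLimiter do
  use GenServer

  # Minimum spacing between granted tokens, in milliseconds.
  @default_interval 4_000

  def start_link(opts \\ []) do
    interval = Keyword.get(opts, :interval, @default_interval)
    name = Keyword.get(opts, :name, __MODULE__)
    GenServer.start_link(__MODULE__, interval, name: name)
  end

  # Blocks the caller until a token is available, then returns :ok.
  def get_token(server \\ __MODULE__) do
    GenServer.call(server, :get_token, :infinity)
  end

  @impl true
  def init(interval) do
    # next_slot is the earliest monotonic time at which a token may be granted.
    {:ok, %{interval: interval, next_slot: now()}}
  end

  @impl true
  def handle_call(:get_token, from, %{interval: interval, next_slot: next_slot} = state) do
    current = now()

    if current >= next_slot do
      # Far enough from the previous grant: reply immediately.
      {:reply, :ok, %{state | next_slot: current + interval}}
    else
      # Too soon: leave the caller blocked and reply when its slot arrives.
      Process.send_after(self(), {:grant, from}, next_slot - current)
      {:noreply, %{state | next_slot: next_slot + interval}}
    end
  end

  @impl true
  def handle_info({:grant, from}, state) do
    GenServer.reply(from, :ok)
    {:noreply, state}
  end

  defp now, do: System.monotonic_time(:millisecond)
end
```

A worker would call RateLimiter.get_token() at the top of perform, before making the API request. For per-user limits, next_slot would become a map keyed by user_id.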

Either way, I would need to wait until the first work is done for all the files, or wait until the work on the file being processed is done. In the latter approach, each file would be processed at its own pace: when some work on it is done, it moves on to the next work for that file, while the system continues doing the first work on the other files. I would prefer the latter, because that way my system could work on other things while the failed files wait for a retry. Any ideas how to do something like either of those approaches with Oban?

I think you should have a dedicated table that describes the amount of processing that has happened so far, and how much remains. Then enqueue just one job at the start, when it is done it updates your status row, and enqueues the next job. Relying on the presence or absence of future Oban jobs is going to perform more poorly, and won’t let you capture the nuance of your domain.

This question and some like it I think end up trying to have the job row represent “the state of processing as understood by my business domain on some business entity”, but I think this is a mistaken view of what Oban should be about. Oban’s job is to manage programmatic units of work that need to occur asynchronously. What the success or failure of those units of work implies about the status of some domain entity needs to be captured in your domain data, not by Oban.

Thanks, I will consider this. I think I have a better understanding of this now.

edit: how would I wait for the work to be done? Should I query the db every 30 seconds or so to check if the work has been inserted? I would be thankful for any help! :slight_smile:

I’ve found that scheduled_at in Oban is a great solution if you combine it with some job planning of your own. For example, in my app I let users enqueue a lot of jobs and specify a rate limit. So, I can calculate the exact time I want each job to run, starting from the initial time (the current time) and adding the elapsed time to it. For example, if they want one job every 4 seconds, the first one will be scheduled_at now, the second one at “now + 4 seconds”, the third one at “now + 4 + 4 seconds” and so on. I store the last timestamp in Redis but you could use any other cache, DB, an ets table, a genserver…
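
The timestamp arithmetic for that kind of planning can be sketched in a few lines. The module and function names here are hypothetical, and the sketch covers only the calculation, not the storage of the last timestamp.

```elixir
defmodule SchedulePlanner do
  # Returns one DateTime per job, spaced `interval_seconds` apart,
  # beginning at `start` (which defaults to the current UTC time).
  def schedule_times(count, interval_seconds, start \\ DateTime.utc_now()) do
    0
    |> Stream.iterate(&(&1 + interval_seconds))
    |> Enum.take(count)
    |> Enum.map(&DateTime.add(start, &1, :second))
  end
end
```

Each resulting timestamp would be passed as the scheduled_at option when the matching job is built. The last one, plus the interval, is the starting point for the next batch.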

Another approach that you can also use is a rate limiter like ExHammer, and making the jobs re-enqueue themselves if they’re rate limited, or just wait until it’s their time. In fact I use both solutions, job planning and rate limiting.

Hi, is there some way to mark a job as discarded from inside the job? If an Oban job could encounter both temporary problems and permanent issues, I want to mark it as failed the first time it encounters a permanent problem.

There isn’t currently a way to indicate that a job should be discarded instead of retried if it encounters a permanent problem. Coincidentally @chulkilee was asking about this in the #oban Slack channel just yesterday.

For now, you can “fake” it and have the job stop retrying by returning a successful value like :ok, or {:ok, :ignored}.
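
A minimal sketch of that workaround is below. The module name, the "file_id" argument, and the process_file helper with its error reasons are all hypothetical, and `use Oban.Worker` is left out so the sketch runs without dependencies.

```elixir
defmodule DiscardSketch do
  # In a real worker this module would `use Oban.Worker`.

  def perform(%{"file_id" => file_id}, _job) do
    case process_file(file_id) do
      :ok ->
        :ok

      # Permanent failure: report success so the job is not retried.
      {:error, :not_found} ->
        {:ok, :ignored}

      # Temporary failure: return the error so the job is retried later.
      {:error, reason} ->
        {:error, reason}
    end
  end

  # Hypothetical stand-in for the real work.
  defp process_file(0), do: {:error, :not_found}
  defp process_file(id) when id < 0, do: {:error, :timeout}
  defp process_file(_id), do: :ok
end
```

The trade-off is that a job ignored this way is recorded as completed, so the permanent failure has to be logged or stored somewhere else if you want to see it later.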