How can I snooze a whole queue in Oban until a certain time? (for daily API quota)

I have a daily quota from an API that I’m calling in Oban jobs. When I get an error response because of the quota exceeded, I want to postpone all jobs in the queue until 12:00amPT when it resets.

I see there is the ability to postpone jobs based on a {:snooze, time} response, but is there a way to do it on a whole-queue basis? Or is the best action to just run through all the jobs and have them reschedule individually? I’m trying not to hit the API too much with requests that won’t execute.

Curious what the concern would be about hitting the API too often? It seems like the most straightforward option is just checking for rate limited responses and having the job reschedule itself at that point.

Another alternative to a separate process to manipulate the jobs would be to store the API state in a simple agent and check that in the job before issuing the request?

I’m worried that I’ll get penalized by the operator. It could potentially be thousands of requests. I didn’t think about using an Agent, actually forgot all about it. Thanks for the suggestion.

Can you think of a way where I could do it using pause_queue and resume_queue? The concern being that if I pause a queue it might stay that way if some code fails to resume it. Sorry for all the questions, I’m new to Oban as of today.

Are you using a queue size in the thousands? Because it seems like that’s the only way that many requests would go through after a rate limited response came back…

I don’t have experience with pause_queue or honestly with a lot of Oban features so there might be something out there I’m not personally aware of. At first glance I’m not sure it’s a much better option since it doesn’t actually stop executing jobs so a high queue size would still be a potential problem?

One other thought, have you looked into customizing the backoff algo? That is frequently the right tool for dealing with rate limiting.

edit on second look the snooze return value doesn’t seem like a bad option, seems like the same issue would be with simultaneous execution…

1 Like

Doing these should mostly solve the problem:

  1. Run Oban.pause_queue/2
  2. Have global state in a GenServer or an Agent that more or less says “did we get a HTTP 429 so all work must stop now?” and have each job check its state; if the state is true then have the job snooze itself.

Doing #1 ensures no new jobs will run and will not hit the rate-limited API, and #2 makes sure that any running jobs that fetch the new status will also not hit the rate-limited API and reschedule themselves.

Not ideal of course; you might have 50 tasks that are already past the check – and it was false at the time of the check – so they might hit 50 requests that return HTTP 429.

You could also run Oban.cancel_all_jobs/2 on the queue immediately after one of them gets the HTTP 429 but then you’ll have to manually look for the cancelled job and reschedule them for when the restriction is lifted. Which is also very doable, but worth mentioning that you have to take care of the cancelled jobs.

This is easily accomplished with two workers and two queues.

The worker that’s interacting with your API will respond to a rate limit error by pausing the queue and scheduling a job to resume the queue at midnight.

Like this:

defmodule MyApp.APIWorker do
  use Oban.Worker, queue: :some_api

  @impl true
  def perform(%{args: args, queue: queue}) do
    case MyApp.make_api_call(args) do
      {:ok, _whatever} ->
        :ok

      {:error, :rate_limit} ->
        tomorrow =
          Date.utc_today()
          |> Date.shift(day: 1)
          |> DateTime.new!(~T[00:00:00])

        %{queue: queue}
        |> MyApp.APIMonitor.new(scheduled_at: tomorrow)
        |> Oban.insert!()

        Oban.pause_queue(queue: :some_api)

        {:snooze, 60}
    end
  end
end

The worker that resumes the queue runs in a different queue that isn’t paused, and it’s marked as unique to prevent enqueuing duplicates if multiple jobs hit the rate limit at the same time.


defmodule MyApp.APIMonitor do
  use Oban.Worker, queue: :monitors, unique: true

  @impl true
  def perform(%{args: %{queue: queue}}) do
    Oban.resume_queue(queue: queue)
  end
end

This has the advantage that it’s resilient against restarts so you can deploy in between and the queue will automatically pause itself again (or the queue will remain paused if you’re able to use DynamicQueues). It also avoids snoozing, which can still churn jobs (and mess with the attempt count if you’re not using a Pro worker).

8 Likes

I love you! Simple and elegant, cheers. I know people have been defaulting to Oban for a while but I hadn’t gotten the chance to use it. Yesterday I started looking into the docs and watched your oban architecture video. Super impressing engineering, thanks for the great library.

3 Likes

What would you do to test this? I’m having a situation where Oban.pause_queue is returning :ok, but when assert for paused: true, it fails.

 describe "perform/1" do
    test "unpauses paused queue" do
      :ok = Oban.pause_queue(queue: :youtube)

      assert %{paused: true} = Oban.check_queue(queue: :youtube) # This is paused: false even though I called paused above

      perform_job(APIMonitorWorker, %{queue: :youtube})

      assert %{paused: false} = Oban.check_queue(queue: :youtube)
    end
  end

Pausing is asynchronous because it applies to all nodes. You have a race condition between telling it to pause and checking that it is in fact paused. To test pausing like that you need a delay, or a loop that keeps checking with a small backoff.

This isn’t something I suggest testing anyhow. It tests part of how the framework operates, with is already tested by Oban. In addition, queues aren’t normally running during tests.

1 Like

It’s superfluous to test Oban itself. It has its own tests. Just test your functionally and assume Oban will work. If it doesn’t then it has a bug that its maintainers will fix. It’s not your responsibility.

@sorentwo @dimitarvp Hmm I’m not sure I agree about not testing it, but maybe I didn’t explain myself well. The pausing of the API queue is a core part of the business logic as it relates to a very real situation of running out of API credits. The YouTube API limit is very low so this is likely to be an every day occurrence. I’m not testing the Oban pause function so much as I’m testing the recovery of the queue after it’s been paused by some outside force.

I’ve even used Process.sleep with very high values (as high as 5 seconds) and it still doesn’t appear to pause the queue in tests.

The real test I want to write includes pausing the queue and then testing the APIMonitorWorker Cron functionality as it runs on an hourly basis. Do you have any recommendations for testing workers that are run with the Cron plugin? Or for testing the queue pausing and resuming in this instance?

In this case you should make f.ex. a global counter that shows how much API requests remain until a certain deadline (fetched from another global state i.e. in ETS where the counter will also be). When that counter hits zero then any currently running jobs will simply snooze themselves to re-run when the API requests limit window resets.

You still should issue the Oban.pause_queue function call, of course, but definitely do add the global counter to make sure the jobs aren’t going over the limit.

Whether a queue is paused immediately or 10 seconds later is something that you’ll have to accept as a fact of life and a (perceived) limitation of Oban for the moment, and a behaviour that you can’t rely on when it comes to immediacy.

EDIT: Obviously there are other and more refined solutions to self-enforce rate limits and we can give you links to those libraries; I am pointing out a quick solution.

I have an automated YouTube channel, and all the tasks are handled using Oban. For the YouTube uploading part, I faced similar issues with the YouTube API quota limitations, which only allow a certain number of operations within a 24-hour period.

If you’re interested, I’ve also written a tutorial about uploading to YouTube with Elixir :slight_smile:

From my experience running this pipeline for multiple years, it’s much more convenient to track how many quota units you have left on your API key so you can max out the quota. This becomes especially helpful if you plan to add more API keys either for handling actions on behalf of different users or just to increase your overall quota without doing the google verification.

I manage my multiple Google credentials and their current quotas using a model in my database:

schema "google_credentials" do
  field :name, :string
  field :client_id, :string
  field :client_secret, :string
  field :refresh_token, :string
  field :quota, :integer, default: 0

  timestamps()
end

To handle API requests, I wrap each operation with a database query to find a credential with enough quota available. If none are found, the job is rescheduled to run after the next quota reset, with the highest priority to ensure it gets handled before new jobs are queued.

Here’s a simplified example of how I manage the quota for YouTube uploads and other API operations:

def insert_simple(channel_name, video_path, title, description, tags) do
  operation_cost = 1_600

  quota_wrapper(channel_name, operation_cost, fn client ->
    Api.insert_simple(client, video_path, title, description, tags)
  end)
end

def set_thumbnail(channel_name, youtube_video_id, thumbnail_path) do
  operation_cost = 50

  quota_wrapper(channel_name, operation_cost, fn client ->
    Api.set_thumbnail(client, youtube_video_id, thumbnail_path)
  end)
end

defp quota_wrapper(channel_name, operation_cost, fun) do
  with {:ok, credential} <- Google.fetch_credential_with_quota(channel_name, operation_cost),
       client <- Api.new(credential.name),
       :ok <- Google.increment_credential_quota(credential.name, operation_cost) do
    fun.(client)
  else
    :error ->
      Logger.info("Quota increment failed")
      {:error, :quota_increment_failed}
  end
end

And then you reset the quota at 0 Los_Angeles timezone which is what’s youtube is doing

    {
      Oban.Plugins.Cron,
      crontab: [
        {"0 0 * * *", Lor.Worker.ResetCredentialsQuota}
      ],
      timezone: "America/Los_Angeles"
    }
  ],
1 Like