Oban.SmartEngine :rate_limit doesn't work as expected

Tagging @sorentwo as he’s probably the only one who can help me :)

I am using Oban Pro with the SmartEngine, and a queue that combines all three limits: global_limit, local_limit and rate_limit.

The rate limit is per account_id, which is a parameter in args passed to worker.

The usual scenario is that an account would schedule a bunch of jobs in bursts. We don’t want to block the whole queue to process the jobs scheduled by one single account, and that’s the reason we use rate_limit by account_id.

What we are seeing, however, is that when an account schedules a bunch of jobs, they do get rate limited, but no jobs that other accounts schedule after that burst get executed either.

So we created ourselves the worst of both worlds: we are now executing only the jobs for the account that scheduled a bunch of them while the others wait for completion, and we’re also doing it slowly, i.e. rate limited.

To illustrate, consider the following code.

I have the following Oban config for my_queue:

config :jobs_runner,
  queues: [
    my_queue: [
      global_limit: 50,
      local_limit: 20,
      rate_limit: [
        allowed: 30,
        period: {1, :minute},
        partition: [fields: [:args], keys: [:account_id]]
      ]
    ]
  ]

The intent here is to have a global, cluster-wide limit of 50 concurrent jobs and a per-node limit of 20, and for a given account we only want to execute 30 jobs per minute.

Now, I have the following worker:

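defmodule MyWorker do
  use Oban.Pro.Worker, queue: :my_queue

  # Logger macros need an explicit require
  require Logger

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"account_id" => account_id}}) do
    # Logger.warning/1 replaces the deprecated Logger.warn/1
    Logger.warning("Processed job for account ID=#{account_id}")
  end
end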

And let’s say I have two accounts that schedule a bunch of jobs each. Depending on how that happens, and how many jobs they schedule, the system behaves either as expected or in an unexpected (blocking everything) way.

First, if I schedule the jobs for both accounts at about the same time (the number of jobs doesn’t matter here, it can be 50 or 10000 per account), i.e.:

iex(a@127.0.0.1)2> (0..10000) |> Enum.each(fn _ -> MyWorker.new(%{account_id: 1}) |> Oban.insert(); MyWorker.new(%{account_id: 2}) |> Oban.insert() end)

I immediately (and correctly) see an output of 60 lines, 30 per account:

12:55:59.028 pid=<0.4243.0> [warning] Processed job for account ID=2
12:55:59.028 pid=<0.4245.0> [warning] Processed job for account ID=2
12:55:59.028 pid=<0.4244.0> [warning] Processed job for account ID=1
12:55:59.028 pid=<0.4242.0> [warning] Processed job for account ID=1
12:55:59.037 pid=<0.4246.0> [warning] Processed job for account ID=1                         
... 

The queue then pauses processing for both accounts for the rest of that minute, and the remaining jobs are completed in the same fashion, after the appropriate pauses. This is the expected and desired behavior.

Now, if I schedule the jobs this way:

iex(a@127.0.0.1)6> (0..10000) |> Enum.each(fn _ -> MyWorker.new(%{account_id: 1}) |> Oban.insert() end); (0..10000) |> Enum.each(fn _ -> MyWorker.new(%{account_id: 2}) |> Oban.insert() end)

I only get to see the following logs, in batches of 30 per minute:

13:04:41.341 pid=<0.4204.0> [warning] Processed job for account ID=1
13:04:41.341 pid=<0.4205.0> [warning] Processed job for account ID=1
13:04:41.341 pid=<0.4206.0> [warning] Processed job for account ID=1
13:04:41.341 pid=<0.4206.0> [warning] Processed job for account ID=1
...

This goes on for a long time, i.e. not just while the jobs are still being inserted. It appears to last until nearly all jobs in the account_id=1 partition have been executed. At that point there’s a brief period when I see jobs from both partitions being executed (as expected), and then the remaining jobs, all from account_id=2, are executed.

Note: I am scheduling a lot of jobs here to see the problem, i.e. 10,000 per account. You can also schedule 10,000 for account_id=1 and 50 for account_id=2, and you’ll see the same symptom.

If, however, I schedule only a small number of jobs, like 50, for each account, they are being executed correctly.

Unfortunately, we made the mistake of testing the rate limiter on a small number of jobs and saw it working “correctly”, which it does for a small number of jobs. We were surprised when we ran into blocked queues in production (where bursts per account can be in the thousands).

My guess is that somewhere where Oban queries for the next jobs, there’s a limit on the number of jobs it fetches, and that limit is probably much smaller than 10,000. As a result, the “next batch of jobs” contains only jobs for account_id=1. The batch is then rate limited per account, but since it holds nothing from other accounts, Oban doesn’t fetch jobs belonging to a different account at all; it just waits. Then another batch, again only for account_id=1, is fetched, rate limited, and so on.
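The guess above can be illustrated with a toy model. This is only a sketch of head-of-line blocking in general, not Oban's actual fetch logic: the module name `HeadOfLineSketch`, the `fetch_limit` parameter, and the "one fetch per rate-limit period" simplification are all assumptions made for illustration.

```elixir
defmodule HeadOfLineSketch do
  @moduledoc "Toy model of head-of-line blocking; NOT Oban's real fetch logic."

  # `queue` is a list of account ids in insertion order, one entry per job.
  # Returns one map per rate-limit period: account id => jobs run in that period.
  def run(queue, fetch_limit, allowed) when fetch_limit > 0 and allowed > 0 do
    loop(queue, fetch_limit, allowed, [])
  end

  defp loop([], _fetch_limit, _allowed, periods), do: Enum.reverse(periods)

  defp loop(queue, fetch_limit, allowed, periods) do
    # Only the oldest `fetch_limit` jobs are considered at all (assumed limit).
    {head, rest} = Enum.split(queue, fetch_limit)

    # Within that window, run at most `allowed` jobs per account.
    {ran, held} =
      Enum.reduce(head, {%{}, []}, fn account, {ran, held} ->
        if Map.get(ran, account, 0) < allowed do
          {Map.update(ran, account, 1, &(&1 + 1)), held}
        else
          {ran, [account | held]}
        end
      end)

    # Rate-limited jobs stay at the front of the queue for the next period.
    loop(Enum.reverse(held) ++ rest, fetch_limit, allowed, [ran | periods])
  end
end

# 10 jobs for account 1 inserted first, then 4 jobs for account 2.
queue = List.duplicate(1, 10) ++ List.duplicate(2, 4)
periods = HeadOfLineSketch.run(queue, 5, 3)
IO.inspect(periods)
```

With a fetch window of 5 and 3 allowed jobs per account per period, account 2 runs nothing in the first two periods, overlaps briefly with account 1 in the third, and only then gets the queue to itself, which matches the symptom described above.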

@sorentwo can you help sort it out?


@hubertlepicki Your guess is entirely correct. In fact, that workaround/optimization was put in place precisely because of issues you ran into previously. What I didn’t account for was that the “head of line” would block to that extent. I’m investigating a better solution.

Okay thanks.

What we will do for now, and it may actually be a good enough solution in our case, is detect any account that schedules an unusually high number of jobs. We will then send those jobs to a separate queue, where the blocking of other accounts’ jobs doesn’t affect the usual flow of jobs in the system.

This other queue may also see more than one “spike” of jobs from different accounts. It’d be nice if they didn’t block each other, but that’s not as critical.
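A minimal sketch of that routing decision, assuming the burst size is known at insert time. The module name `QueueRouter` and the threshold of 1,000 jobs are hypothetical; `:my_other_queue` stands in for whatever the separate queue ends up being called.

```elixir
defmodule QueueRouter do
  # Hypothetical cutoff: bursts larger than this go to the separate queue.
  @burst_threshold 1_000

  # Picks a queue name from the number of jobs an account is about to insert.
  def queue_for(job_count) when is_integer(job_count) and job_count > @burst_threshold do
    :my_other_queue
  end

  def queue_for(job_count) when is_integer(job_count) and job_count >= 0 do
    :my_queue
  end
end

# The result can be passed as the :queue option when building jobs, e.g.
#   MyWorker.new(args, queue: QueueRouter.queue_for(length(list_of_args)))
IO.inspect(QueueRouter.queue_for(50))
IO.inspect(QueueRouter.queue_for(10_000))
```

Keeping the decision in one pure function makes the threshold easy to tune and test without touching the workers themselves.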

You could also space out large blocks of jobs to compensate. For example, if your rate limit only allows N jobs per hour, when you get a large influx of jobs, you can automatically space them out in blocks of N per hour:

list_of_args
|> Enum.chunk_every(1000)
|> Enum.with_index()
|> Enum.flat_map(fn {chunk, offset} ->
  opts = if offset == 0, do: [], else: [schedule_in: {offset, :hours}]

  Enum.map(chunk, &Worker.new(&1, opts))
end)
|> Oban.insert_all()

This is a very good idea. I think I may end up doing both things: offloading the jobs to separate queues and scheduling some of them in the future. Basically in your code, a small change:


list_of_args
|> Enum.chunk_every(1000)
|> Enum.with_index()
|> Enum.flat_map(fn {chunk, offset} ->
  opts = if offset == 0, do: [], else: [schedule_in: {offset, :hours}, queue: :my_other_queue] # <- added this

  Enum.map(chunk, &Worker.new(&1, opts))
end)
|> Oban.insert_all()