Oban: having many dynamic queues okay or a bad idea?

Hi, wondering if anyone has experience creating a lot of queues dynamically with Oban, and how that went for you.

I’m writing an app that will connect over SSH to potentially N servers per user, and I want to create a queue for each server. The idea is to always wait for the last operation on a server to finish before beginning the next.

It seems like theoretically there shouldn’t be an issue for this with Oban, but has anyone tried it out? Any issues?

Also, should I store these queues in a db table so that I can start them up again easily later? Or has anyone done this differently?

Thanks!

AFAIK from discussion in Slack, Oban wasn’t designed to have a lot of queues.
Oban does have the ability to enforce uniqueness based on a worker’s arguments. I haven’t used this feature myself, but it should help with your use case.
When I needed similar functionality, Oban didn’t yet have uniqueness by argument, and I solved it with a combination of static queues with a concurrency limit of 1 and :erlang.phash/2, so server1 always binds to queue3, server2 to queue5, and so on.
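For reference, that static-queue hashing approach can be sketched roughly like this. The module and queue names are made up, and I’m using :erlang.phash2/2 (the non-deprecated successor to :erlang.phash/2):

```elixir
defmodule MyApp.QueueRouter do
  @moduledoc """
  Sketch only: a fixed pool of queues, each configured with `limit: 1`,
  plus a deterministic mapping from server id to queue name.
  """

  @queue_count 8

  # :erlang.phash2/2 maps the same server id to the same 0..7 bucket every
  # time, so all jobs for one server funnel through one single-concurrency
  # queue, while jobs for different servers can still run in parallel.
  def queue_for(server_id) do
    :"server_queue_#{:erlang.phash2(server_id, @queue_count)}"
  end
end
```

The tradeoff is that unrelated servers that hash to the same bucket will serialize behind each other.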

Sorry, I’m not sure I understand how uniqueness would help in this case. I thought that unique jobs prevented a duplicate of the same job from being created. Do they let me limit concurrency for a set of jobs - in this instance, all jobs connecting to the same server?

I suppose I could have a queue for ALL server connection tasks, and limit it to 1 concurrency, but I only need to limit concurrency for tasks connecting to the same server. Jobs connecting to server A shouldn’t have to wait for jobs connecting to server B to finish (or vice versa).

Thanks for the reply!

Oban, as it works currently, uses per-queue polling for scheduled jobs. That is the biggest issue with running a lot of queues, where “a lot” depends on your hardware but is about 50+ queues.

If you don’t need scheduled jobs, or you don’t need per-second resolution, then you can start queues with a higher poll_interval and eliminate a lot of db load.
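For illustration, raising the poll interval is a config change along these lines (exact option placement, top-level versus per-queue, varies across Oban versions, so treat this as a sketch):

```elixir
# config/config.exs -- illustrative; poll_interval placement varies
# across Oban versions
config :my_app, Oban,
  repo: MyApp.Repo,
  # Poll every 10 seconds instead of every second, trading scheduling
  # resolution for roughly 10x fewer polling queries per queue.
  poll_interval: :timer.seconds(10),
  queues: [default: 10]
```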


Thanks for replying! That makes sense. So having N queues means it’ll need to query N times per period (every second?) to check whether there are jobs for each queue?

Are there any other ways you’d recommend solving this particular problem?

Tag the job with a server ID; when the server is done with its current job, get the next job by server ID tag and schedule it to run now?

Or just keep a separate table of jobs to schedule by server ID, and when a server is done with its current job, pull the next one from that table and put it into Oban?

You can still have a process per server managing that connection / listening for when commands are done etc.
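If you go the tagging route, enqueuing might look something like this. MyApp.ServerWorker is a hypothetical worker; the tags option on Oban jobs carries the server ID so it can be queried back later:

```elixir
defmodule MyApp.ServerJobs do
  @moduledoc "Sketch only: MyApp.ServerWorker is a hypothetical Oban worker."

  # Tag each job with its server id so the next pending job for a given
  # server can be found via the tags column (or via server_id in args).
  def enqueue(server_id, command) do
    %{server_id: server_id, command: command}
    |> MyApp.ServerWorker.new(tags: ["server:#{server_id}"])
    |> Oban.insert()
  end
end
```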

Thanks for the ideas. Tagging it that way might work, although it doesn’t seem like it would guarantee that the jobs always run in order or stay limited to a concurrency of 1, just that they mostly would, which might be an issue in my use case.

The separate table of jobs sounds like it would work; I was just hoping I could throw it all at Oban and let it sort everything out. Sounds like this use case is a bit out of scope for Oban as is, so a hybrid approach like that might be in order. Thanks!

You could limit concurrency by having the worker check your per-server GenServer to see whether a job is currently running. If one is, fail the next Oban job and update its scheduled_in or scheduled_at.

Please pardon the slow reply, I’ve been on vacation for the past few weeks.

This is entirely doable with a single worker through the use of :snooze. Here’s a pseudo example:

defmodule MyApp.SequentialWorker do
  use Oban.Worker, queue: :some_queue

  import Ecto.Query, only: [where: 3]

  alias Oban.{Job, Worker}

  @impl Worker
  def perform(%Job{id: id, args: %{"user_id" => user_id}}) do
    if executable_for_user?(id, user_id) do
      # do normal stuff
      :ok
    else
      # snooze for 60 seconds, make it as short as necessary
      {:snooze, 60}
    end
  end

  # Executable when no *other* job for the same user is currently executing.
  # The j.id != ^job_id clause excludes this job itself, which is already in
  # the "executing" state while perform/1 runs.
  defp executable_for_user?(job_id, user_id) do
    executing? =
      Job
      |> where([j], j.id != ^job_id)
      |> where([j], j.worker == ^Worker.to_string(__MODULE__))
      |> where([j], j.queue == "some_queue")
      |> where([j], j.state == "executing")
      |> where([j], fragment("? @> ?", j.args, ^%{user_id: user_id}))
      |> MyApp.Repo.exists?()

    not executing?
  end
end

By checking whether there is another job executing you can force global concurrency. In this case the worker checks for other executing workers with the same user_id, though that is arbitrary and you could scope to anything you like.

It isn’t as optimized as running multiple queues with global concurrency, but it will work right now and is accurate.


Pardon my jumping on the same thread, but I have a similar issue. I would like to snooze the job and also all other jobs that can be identified by given tenant_id. How would you approach this?

One thing I can think of is querying the Oban jobs table by the given tenant_id and updating the records that have that tenant_id in their arguments, but I can easily see this becoming a performance bottleneck and putting a lot of pressure on the database.

Would there be a better way to snooze a bunch of jobs this way?

My jobs are independent of order, but if they are executed in order, or close to the order in which they were scheduled, they generally have less work to do overall. So snoozing just some jobs and not others will make the system do more work. I am thinking maybe snoozing plus changing job priority could do the trick here?

I think I am going to answer myself here, but I’d still appreciate it if @sorentwo could confirm:

the way to go, if you want to snooze / rate limit jobs by something like :tenant_id, is to pass it as an argument to jobs, index it on the jsonb field in Postgres, and then provide a custom conf.engine that replaces the default implementation, especially this function:

With this functionality I can control which jobs are fetched to be processed next, and temporarily exclude tenants that are exceeding allowed limits. This would work without creating dynamic queues or being required to snooze a lot of jobs.
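As a sketch of the “indexed on jsonb field” part, an expression index on the args column could look like this (the migration module and index name are illustrative):

```elixir
defmodule MyApp.Repo.Migrations.IndexObanJobsTenantId do
  use Ecto.Migration

  def change do
    # Expression index on the tenant_id key inside the jsonb args column,
    # so per-tenant scans of oban_jobs stay cheap for a custom engine.
    create index(:oban_jobs, ["(args->>'tenant_id')"],
             name: :oban_jobs_tenant_id_index
           )
  end
end
```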

That is the most effective and efficient way to do it, though not a trivial undertaking. A snooze + priority approach could work if you can tolerate the churn and inefficiency.

This use case keeps coming up and I’m definitely looking to support it in Pro :slightly_smiling_face:

Yeah, we’re getting the subscription (in the process), but it looks fairly straightforward given the way Oban is architected.

Let me revive this thread with some extra questions, @sorentwo, this time of a legal nature.

So we have migrated from Exq to Oban very smoothly, and indeed we’re using Pro + the Web UI. We’re already on the SmartEngine that comes with Pro, and we’re using its unique features that are not in the free version, i.e. rate_limit and also global_limit.

Now, here’s the legal question: we would like to add rate-limit-per-tenant functionality to one of our queues, but keep using Pro’s rate_limit / global_limit features on some other queues.

Can I just grab and modify the smart_engine.ex from the Pro distribution that lands in deps/, or would that be the crime of the century?

Edit: I would also be happy using different engines for different queues. Not sure how to do that, or whether it’s possible, however.

Not the crime of the century, but also not something I can endorse.

This is something we’d love to provide to everybody that uses Pro. If you’re willing to share more details of your use case I’d love to talk through an API and implement it in Pro directly. You’re paying for Pro for a reason! Send a message if you’re interested :slightly_smiling_face:

Well, I would actually prefer that you implemented it yourself, so I don’t have to; if we can avoid the crime as well and it works for me, that’d be ideal.

What we have is a multi-tenant system, where each tenant can schedule a number of background jobs. We want to prevent one hugely intense tenant from taking up all the system’s resources by queuing up thousands of background jobs.

These background jobs also communicate with an external API, which is rate limited, and each tenant uses their own API key, meaning that if they schedule and execute thousands of jobs at the same time they will also exhaust their API limits.

The solution to both of our problems would be rate limiting per queue per tenant, or having a set of queues per tenant and rate limiting them separately.

We currently don’t use anything like PostgreSQL schemas to separate tenants. Each tenant has an ID, and their associated records all have tenant_id set on them.

I was thinking about having tenant_id as an argument to each scheduled job as well. Then I would fetch only jobs from tenants that are not exceeding the limits. But I am open to a different solution that achieves the same goal.
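Carrying tenant_id in the args is straightforward; a sketch (MyApp.TenantWorker is a hypothetical worker):

```elixir
defmodule MyApp.TenantJobs do
  @moduledoc """
  Sketch only. Keeping tenant_id inside args (rather than only in our own
  tables) lets a custom engine, or any ad hoc query against oban_jobs,
  group and limit jobs per tenant.
  """

  def enqueue(tenant_id, params) do
    params
    |> Map.put(:tenant_id, tenant_id)
    |> MyApp.TenantWorker.new()
    |> Oban.insert()
  end
end
```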


I believe I have a suitable API between our prior discussion, some outside requests, and the details you’ve provided here. It should be flexible enough to support a few different use cases, introspectable enough to figure out what’s going on in the engine, and fast enough to keep jobs processing smoothly.

I’ll spare the implementation details and focus purely on the SmartEngine API changes.

The feature is tentatively called “Partitioned Rate Limiting,” which will build off the existing rate-limit feature. The configuration spec would look like this:

partition: [fields: [:worker | :args], keys: [atom()]]

Translating that into your tenant_id use case:

rate_limit: [
  allowed: 60,
  period: :timer.seconds(60),
  partition: [fields: [:args], keys: [:tenant_id]]
]

Alternatively, if somebody wanted to rate limit purely by the worker:

rate_limit: [
  allowed: 60,
  period: :timer.seconds(60),
  partition: [fields: [:worker]]
]

The engine will then track and apply rate-limiting per-partition.

Will that work for you?

That would work for me very well!


Thanks @sorentwo !
That would be an awesome addition to the smart engine! Can’t wait to try it out!

Same here. If there’s a beta version, we would love to try it. @sorentwo please let us know when/if there’s something we could try and report back.