Oban setup for separate worker/web nodes

Hi,

I’m tracking down an issue with Oban. I have a staging environment which works correctly and a production environment which has issues. When Oban.insert_all is used to insert jobs into a queue, all jobs are started at once, even though that specific queue is configured with a concurrency of 1. One of the differences between staging and production is that production has multiple worker nodes. This gave me doubts about our queue configuration. On the web nodes we have the queues set to: queues: [] and on the worker nodes set to: queues: [default: 2, images: 1, ocr: 1].

What is the correct configuration of Oban with a worker/web nodes setup?

  1. Start Oban in the application.ex only on the worker nodes and you can still use Oban.insert_all on web nodes
  2. Start Oban on all nodes (web and worker), but with a different queues configuration

cc @sorentwo

Thanks

If you’re using the free version of Oban, it only has the limit: local_limit option, i.e. you have a local limit of 2 concurrent jobs in the default queue per worker. So, if you have 6 workers, you will have 12 concurrent jobs; if you have 12 workers, you have 24, etc.
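The arithmetic above can be sketched in a few lines. The module and function names are hypothetical and only illustrate that a local limit is enforced per node, so the cluster-wide ceiling scales with the number of nodes running the queue.

```elixir
defmodule LocalLimit do
  # Upper bound on concurrent jobs for one queue across the cluster when
  # every node enforces only its own local limit.
  def cluster_ceiling(node_count, local_limit) do
    node_count * local_limit
  end
end

IO.inspect(LocalLimit.cluster_ceiling(6, 2))  # 12
IO.inspect(LocalLimit.cluster_ceiling(12, 2)) # 24
```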

If you want to use global_limit, you will probably have to fetch Oban.Pro and use its Smart Engine (docs here: Smart Engine — Oban v2.11.0)

Hi @hubertlepicki,

I’m aware that the concurrency is a local configuration option. The issue I’m seeing is that when I insert 20 jobs using Oban.insert_all in my setup with 3 worker nodes and a queue with a concurrency of 1, a single node starts all the jobs at once. What I’m expecting to see is 3 active jobs at once.

are you setting up those jobs as unique jobs?
per Oban docs:

The built-in Basic engine doesn’t support unique inserts for insert_all and you must use insert/3 for per-job unique support. Alternatively, the SmartEngine in Oban Pro supports bulk unique jobs and automatic batching.

https://hexdocs.pm/oban/Oban.html#insert_all/3

Hi @cevado

Thank you for your reply. We are not using the unique jobs feature here.

The worker is initialised with:
use Oban.Worker, priority: 3, max_attempts: 1, queue: :ocr

The jobs are inserted with:
Oban.insert_all

And the queue is configured with:
queues: [default: 2, images: 1, ocr: 1]

Option 1 won’t work because you need the Oban supervisor for insert_all to work. The second option, where you disable those queues on your other nodes, is the proper way to do it. The Splitting Queues Between Nodes guide describes a solution for this exact situation, and Pro’s DynamicQueues plugin adds other conveniences.
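A minimal sketch of the second option, assuming the node role is passed in through an environment variable. NODE_ROLE and the :project app name are hypothetical, this is not copied from the guide, and all other Oban options are left out:

```elixir
# config/runtime.exs (sketch)
import Config

# NODE_ROLE is a hypothetical environment variable set per deployment.
queues =
  case System.get_env("NODE_ROLE") do
    "worker" -> [default: 2, images: 1, ocr: 1]
    # Web nodes still start Oban so they can insert jobs, but run no queues.
    _other -> []
  end

# :project is an assumed OTP app name.
config :project, Oban,
  repo: Project.Repo,
  queues: queues
```

With this shape every node starts the Oban supervisor, so Oban.insert_all works everywhere, while only nodes started with NODE_ROLE=worker execute jobs.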

That doesn’t sound right. Queues on a single node won’t run jobs beyond the concurrency limit. However, if they’re running fast enough, the other nodes may be unable to pick up the jobs. You can check where each job ran in the job’s attempted_by field and the exact time they started/finished at with the attempted_at and completed_at timestamps.
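One way to read those fields is to compute, per node, the highest number of jobs that were running at the same moment. The sketch below is only an illustration: the module name is made up, the :node key is assumed to be taken from attempted_by, and the two timestamps are assumed to have been converted to plain integers (for example milliseconds) for completed jobs.

```elixir
defmodule OverlapCheck do
  # Peak number of jobs running at the same moment, per node.
  # Each job is a map with :node, :attempted_at and :completed_at, where
  # both times are integers so that plain term sorting is chronological.
  def peak_per_node(jobs) do
    jobs
    |> Enum.group_by(fn job -> job.node end)
    |> Map.new(fn {node, node_jobs} -> {node, peak(node_jobs)} end)
  end

  defp peak(jobs) do
    jobs
    |> Enum.flat_map(fn job ->
      [{job.attempted_at, 1}, {job.completed_at, -1}]
    end)
    # Tuples sort by time first. At equal times -1 sorts before 1, so a job
    # finishing at the instant another starts is not counted as an overlap.
    |> Enum.sort()
    |> Enum.reduce({0, 0}, fn {_time, delta}, {running, best} ->
      running = running + delta
      {running, max(running, best)}
    end)
    |> elem(1)
  end
end

jobs = [
  %{node: "node-a", attempted_at: 0, completed_at: 100},
  %{node: "node-a", attempted_at: 100, completed_at: 200},
  %{node: "node-b", attempted_at: 0, completed_at: 100},
  %{node: "node-b", attempted_at: 0, completed_at: 100}
]

IO.inspect(OverlapCheck.peak_per_node(jobs))
```

For a queue with a limit of 1, any node whose peak is above 1 ran jobs concurrently; in the sample data that is "node-b".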

That wouldn’t make any difference. Uniqueness only affects inserting jobs; it has no bearing on how jobs are executed.

Thank you for replying! I’ve used the guide you’ve mentioned to configure the system.

The job takes a few seconds to finish. I copied the data from the oban_jobs table a few days ago (not the best visualisation, nulls are not visible):

id	queue	worker	args	errors	attempt	max_attempts	inserted_at	scheduled_at	attempted_at	completed_at	attempted_by	discarded_at	priority	tags	meta	cancelled_at	state
20556197	ocr	Project.Ocr.OcrPageJob	{"page_id": "8e1bf275-00bc-4154-8fdd-f8c9a5641775"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556194	ocr	Project.Ocr.OcrPageJob	{"page_id": "e77e5a0b-eef6-4417-9137-e984284d732f"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556193	ocr	Project.Ocr.OcrPageJob	{"page_id": "d6e7f709-7fba-465f-b55b-b5707e9c5367"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556192	ocr	Project.Ocr.OcrPageJob	{"page_id": "a4217fc6-83a2-4edb-ae98-355e1d0e687a"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556190	ocr	Project.Ocr.OcrPageJob	{"page_id": "8f5d1815-7467-4528-9267-36c78dc91abf"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556189	ocr	Project.Ocr.OcrPageJob	{"page_id": "5fb1aa2b-de56-4169-a8d7-3877c0a93dd2"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing
20556187	ocr	Project.Ocr.OcrPageJob	{"page_id": "36a4b116-1a54-47d5-8c7d-01cbc6a0ecba"}	{}	1	1	2022-09-08 09:10:25.801283	2022-09-08 09:10:25.801283	2022-09-08 09:10:27.350033		{node@10.0.4.25}		3	{}	{}		executing

It stands out that all attempted_at values have the same timestamp.

Not sure if this is relevant, but in this project we have a cluster, so the nodes can talk to each other, but Oban is configured using the default LISTEN/NOTIFY PostgreSQL functions.

Config of the web process by running Oban.config():

%Oban.Config{
  dispatch_cooldown: 5,
  engine: Oban.Queue.BasicEngine,
  get_dynamic_repo: nil,
  log: false,
  name: Oban,
  node: "node@10.0.3.22",
  notifier: Oban.Notifiers.Postgres,
  peer: Oban.Peer,
  plugins: [
    {Oban.Plugins.Cron,
     [
       timezone: "Europe/Amsterdam",
       crontab: [
         ...
       ]
     ]},
    Oban.Plugins.Pruner,
    Oban.Plugins.Stager
  ],
  prefix: "public",
  queues: [],
  repo: Project.Repo,
  shutdown_grace_period: 15000
}

and the worker:

%Oban.Config{
  dispatch_cooldown: 5,
  engine: Oban.Queue.BasicEngine,
  get_dynamic_repo: nil,
  log: false,
  name: Oban,
  node: "node@10.0.4.27",
  notifier: Oban.Notifiers.Postgres,
  peer: Oban.Peer,
  plugins: [
    {Oban.Plugins.Cron,
     [
       timezone: "Europe/Amsterdam",
       crontab: [
         ...
       ]
     ]},
    Oban.Plugins.Pruner,
    Oban.Plugins.Stager
  ],
  prefix: "public",
  queues: [default: [limit: 2], images: [limit: 1], ocr: [limit: 1]],
  repo: Project.Repo,
  shutdown_grace_period: 15000
}

It certainly does. Which version of Oban are you running?

No, that doesn’t have any bearing. Notifications are used for pausing, canceling jobs, gossipping, etc., not for execution.

The version is 2.11.3.

I’ve never seen or heard of a queue ignoring the concurrency limit that way. You may be experiencing a bug fixed in Oban v2.12.1 due to subquery instability during a select for update.

We upgraded to 2.13.3, but unfortunately we’re still experiencing this issue. It looks like it is related to Oban.insert_all, because we’re only experiencing it with jobs inserted that way. And maybe only when that queue is empty, because the jobs after that first insert_all run with a concurrency of 1.

If you can make a minimal reproduction of your setup that exhibits the problem then I’ll investigate and work on a fix.

I’m trying to get a reproduction. The non-default parts of our application are that there are two databases and that the prepare: :unnamed setting is enabled in Ecto. At the moment I do not have a test case where I can reproduce it consistently, but I’ve seen the behaviour a few times.

I think it is not related to the multiple database setup of Ecto. I just saw it again with only the prepare: :unnamed setting enabled and a single database.

521	completed	ocr	Demo.Job	{"id": 1}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.907732	{9to5-MacBook-Pro}		3	{}	{}	
522	completed	ocr	Demo.Job	{"id": 2}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.907957	{9to5-MacBook-Pro}		3	{}	{}	
523	completed	ocr	Demo.Job	{"id": 3}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.909088	{9to5-MacBook-Pro}		3	{}	{}	
524	completed	ocr	Demo.Job	{"id": 4}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.90967	{9to5-MacBook-Pro}		3	{}	{}	
525	completed	ocr	Demo.Job	{"id": 5}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.910167	{9to5-MacBook-Pro}		3	{}	{}	
526	completed	ocr	Demo.Job	{"id": 6}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.915585	{9to5-MacBook-Pro}		3	{}	{}	
527	completed	ocr	Demo.Job	{"id": 7}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.915452	{9to5-MacBook-Pro}		3	{}	{}	
528	completed	ocr	Demo.Job	{"id": 8}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.915305	{9to5-MacBook-Pro}		3	{}	{}	
529	completed	ocr	Demo.Job	{"id": 9}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.915722	{9to5-MacBook-Pro}		3	{}	{}	
530	completed	ocr	Demo.Job	{"id": 10}	{}	1	1	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.88802	2022-09-14 11:26:10.89765	2022-09-14 11:26:11.915795	{9to5-MacBook-Pro}		3	{}	{}	
531	completed	ocr	Demo.Job	{"id": 1}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.471022	{9to5-MacBook-Pro}		3	{}	{}	
532	completed	ocr	Demo.Job	{"id": 2}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.470968	{9to5-MacBook-Pro}		3	{}	{}	
533	completed	ocr	Demo.Job	{"id": 3}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.470802	{9to5-MacBook-Pro}		3	{}	{}	
534	completed	ocr	Demo.Job	{"id": 4}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.472534	{9to5-MacBook-Pro}		3	{}	{}	
535	completed	ocr	Demo.Job	{"id": 5}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.4725	{9to5-MacBook-Pro}		3	{}	{}	
536	completed	ocr	Demo.Job	{"id": 6}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.472228	{9to5-MacBook-Pro}		3	{}	{}	
537	completed	ocr	Demo.Job	{"id": 7}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.472295	{9to5-MacBook-Pro}		3	{}	{}	
538	completed	ocr	Demo.Job	{"id": 8}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.472545	{9to5-MacBook-Pro}		3	{}	{}	
539	completed	ocr	Demo.Job	{"id": 9}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.472601	{9to5-MacBook-Pro}		3	{}	{}	
540	completed	ocr	Demo.Job	{"id": 10}	{}	1	1	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.447177	2022-09-14 11:26:36.455261	2022-09-14 11:26:37.473616	{9to5-MacBook-Pro}		3	{}	{}	

It’s common to use prepare: :unnamed, I don’t believe that is an issue.

The output you’ve shared doesn’t include any headers, so I can’t tell which timestamp is which. Here is a short script I wrote to verify that queues process jobs sequentially:

Application.put_env(:oban, Oban.Test.Repo,
  name: Oban.Test.Repo,
  url: "postgres://localhost:5432/oban_test",
  prepare: :unnamed
)

defmodule SomeWorker do
  use Oban.Worker

  @impl Worker
  def perform(%{args: %{"id" => id}}) do
    # Print the start time so overlapping executions would be visible
    IO.puts("JOB #{id} started at #{System.system_time(:millisecond)}")

    # Simulate 100ms of work
    Process.sleep(100)

    :ok
  end
end

{:ok, _} = Oban.Test.Repo.start_link()

# A single queue with a concurrency limit of 1
{:ok, _} = Oban.start_link(queues: [default: 1], repo: Oban.Test.Repo)

# Insert nine jobs in one batch
1..9
|> Enum.map(&SomeWorker.new(%{id: &1}))
|> Oban.insert_all()

That generates this output, showing that jobs are started sequentially:

JOB 1 started at 1663158893513
JOB 2 started at 1663158893630
JOB 3 started at 1663158893747
JOB 4 started at 1663158893861
JOB 5 started at 1663158893986
JOB 6 started at 1663158894108
JOB 7 started at 1663158894229
JOB 8 started at 1663158894344
JOB 9 started at 1663158894458

The oban_jobs table shows that the attempted_at and completed_at times are staggered as expected (note it is only showing the time and not the date):

 id  │    args    │   state   │  inserted_at    │  attempted_at   │  completed_at
═════╪════════════╪═══════════╪═════════════════╪═════════════════╪═════════════════
 671 │ {"id": 1}  │ completed │ 12:34:53.486301 │ 12:34:53.505758 │ 12:34:53.613663
 672 │ {"id": 2}  │ completed │ 12:34:53.486301 │ 12:34:53.623178 │ 12:34:53.730719
 673 │ {"id": 3}  │ completed │ 12:34:53.486301 │ 12:34:53.740571 │ 12:34:53.84865
 674 │ {"id": 4}  │ completed │ 12:34:53.486301 │ 12:34:53.857146 │ 12:34:53.961968
 675 │ {"id": 5}  │ completed │ 12:34:53.486301 │ 12:34:53.976616 │ 12:34:54.087918
 676 │ {"id": 6}  │ completed │ 12:34:53.486301 │ 12:34:54.102646 │ 12:34:54.20998
 677 │ {"id": 7}  │ completed │ 12:34:53.486301 │ 12:34:54.224589 │ 12:34:54.329783
 678 │ {"id": 8}  │ completed │ 12:34:53.486301 │ 12:34:54.339628 │ 12:34:54.445821
 679 │ {"id": 9}  │ completed │ 12:34:53.486301 │ 12:34:54.455605 │ 12:34:54.559764

Reproducing the issue is hard, but I’m still trying to narrow down where it comes from. I’ve changed the insert_all to an Enum.each with an Oban.insert. The issue is still there. I’ve attached a screenshot of the database. The attempted_at timestamp is the same for the executing jobs.

To verify the config of the worker node I connected remotely to the node and ran: Oban.config()

%Oban.Config{
  dispatch_cooldown: 5,
  engine: Oban.Engines.Basic,
  get_dynamic_repo: nil,
  log: false,
  name: Oban,
  node: "node-name-here@10.0.4.30",
  notifier: Oban.Notifiers.Postgres,
  peer: Oban.Peers.Postgres,
  plugins: [
    {Oban.Plugins.Cron,
     [
       timezone: "Europe/Amsterdam",
       crontab: [
         ...
       ]
     ]},
    Oban.Plugins.Pruner,
    Oban.Plugins.Stager
  ],
  prefix: "public",
  queues: [default: [limit: 2], images: [limit: 1], ocr: [limit: 1]],
  repo: Project.Repo,
  shutdown_grace_period: 15000,
  testing: :disabled
}

When I look at how the jobs are updated into an executing state: oban/lib/oban/engines/basic.ex at 44803121eb16be6a4dffb147aa257555c421050c · sorentwo/oban · GitHub
It looks like the demand is too high, but that can only happen if the limit is higher than 1, because map_size returns a non_neg_integer. Is it possible that I set the limit incorrectly?
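A minimal sketch of that reasoning, assuming the demand is derived as the limit minus the number of running jobs. This is a reading of the linked code rather than a copy of it, and the module name is hypothetical:

```elixir
defmodule DemandSketch do
  # Assumed calculation: how many new jobs a queue may fetch, given its
  # limit and a map of currently running jobs. map_size/1 is never
  # negative, so the demand can never exceed the limit.
  def demand(limit, running) when is_map(running) do
    max(limit - map_size(running), 0)
  end
end

IO.inspect(DemandSketch.demand(1, %{}))                 # 1
IO.inspect(DemandSketch.demand(1, %{"ref" => :a_job}))  # 0
```

Under that assumption, a queue with a limit of 1 should never ask for more than one job per fetch.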

Thanks!

Doubtful, the limit is validated before starting or updating the value on a running queue. Unless you’re using :sys.replace_state/2 to meddle with the queue’s producer, there’s no way to get an invalid limit. To verify the configuration, you can get the queue’s current state with Oban.check_queue/1.

Does it always run the same number of jobs simultaneously? From your screenshot, it looks like there are 24 with the same timestamp.

I just caught it running multiple jobs on the ocr queue.

iex(node@10.0.3.26)2> Oban.check_queue(queue: :ocr)    
%{
  limit: 1,
  name: Oban,
  node: "node@10.0.3.26",
  paused: false,
  queue: "ocr",
  refresh_interval: 30000,
  running: [20660303, 20660276, 20660255, 20660257, 20660292, 20660280,
   20660293],
  started_at: ~U[2022-09-22 10:32:26.603968Z],
  updated_at: ~U[2022-09-22 14:08:18.654687Z]
}

Right now I have 7 jobs executing with the same timestamp, but I do not see a pattern in how many jobs are affected. It looks like this happens when the queue is empty and new jobs are queued. If new jobs are inserted while the queue is already working on jobs, the new jobs are executed one by one.

Is there something else I can investigate? This situation occurs multiple times per day on our production environment, so I could deploy some extra debug logging, for example.
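As one possible piece of such debug logging, the sketch below takes a map shaped like the Oban.check_queue output above and reports whether more jobs are running than the limit allows. The module and function names are hypothetical; only the :limit and :running keys come from the output shown.

```elixir
defmodule QueueCheck do
  # True when the queue reports more running jobs than its limit.
  def over_limit?(%{limit: limit, running: running}) do
    length(running) > limit
  end

  # Number of running jobs beyond the limit (0 when within the limit).
  def excess(%{limit: limit, running: running}) do
    max(length(running) - limit, 0)
  end
end

# Values taken from the check_queue output above.
state = %{
  limit: 1,
  queue: "ocr",
  running: [20660303, 20660276, 20660255, 20660257, 20660292, 20660280, 20660293]
}

IO.inspect({QueueCheck.over_limit?(state), QueueCheck.excess(state)})
```

Called periodically on the result of Oban.check_queue(queue: :ocr), a check like this could log the moment the limit is first exceeded, which may help narrow down what happens when jobs arrive on an empty queue.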