Hi there. I’m trying to update the rate_limit
on queues based on database config that is retrieved on app startup. This update is called within the init
of a Supervisor
that retrieves the config as part of its init
.
This is the relevant code that I have. In this specific instance, it’s trying to update the rate_limit: [allowed: 200]
to rate_limit: [allowed: 1]
.
def check_and_update_rate_limit(ssps) do
Enum.each(@queues, fn queue_name ->
relevant_ssps = get_relevant_ssps(queue_name, ssps)
[queue: queue_name]
|> Oban.check_queue()
|> maybe_update_rate_limit(relevant_ssps)
end)
end
defp maybe_update_rate_limit(%{rate_limit: rate_limit} = queue, [_ | _] = ssps) do
ssps
|> calculate_average_interval()
|> calculate_rate_limit()
|> update_rate_limit(rate_limit.allowed, queue)
end
defp maybe_update_rate_limit(
%{rate_limit: rate_limit} = queue,
%{interval_callback: interval} = _ssp
) do
interval
|> calculate_rate_limit()
|> update_rate_limit(rate_limit.allowed, queue)
end
defp update_rate_limit({allowed, period}, current_allowed, queue)
when allowed != current_allowed do
Oban.check_queue(queue: queue.queue) |> IO.inspect(label: "latest queue")
Oban.Pro.Plugins.DynamicQueues.update(queue.queue,
rate_limit: [allowed: allowed, period: period]
)
end
Everything runs fine until I get to update_rate_limit
.
This is the queue that’s printed right before the update
latest queue: %{
name: "Oban",
node: <omitted>,
running: [],
queue: <omitted>,
started_at: ~U[2024-09-26 10:43:45.346350Z],
uuid: "01922dee-0045-7765-86f3-c6a7c1b8a862",
global_limit: %Oban.Pro.Producer.Meta.GlobalLimit{
allowed: 200,
tracked: %{},
partition: nil
},
local_limit: 200,
rate_limit: %Oban.Pro.Producer.Meta.RateLimit{
allowed: 5,
period: 1,
window_time: 1727347425,
windows: [%{"curr_count" => 0, "prev_count" => 0}],
partition: nil
},
paused: true,
updated_at: ~U[2024-09-26 10:43:45.346352Z]
}
This is the error stack trace
11:43:49.421 [error] GenServer {Oban.Registry, {Oban, {:plugin, Oban.Pro.Plugins.DynamicQueues}}} terminating
** (Ecto.StaleEntryError) attempted to update a stale struct:
%Oban.Pro.Queue{__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_queues">, name: <omitted>, lock_version: 11, only: %Oban.Pro.Queue.Only{mode: nil, op: nil, key: nil, value: nil}, opts: %Oban.Pro.Queue.Opts{ack_async: nil, local_limit: 200, paused: true, refresh_interval: nil, xact_delay: nil, xact_retry: nil, global_limit: %Oban.Pro.Queue.Opts.GlobalLimit{allowed: 200, partition: nil}, rate_limit: %Oban.Pro.Queue.Opts.RateLimit{allowed: 1, period: 1, window_time: nil, partition: nil}}, inserted_at: ~U[2024-09-24 12:53:45.521465Z], updated_at: ~U[2024-09-26 10:43:48.516420Z]}
(ecto 3.10.3) lib/ecto/repo/schema.ex:781: Ecto.Repo.Schema.apply/4
(ecto 3.10.3) lib/ecto/repo/schema.ex:467: anonymous fn/15 in Ecto.Repo.Schema.do_update/4
(ecto 3.10.3) lib/ecto/repo/schema.ex:1004: anonymous fn/3 in Ecto.Repo.Schema.wrap_in_transaction/6
(ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:1352: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
(db_connection 2.7.0) lib/db_connection.ex:1756: DBConnection.run_transaction/4
(oban_pro 1.5.0-rc.1) lib/oban/pro/plugins/dynamic_queues.ex:579: Oban.Pro.Plugins.DynamicQueues.scale_queue/3
(oban_pro 1.5.0-rc.1) lib/oban/pro/plugins/dynamic_queues.ex:421: Oban.Pro.Plugins.DynamicQueues.handle_info/2
(stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3
From what I can see, everything looks identical except for the updated_at
timestamp. I’m not sure if this is a bug or if I’m doing something wrong.