Getting StaleEntryError on DynamicQueues.update

Hi there. I’m trying to update the rate_limit on queues based on database config that is retrieved on app startup. The update runs inside a Supervisor’s init callback, which fetches that config as part of starting up.
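Roughly, the call site looks like this (a simplified sketch; module and function names other than check_and_update_rate_limit are placeholders):

defmodule MyApp.RateLimitSupervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    # The SSP config is read from the database once on startup ...
    ssps = MyApp.Config.fetch_ssps!()

    # ... and used to adjust each queue's rate_limit (defined just below).
    check_and_update_rate_limit(ssps)

    Supervisor.init([], strategy: :one_for_one)
  end
end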

This is the relevant code that I have. In this specific instance, it’s trying to update the rate_limit: [allowed: 200] to rate_limit: [allowed: 1].

def check_and_update_rate_limit(ssps) do
  Enum.each(@queues, fn queue_name ->
    relevant_ssps = get_relevant_ssps(queue_name, ssps)

    [queue: queue_name]
    |> Oban.check_queue()
    |> maybe_update_rate_limit(relevant_ssps)
  end)
end

defp maybe_update_rate_limit(%{rate_limit: rate_limit} = queue, [_ | _] = ssps) do
  ssps
  |> calculate_average_interval()
  |> calculate_rate_limit()
  |> update_rate_limit(rate_limit.allowed, queue)
end

defp maybe_update_rate_limit(
       %{rate_limit: rate_limit} = queue,
       %{interval_callback: interval} = _ssp
     ) do
  interval
  |> calculate_rate_limit()
  |> update_rate_limit(rate_limit.allowed, queue)
end

defp update_rate_limit({allowed, period}, current_allowed, queue)
     when allowed != current_allowed do
  Oban.check_queue(queue: queue.queue) |> IO.inspect(label: "latest queue")

  Oban.Pro.Plugins.DynamicQueues.update(queue.queue,
    rate_limit: [allowed: allowed, period: period]
  )
end
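# Assumed no-op fallback, not central to the issue: without a clause like
# this, the guarded clause above would raise a FunctionClauseError whenever
# the calculated rate already matches the current one.
defp update_rate_limit(_rate, _current_allowed, _queue), do: :ok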

Everything runs fine until I get to update_rate_limit.

This is the queue that’s printed right before the update:

latest queue: %{
  name: "Oban",
  node: <omitted>,
  running: [],
  queue: <omitted>,
  started_at: ~U[2024-09-26 10:43:45.346350Z],
  uuid: "01922dee-0045-7765-86f3-c6a7c1b8a862",
  global_limit: %Oban.Pro.Producer.Meta.GlobalLimit{
    allowed: 200,
    tracked: %{},
    partition: nil
  },
  local_limit: 200,
  rate_limit: %Oban.Pro.Producer.Meta.RateLimit{
    allowed: 5,
    period: 1,
    window_time: 1727347425,
    windows: [%{"curr_count" => 0, "prev_count" => 0}],
    partition: nil
  },
  paused: true,
  updated_at: ~U[2024-09-26 10:43:45.346352Z]
}

This is the error stack trace:

11:43:49.421 [error] GenServer {Oban.Registry, {Oban, {:plugin, Oban.Pro.Plugins.DynamicQueues}}} terminating
** (Ecto.StaleEntryError) attempted to update a stale struct:

%Oban.Pro.Queue{__meta__: #Ecto.Schema.Metadata<:loaded, "public", "oban_queues">, name: <omitted>, lock_version: 11, only: %Oban.Pro.Queue.Only{mode: nil, op: nil, key: nil, value: nil}, opts: %Oban.Pro.Queue.Opts{ack_async: nil, local_limit: 200, paused: true, refresh_interval: nil, xact_delay: nil, xact_retry: nil, global_limit: %Oban.Pro.Queue.Opts.GlobalLimit{allowed: 200, partition: nil}, rate_limit: %Oban.Pro.Queue.Opts.RateLimit{allowed: 1, period: 1, window_time: nil, partition: nil}}, inserted_at: ~U[2024-09-24 12:53:45.521465Z], updated_at: ~U[2024-09-26 10:43:48.516420Z]}

    (ecto 3.10.3) lib/ecto/repo/schema.ex:781: Ecto.Repo.Schema.apply/4
    (ecto 3.10.3) lib/ecto/repo/schema.ex:467: anonymous fn/15 in Ecto.Repo.Schema.do_update/4
    (ecto 3.10.3) lib/ecto/repo/schema.ex:1004: anonymous fn/3 in Ecto.Repo.Schema.wrap_in_transaction/6
    (ecto_sql 3.10.2) lib/ecto/adapters/sql.ex:1352: anonymous fn/3 in Ecto.Adapters.SQL.checkout_or_transaction/4
    (db_connection 2.7.0) lib/db_connection.ex:1756: DBConnection.run_transaction/4
    (oban_pro 1.5.0-rc.1) lib/oban/pro/plugins/dynamic_queues.ex:579: Oban.Pro.Plugins.DynamicQueues.scale_queue/3
    (oban_pro 1.5.0-rc.1) lib/oban/pro/plugins/dynamic_queues.ex:421: Oban.Pro.Plugins.DynamicQueues.handle_info/2
    (stdlib 5.2) gen_server.erl:1095: :gen_server.try_handle_info/3

From what I can see, everything looks identical except for the updated_at timestamp. I’m not sure if this is a bug or if I’m doing something wrong.

Hi @lamxw2, thanks for asking with so much detail. This is an excellent use of rate limits, and it’s awesome to see that you’re calculating rates dynamically like that.

Is it possible that you’re running the same code on multiple nodes? The stale update error could happen if updates happened concurrently.
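For context, the lock_version: 11 on the Oban.Pro.Queue struct in your error is Ecto optimistic locking at work. Here’s a minimal, generic sketch of how two concurrent writers end up with an Ecto.StaleEntryError (MyApp.Queue, MyApp.Repo, and queue_id are hypothetical; this is not DynamicQueues’ actual code):

# Hypothetical schema just to illustrate optimistic locking on a
# lock_version column, like the one on the oban_queues table.
defmodule MyApp.Queue do
  use Ecto.Schema

  schema "queues" do
    field :rate_allowed, :integer
    field :lock_version, :integer, default: 1
  end
end

# Two processes load the same row and see the same lock_version (say, 11).
# queue_id is a placeholder for the row's primary key.
queue_a = MyApp.Repo.get!(MyApp.Queue, queue_id)
queue_b = MyApp.Repo.get!(MyApp.Queue, queue_id)

# The first update succeeds and bumps lock_version to 12.
queue_a
|> Ecto.Changeset.change(rate_allowed: 1)
|> Ecto.Changeset.optimistic_lock(:lock_version)
|> MyApp.Repo.update!()

# The second still carries lock_version: 11, so its UPDATE ... WHERE
# lock_version = 11 matches no rows and Ecto raises Ecto.StaleEntryError.
queue_b
|> Ecto.Changeset.change(rate_allowed: 2)
|> Ecto.Changeset.optimistic_lock(:lock_version)
|> MyApp.Repo.update!()

So anything that writes the same oban_queues row at roughly the same time, such as the same startup code running on two nodes, can hit this.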

Also, which version of Pro are you using? We plan on putting in a safeguard for stale updates in the next v1.5 release candidate.

Hi @sorentwo, thanks for the response. That is a huge compliment, thank you :smile:

I asked my coworker and he said that we are not running it on multiple nodes.

We are using 1.5.0-rc.1. I tried upgrading to rc.3 but still faced the same error, so I reverted it. That’s great to hear! Do you have any estimate as to when the next release may be?

Hi, just to follow up: I’ve upgraded Oban Pro to the latest rc.4 and it has fixed the issue!
