Pausing Jobs Instead of Queue?

Hi there. I am currently migrating our queue system from GenServers to Oban (Pro). I am implementing this in phases, so not all entities run through Oban yet: depending on a condition, some entities use our existing queue and the rest are enqueued in Oban.

We want to prioritize the entities still on our existing queue system, so the plan was to pause the relevant Oban queues whenever the existing queue is not empty, then resume them once it drains.
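In code, the plan boils down to this pair of calls (the queue name here is just illustrative):

  # While the existing queue still has entries...
  Oban.pause_queue(queue: :entity_queue)
  # ...and once it drains:
  Oban.resume_queue(queue: :entity_queue)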

However, I have run into Ecto.StaleEntryError. These are the last messages before the error:

Last message: {:notification, :signal, %{"action" => "pause", "ident" => "any", "queue" => <omitted>}}
Last message: {:notification, :signal, %{"action" => "resume", "ident" => "any", "queue" => <omitted>}}

I have tried:

  1. Using Oban.check_queue and pausing/resuming based on the queue.paused value. However, that requires a DB query every time an entity is queued, which is not very efficient, and the StaleEntryError still occurs.
  2. Adding a boolean oban_paused field to my GenServer states, which avoids a query on every enqueue. However, we have multiple GenServers that share a queue, so the oban_paused boolean can be inaccurate for that queue. A StaleEntryError also still occurs on a specific GenServer that doesn't share a queue, so it seems our system is fast enough to hit a race condition as well.
  3. Wrapping Oban.pause_queue/resume_queue in a try/catch, but this doesn't work either.

The next solution I thought of was manually "pausing" the Oban.Job itself (changing its state) instead of pausing the queue. I think that would work out better, especially for our GenServer states that share a queue, since we rate limit and partition our queues based on that. However, I don't see a suitable state to use, aside from scheduled, and that would require setting a time on the job, which isn't suitable for our use case.
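For illustration, the closest I can get is a sketch like this, which abuses scheduled with an arbitrary far-future timestamp (MyApp.Repo stands in for our repo):

  import Ecto.Query

  # "Pause" a queue's jobs by pushing available ones a year into the future.
  def hold_jobs(queue) do
    Oban.Job
    |> where([j], j.queue == ^queue and j.state == "available")
    |> MyApp.Repo.update_all(
      set: [state: "scheduled", scheduled_at: DateTime.add(DateTime.utc_now(), 365 * 86_400)]
    )
  end

  # "Resume" by making scheduled jobs available again. This would also catch
  # jobs that were genuinely scheduled, which is part of why it doesn't fit.
  def release_jobs(queue) do
    Oban.Job
    |> where([j], j.queue == ^queue and j.state == "scheduled")
    |> MyApp.Repo.update_all(set: [state: "available", scheduled_at: DateTime.utc_now()])
  end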

I am looking for an easy workaround, because we won't need to pause/resume queues like this once our entire current system has moved to Oban.

Any other suggestions are welcome!

Would using the built-in priority feature work here instead of trying to juggle priority yourself via pausing and unpausing?
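i.e. something like this, where MyApp.Worker is just a placeholder:

  # Within a queue, jobs with a lower priority value run first (0 is highest).
  %{id: entity_id}
  |> MyApp.Worker.new(queue: :default, priority: 0)
  |> Oban.insert()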

Nope, it would not, because our entities have already occupied those priorities.

For the specific part that you quoted, we are “juggling” the priority ourselves because we don’t want those entities to use Oban for the time being, since we are moving them to Oban in phases.

Basically, we have entities with our own assigned priorities from 1 to over 1000, with a lower number meaning a higher priority. Since moving an entire system at once is tricky, we want to move only internal priorities 300 and above to Oban, while keeping internal priorities below 300 on our current system. These 1-1000+ internal priorities are bucketed into Oban's 0-9 priorities.
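The bucketing is roughly along these lines (the breakpoints here are simplified for illustration):

  # Map an internal priority (1..1000+) onto Oban's 0..9 range.
  # Lower means higher priority in both schemes.
  defp to_oban_priority(internal) when internal >= 300 do
    internal |> div(100) |> min(9)
  end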

The Ecto.StaleEntryError you’re getting is from a bug in DynamicQueues that was fixed in the rather recent Pro v1.5.0-rc.4.

Note that Oban.check_queue doesn’t query the database. It returns information directly from the queue’s producer process.

The easiest workaround is to upgrade to the latest RC (it’s stable, no known bugs at this point) and avoid the annoying StaleEntryError bug :slightly_smiling_face:.
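For reference, that's just bumping the pin in mix.exs and re-fetching deps:

  {:oban_pro, "~> 1.5.0-rc.4", repo: "oban"}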


The Ecto.StaleEntryError you’re getting is from a bug in DynamicQueues that was fixed in the rather recent Pro v1.5.0-rc.4.

Hi, you may remember me from this post Getting StaleEntryError on DynamicQueues.update :smile:

I have been on the latest Oban Pro RC ever since then, but unfortunately I am now facing this error.

Of course :slightly_smiling_face:

You’re positive it’s rc.4? The issue was reproducible in tests and there were numerous logic changes, including a rescue for that specific exception.

You’re positive it’s rc.4? The issue was reproducible in tests and there were numerous logic changes, including a rescue for that specific exception.

Yes, I'm positive. In my mix.lock, the version is 1.5.0-rc.4. Unless mix.lock isn't an accurate indicator?

No, that’s an accurate indicator. I’m not sure how you’re still seeing that error then. Which mechanism are you using to pause? Will you share your current code?

Just to remove all variables, did you do these steps?

rm -rf _build deps
mix do deps.get, compile

And then try again?

I had not, but I just tried it and it didn't resolve the issue, unfortunately.

Which mechanism are you using to pause? Will you share your current code?

Just pausing the queue based on the queue name. Sure, I have also included the other approaches I tried as commented-out code.

1st module

  defp pop_and_send(%{queue: _queue, name: _name, priority_queue: []} = state) do
    ObanQueue.use_oban?() |> maybe_resume_oban_queues(state)
  end

  # to modify when high priority entities are moved to oban
  defp pop_and_send(%{name: name, priority_queue: priority_queue} = state) do
    # IO.inspect(state.oban_paused, label: "oban_paused when priority queue not empty")

    # Pause when `priority_queue` is not empty
    if ObanQueue.use_oban?() and !state.oban_paused, do: ObanQueue.pause_queue(name)
    # if ObanQueue.use_oban?() do
    #   queue = Oban.check_queue(queue: ObanQueue.set_queue(name))
    #   if !queue.paused, do: ObanQueue.pause_queue(name)
    # end

    # ... <logic involving our existing system here>

    %__MODULE__{state | priority_queue: priority_queue, oban_paused: true}
  end

  # Resume only when we previously paused the Oban queue
  defp maybe_resume_oban_queues(true, %{name: name, oban_paused: true} = state) do
    ObanQueue.resume_queue(name)

    %__MODULE__{state | oban_paused: false}
  end

  defp maybe_resume_oban_queues(true, %{oban_paused: false} = state), do: state

  defp maybe_resume_oban_queues(false, %{queue: queue, name: name, priority_queue: []} = state) do
    case Qex.pop(queue) do
      {:empty, _queue} ->
        state

      {{:value, pid}, queue} ->
        # ... <logic involving our existing system here>

        %__MODULE__{state | queue: queue}
    end
  end

ObanQueue module

  @doc "Resume queue based on SSP"
  # def resume_queue(ssp) do
  #   try do
  #     Oban.resume_queue(queue: set_queue(ssp))
  #   catch
  #     type, reason ->
  #       error = Exception.format(type, reason, __STACKTRACE__)
  #       Logger.warning("Blah blah #{error}")
  #       :ok
  #   end
  # end

  def resume_queue(ssp), do: Oban.resume_queue(queue: set_queue(ssp))

  @doc "Pause queue based on SSP"
  # def pause_queue(ssp) do
  #   try do
  #     Oban.pause_queue(queue: set_queue(ssp))
  #   catch
  #     type, reason ->
  #       error = Exception.format(type, reason, __STACKTRACE__)
  #       Logger.warning("Blah blah #{error}")
  #       :ok
  #   end
  # end

  def pause_queue(ssp), do: Oban.pause_queue(queue: set_queue(ssp))

set_queue/1 is a function that just derives the Oban queue name from the given SSP.
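Roughly like this, simplified:

  # Simplified: derive the Oban queue name from the SSP.
  def set_queue(ssp), do: "entity_queue_#{ssp}"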

Sorry, I’m not able to reproduce the issue locally, or in a staging environment. Here are the commands:

iex(app@10.0.153.225)9> Oban.pause_queue(queue: :default)
:ok
iex(app@10.0.153.225)10> Oban.check_queue(queue: :default)
%{
  name: "Oban",
  node: "worker-i-07744c871b2ea59ba",
  running: [],
  queue: "default",
  started_at: ~U[2024-10-23 18:32:56.023906Z],
  updated_at: ~U[2024-10-24 19:07:34.908989Z],
  uuid: "0192baa7-3fd7-78f7-9a6f-e70af8d41282",
  paused: true,
  global_limit: nil,
  rate_limit: nil,
  local_limit: 10
}
iex(app@10.0.153.225)11> Oban.Pro.Plugins.DynamicQueues.all() |> Enum.find(& &1.name == "default") |> Map.get(:opts)
%Oban.Pro.Queue.Opts{
  ack_async: nil,
  local_limit: 10,
  paused: true,
  refresh_interval: nil,
  xact_delay: nil,
  xact_retry: nil,
  global_limit: nil,
  rate_limit: nil
}
iex(app@10.0.153.225)12> Oban.resume_queue(queue: "default")
:ok
iex(app@10.0.153.225)13> Oban.Pro.Plugins.DynamicQueues.all() |> Enum.find(& &1.name == "default") |> Map.get(:opts)
%Oban.Pro.Queue.Opts{
  ack_async: nil,
  local_limit: 10,
  paused: false,
  refresh_interval: nil,
  xact_delay: nil,
  xact_retry: nil,
  global_limit: nil,
  rate_limit: nil
}

If you’re able to create a small reproduction I’m happy to continue investigating.

Thank you for trying. I'm not sure how I can make it reproducible, since I don't know what's causing it.
After discussing with my boss, he decided he'd rather let the queues run and not worry about pausing them. Thanks for the help.