Hi there. I am migrating our queue system from GenServers to Oban (Pro). I am doing this in phases, so not all entities run in Oban yet: based on a condition, some entities use our existing queue and the rest are queued in Oban.
We want to prioritize the entities using our existing queue system, so the plan was to pause the relevant Oban queues when the current queue is not empty, then resume Oban queues when that queue is empty.
However, I have run into `Ecto.StaleEntryError`:
Last message: {:notification, :signal, %{"action" => "pause", "ident" => "any", "queue" => <omitted>}}
Last message: {:notification, :signal, %{"action" => "resume", "ident" => "any", "queue" => <omitted>}}
I have tried:
- Using `Oban.check_queue` and pausing/resuming based on the `queue.paused` value. However, that requires a db query every time an entity is queued, which is not very efficient. `StaleEntryError` occurs.
- Adding a boolean field `oban_paused` to my GenServer states, which prevents a db query every time. However, we have multiple GenServers that share a queue, so the `oban_paused` boolean does not accurately reflect the queue's state. This also still raises a `StaleEntryError` on a specific GenServer that doesn't share a queue, so it seems like our system is too quick and a race condition happens as well.
- Wrapping `Oban.pause/resume_queue` in a `try catch`, but this doesn't work either.
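For reference, the first attempt above could be sketched roughly as follows. The module name `LegacyPriority`, the `sync/3` function, and the `legacy_empty?` flag are hypothetical names for illustration, not our real code. Only `Oban.check_queue`, `Oban.pause_queue`, and `Oban.resume_queue` are actual Oban functions. The `oban` argument exists only so the sketch can be run against a stub module.

```elixir
defmodule LegacyPriority do
  @moduledoc """
  Hypothetical sketch: pause an Oban queue while the legacy queue has work,
  and resume it once the legacy queue is empty.
  """

  # `oban` defaults to the real Oban module; passing a stub is an assumption
  # made purely so the sketch can be exercised without a running Oban.
  def sync(queue, legacy_empty?, oban \\ Oban) do
    # Called on every enqueue, which is the per-call cost described above.
    state = oban.check_queue(queue: queue)

    cond do
      # The queue is not running on this node, so there is nothing to toggle.
      is_nil(state) -> :noop
      # Legacy queue drained and Oban queue paused: let Oban work again.
      legacy_empty? and state.paused -> oban.resume_queue(queue: queue)
      # Legacy queue has work and Oban queue is running: hold Oban back.
      not legacy_empty? and not state.paused -> oban.pause_queue(queue: queue)
      # Already in the desired state.
      true -> :noop
    end
  end
end
```

The check and the pause/resume are two separate steps here, so two GenServers sharing a queue can both see the same state and both send the same signal.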
The next solution I thought of was manually pausing (changing the state of) each `Oban.Job` instead of the queue itself, which I think would work out better, especially for our GenServer states that share a queue, since we rate limit and partition our queues based on that. However, I don't see a suitable `state` that can be used, aside from `scheduled`, but that would require setting a time for the job, which isn't suitable for our use case.
I am looking for an easy workaround, because we won’t need to resume/pause queues like this after our entire current system has been moved to Oban.
Any other suggestions are welcome!