It is definitely a work in progress with a lengthy list of things to fix and add. Now that the underlying DB structures are stable and Phoenix LiveView has been officially released, I can make progress!
Note that this is something I’m hoping to control access to, so the GH repo isn’t public.
This question inspired me to look at using batches with Oban. I’ve finally posted a recipe on implementing batches with Oban, without any built-in support.
@sorentwo Is there a way to define priority within a queue? For instance I’d like to be able to create large amounts of batch jobs without delaying jobs created by an end-user. So my thought was to have a numerical priority indicator I could set which would allow user jobs to jump to the front of the queue if it’s filled with batch jobs with a lower priority.
In the “batch jobs” recipe, if say the final couple jobs go through final_batch_job? at the same time, wouldn’t they both get false and complete without inserting the callback job?
No, there isn’t a way to define priorities without making changes to Oban itself. Execution order is based on when a job was inserted/scheduled, which would conflict with priorities. It may be possible, but I haven’t put any effort into it.
I recommend defining separate queues for various priorities. The queues are entirely isolated, so a backup or slowdown in one queue won't affect the others.
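As a sketch, that separation is just a matter of queue configuration; the queue names and concurrency limits here are illustrative, not prescribed by Oban:

```elixir
# config/config.exs — each queue polls and executes independently,
# so a flood of batch jobs can't starve user-facing work.
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [user_facing: 25, batch: 10]
```

User-facing workers then declare `use Oban.Worker, queue: "user_facing"` while bulk workers point at `"batch"`.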
Quite right! The post has been updated with a few improvements to prevent those types of race conditions:
Put the final batch job check in a task with a slight delay
Make the job unique for a short period to prevent duplicate callbacks (not 100%, but good enough for most usage)
Include a check for an existing status callback within the final_batch_job? query
Recommend a unique constraint on args for the same batch_id and status if absolute uniqueness is required
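The uniqueness portion of those improvements can be sketched roughly like this; the queue name, period, and argument shape are assumptions for illustration, not the post's exact code:

```elixir
defmodule MyApp.Workers.BatchCallbackWorker do
  # Unique on worker + args for 60 seconds, so two "final" batch jobs
  # racing each other can't both insert a callback for the same
  # batch_id/status pair. Not airtight — for absolute uniqueness add
  # a database constraint on args, as recommended above.
  use Oban.Worker,
    queue: "batch",
    unique: [period: 60, fields: [:worker, :args]]

  @impl Oban.Worker
  def perform(%{"batch_id" => batch_id, "status" => status}, _job) do
    handle_batch(batch_id, status)
  end

  # Placeholder for the application's actual callback logic.
  defp handle_batch(_batch_id, _status), do: :ok
end
```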
Hi, thanks for Oban! I want to build a worker that removes expired sessions from the database.
I’ve created a worker which already works fine, but I’m not sure what’s the best place to start the initial job? Also, my job schedules the next run and if I stop the application with an already scheduled job and start my workers again, I end up with multiple workers.
I’ve tried the “unique” configuration, but it doesn’t allow me to schedule a new run when the worker is already running, how can I do this?
Here’s the worker with all irrelevant code removed:
```elixir
defmodule App.Workers.ClearExpiredSessions do
  use Oban.Worker, queue: "system", max_attempts: 10

  # I've tried `unique: [fields: [:queue, :worker]]` here,
  # but cannot schedule jobs then

  @one_minute 60

  # What's the best place to call this?
  def start do
    job = new(%{})
    Oban.insert(job)
  end

  @impl Oban.Worker
  def perform(_params, %{attempt: 1}) do
    delete_expired_sessions()

    job = new(%{}, schedule_in: @one_minute)
    Oban.insert(job)
  end

  def perform(_params, _job) do
    delete_expired_sessions()
  end
end
```
This sort of sounds like something better suited for Quantum. Quantum is built around executing things at regular intervals, where each execution should be run basically once. You don’t need a queue or bounded worker pool for that.
That’s my current implementation. I like Oban’s logging and retry functionality, so I’m just playing around with it. You might be right that it’s not the best solution, but I wanted to know if it’s possible.
I would have a “Setup / Start” task in my supervision tree that just queries the database to see if there is a clear_session job or not and queues a new one if there is none.
But in this particular case I would say, keep it in a GenServer… much less complexity.
Regardless of whether you should do it this way, you should be able to do it this way. In this case you need to use a unique argument, say the nearest minute as unix time. Something like this would round off the edges:
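A rough sketch of that rounding (the exact snippet wasn't preserved here, so the argument key is an assumption; the worker would also need args-based uniqueness enabled):

```elixir
# Truncate the current unix time down to the start of the minute.
# Every call within the same minute yields the same value, so using
# it as a job argument lets the unique check hold for that minute.
rounded_minute = div(System.os_time(:second), 60) * 60

%{minute: rounded_minute}
|> App.Workers.ClearExpiredSessions.new(schedule_in: 60)
|> Oban.insert()
```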
Until the next minute you’ll keep getting the same value, so uniqueness is enforced within the current minute.
I tend to agree with you on that. Quantum in particular is a large library.
Yes, I would do exactly that.
For something that is clearing expired sessions and doesn’t need to be coordinated to ensure it only runs on a single host/node, putting it in a GenServer is fine. If you are running multiple nodes and you only want to execute a job once within a window of time it is pretty sensible to use background jobs to do it.
No matter how many nodes you are running, the job will only execute on a single node.
If your node crashes, or you restart after a deploy the task will run again off-schedule. With unique scheduled jobs you can maintain uniqueness between nodes and restarts.
Oban v0.9.0 has been released with a few important bug fixes and a couple of minor feature additions. Definitely upgrade if you’ve run into trouble in CI with multiple unscoped Oban.Migrations calls.
Thanks to everybody that has been using the library and reporting issues!
[Oban] Add insert_all/2 and insert_all/4, corresponding to Ecto.Repo.insert_all/3 and Ecto.Multi.insert_all/5, respectively. @halostatue
[Oban.Job] Add to_map/1 for converting a changeset into a map suitable for database insertion. This is used by Oban.insert_all/2,4 internally and is exposed for convenience.
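For instance, inserting a batch of jobs in a single query with the new function; `user_ids` and the worker module are assumptions for the example:

```elixir
# Build one changeset per job, then insert them all at once rather
# than issuing a separate INSERT per job.
jobs = Enum.map(user_ids, &MyApp.Workers.EmailWorker.new(%{user_id: &1}))

Oban.insert_all(jobs)
```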
Changed
[Oban.Config] Remove the default queue value of [default: 10], which was overridden by Oban.start_link/1 anyhow.
[Oban.Telemetry] Allow the log level to be customized when attaching the default logger. The default level is :info, the same as it was before.
Fixed
[Oban.Migrations] Prevent invalid up and down targets when attempting to run migrations that have already been run. This was primarily an issue in CI, where the initial migration was unscoped and would migrate to the current version while a subsequent migration would attempt to migrate to a lower version. @jc00ke
[Oban.Job] Prevent a queue comparison with nil by retaining the default queue (default) when building uniqueness checks.
[Oban.Job] Set state to scheduled for jobs created with a scheduled_at timestamp. Previously the state was only set when schedule_in was used.
Oban v0.10.0 is published with a huge performance improvement, a change to the default logging behavior, and a major restructuring of notifications. The result is much faster (up to 258,000x faster with extremely large queues containing millions of unprocessed jobs) and more fault tolerant (able to run without a database connection and recover safely when the database comes back up).
This release does involve a migration and I highly recommend it.
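Applying it works like any other bundled migration; a minimal sketch, with the module name being illustrative:

```elixir
defmodule MyApp.Repo.Migrations.UpgradeOban do
  use Ecto.Migration

  # Delegate to the migrations bundled with Oban, which swap the
  # separate indexes for the composite one described below.
  defdelegate up, to: Oban.Migrations
  defdelegate down, to: Oban.Migrations
end
```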
Tables with a lot of available jobs (hundreds of thousands to several million) are prone to timeouts when fetching new jobs. The planner fails to optimize using the index available on queue, state and scheduled_at, forcing both a slow sort pass and an expensive bitmap heap scan.
This migration drops the separate indexes in favor of a single composite index. The resulting query is up to 258,757x faster on large tables while still usable for all of the other maintenance queries.
[Oban.Config] Change the default for verbose from true to false. Also, :verbose now accepts only `false` and standard logger levels. This change aims to prevent crashes due to conflicting levels when the repo’s log level is set to false.
Fixed
[Oban.Notifier] Restructure the notifier in order to isolate producers from connection failures. Errors or loss of connectivity in the notification connection no longer kill the notifier and have no effect on the producers. @axelson
I took a look through the readme file on github and didn’t see this but is it possible to have scheduled tasks that get repeated on a set interval? For example, every Monday at 2pm run X job (or any cron-like interval variant)?
I’ve used a combination of Quantum and Oban for this. Quantum takes cron config, and if the action is super fast, I just do it. If it may fail or can take a while, I have Quantum simply enqueue a job.
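A minimal sketch of that combination, assuming a Quantum scheduler module; the names and cron expression are illustrative:

```elixir
# config/config.exs — every Monday at 14:00, Quantum calls a
# function that enqueues an Oban job instead of doing the work
# inline, so retries and logging come from Oban.
config :my_app, MyApp.Scheduler,
  jobs: [
    weekly_report: [
      schedule: "0 14 * * 1",
      task: {MyApp.Workers.ReportWorker, :enqueue, []}
    ]
  ]
```

The worker side is then an ordinary Oban worker with a small entry point for Quantum to call:

```elixir
defmodule MyApp.Workers.ReportWorker do
  use Oban.Worker, queue: "reports"

  # Called by Quantum on schedule; only enqueues.
  def enqueue do
    %{} |> new() |> Oban.insert()
  end

  @impl Oban.Worker
  def perform(_args, _job) do
    run_report()
  end

  # Placeholder for the actual report logic.
  defp run_report, do: :ok
end
```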
Thanks. I am using Quantum too. I guess repeated scheduling is better off being kept as a third party dependency? Oban is so well put together that not supporting repeated scheduling seems like it’s probably by design?