tl;dr Announcing Oban, an Ecto-based job processing library with a focus on reliability and historical observability.
After spending nearly a year building Kiq, an Elixir port of Sidekiq with most of the bells and whistles, I came to the realization that the model was all wrong. Most of us don’t want to rely on Redis for production data, and Sidekiq is a largely proprietary legacy system. Not the best base for a reliable job processing system.
So, I took the best parts of Kiq and some inspiration from EctoJob and put together Oban. The primary goals are reliability, consistency and observability. It is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection.
Here are some of the marquee features that differentiate it from other job processors that are out there (pulled straight from the README):
Isolated Queues: Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that jobs in a single slow queue can’t back up other faster queues.
Queue Control: Queues can be paused, resumed and scaled independently at runtime.
Job Killing: Jobs can be killed in the middle of execution regardless of which node they are running on. This stops the job at once and flags it as discarded.
Triggered Execution: Database triggers ensure that jobs are dispatched as soon as they are inserted into the database.
Scheduled Jobs: Jobs can be scheduled at any time in the future, down to the second.
Job Safety: When a process crashes or the BEAM is terminated, executing jobs aren’t lost; they are quickly recovered by other running nodes or immediately when the node is restarted.
Historic Metrics: After a job is processed the row is not deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.
Node Metrics: Every queue broadcasts metrics during runtime. These are used to monitor queue health across nodes.
Queue Draining: Queue shutdown is delayed so that slow jobs can finish executing before shutdown.
Telemetry Integration: Job life-cycle events are emitted via Telemetry. This enables simple logging, error reporting and health checkups without plug-ins.
Version v0.2.0 was released today. Please take a look at the README or the docs and let me know what you think!
One more thing! A stand-alone dashboard built on Phoenix Live View is in the works.
The killer feature for any job processor is the UI. Every sizable app I know of relies on a web UI to introspect and manage jobs. It is very much a WIP, but here is a preview of the UI running in an environment with constant job generation:
One suggestion: instead of storing arguments in a JSONB field, maybe use a binary field type and ETF. It would give you greater flexibility (for example, the distinction between atoms and binaries).
That was my initial inclination as well, but I ended up using JSONB instead for a few reasons:
It makes it much easier to enqueue jobs in other languages. The primary system I work on uses Elixir, Python and Ruby on the backend. It is essential that jobs can be enqueued from outside of Elixir/Erlang.
Searching and filtering are an important part of the UI and historic observation. By storing arguments as JSONB we can actually leverage indexes for full text search. A common situation we have is trying to determine if a job was run for a particular customer or with particular arguments.
I’m glad you went for JSONB rather than ETF for serialisation.
Rihanna uses ETF and it has repeatedly made gathering data and altering jobs a difficult process that involves loading every row into application memory when I could have run a single SQL statement.
Is the manner in which Oban polls the database documented?
How is failure detection implemented? If you’re using Ecto, it suggests that you’re not using transactions to detect worker death?
I am the weird guy who is amazed by job queues and background processing and I am kinda hyped about this library. I am sad I have no active use case for a job queue at the moment.
First, I’m very excited to see this! We have a similar home-rolled solution, but this has all kinds of wonderful features I’d love to have. The argument for using JSONB is sound. People should keep in mind that if there are specific values you want to keep as Erlang terms, you can always run term |> :erlang.term_to_binary |> Base.encode64 manually on that term and put the result in the job JSON, or keep those values in a different table that the job JSON points to.
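The round trip described above can be sketched with nothing but the standard library. The module and function names here are made up for illustration, and the args map is just a stand-in for whatever a job would carry:

```elixir
defmodule TermInJson do
  # Hypothetical helper names; only :erlang and Base are used.

  # Serialize any Erlang term to a Base64 string that is safe to embed in JSON.
  def pack(term) do
    term |> :erlang.term_to_binary() |> Base.encode64()
  end

  # Reverse of pack/1. The :safe option refuses to create atoms that do not
  # already exist, which matters when the data comes from outside the node.
  def unpack(string) do
    string |> Base.decode64!() |> :erlang.binary_to_term([:safe])
  end
end

# The packed value is a plain string, so it fits in a JSON args map.
args = %{"customer_id" => 42, "opts" => TermInJson.pack({:resize, width: 100})}

# The tuple and its atoms survive the round trip.
{:resize, [width: 100]} = TermInJson.unpack(args["opts"])
```

The cost is that the packed value is opaque to SQL, so it is best kept to the few fields that really need atoms or tuples.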
PG_NOTIFY FYI
I’m noticing you’re using pg_notify, and I want to make sure you’re aware of an important detail in how it behaves, because I only learned about it recently and found it quite unexpected. For every invocation of pg_notify within a transaction, Postgres dedups the notice against the existing notices that are set to go out when the transaction commits. This check is O(n^2)!!! This can cause serious issues if, for example, you encode the entire job payload in the notice, because 1000 jobs will create 1000 unique notices, and suddenly what was a simple transactional insert of 1000 records takes several seconds.
Based on a quick glance at the code though it looks like you’re limiting pg_notify usage to just push out {"queue":queue, "state":job_state} json blobs which should be fine, since there are a limited number of queues and a limited number of job states.
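To put rough numbers on the difference, here is a toy model of the dedup check as described above. It is not PostgreSQL’s code, just a list scan that counts comparisons, and the module name and payload strings are invented for the example:

```elixir
defmodule NotifyDedupModel do
  # Toy model: each new notice is compared against the pending list until a
  # duplicate is found, and appended if there is none.
  # Returns the total number of comparisons performed.
  def comparisons(payloads) do
    {_pending, count} =
      Enum.reduce(payloads, {[], 0}, fn payload, {pending, count} ->
        case Enum.find_index(pending, &(&1 == payload)) do
          # No duplicate: the whole pending list was scanned.
          nil -> {pending ++ [payload], count + length(pending)}
          # Duplicate found after scanning index + 1 entries.
          index -> {pending, count + index + 1}
        end
      end)

    count
  end
end

# 1000 unique payloads: 0 + 1 + ... + 999 = 499_500 comparisons.
unique = for id <- 1..1000, do: "job-#{id}"
499_500 = NotifyDedupModel.comparisons(unique)

# 1000 identical payloads: the first is stored, the other 999 each match
# on the first comparison.
repeated = for _ <- 1..1000, do: ~s({"queue":"default","state":"available"})
999 = NotifyDedupModel.comparisons(repeated)
```

With a small, fixed set of distinct payloads the pending list never grows, so the work stays linear in the number of inserts.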
Only slightly, in the config documentation for Oban.start_link/1. For most job dispatching it relies on triggers/notifications, but because of scheduled jobs it also polls once per second by default. The poll interval is effectively the scheduled job resolution.
That’s correct, it doesn’t use transactions at all. I routinely run jobs that can last for 15-30 minutes (video encoding, creating zips, etc) and holding a transaction for that long would eat up the connection pool.
There are two modes of failure that it handles:
Standard catch/rescue: The catch/rescue bit is built on top of telemetry. It uses a notification handler to enqueue a retry or discard dead jobs.
Shutdown (Orphans): Every job acquires an advisory lock when it starts executing. The advisory lock is used to indicate which jobs are actively executing and which are really dead. When a node shuts down it takes its connection pool with it, which cleans up lingering advisory locks. Each node periodically scans for orphaned jobs and marks them as available again for retried execution.
It is easy to tell how effective the orphan rescue is because jobs are kept around after they are complete. Any job that has multiple attempts and no errors must have been orphaned!
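As a rough illustration of that check, here is a sketch over plain maps. The field names (:attempt and :errors) and the sample rows are assumptions for the example, not a statement about the actual schema:

```elixir
defmodule OrphanReport do
  # A job that ran more than once without recording an error must have been
  # picked up again after its node went away.
  def rescued?(%{attempt: attempt, errors: []}) when attempt > 1, do: true
  def rescued?(_job), do: false
end

jobs = [
  # Ran once, no errors: a normal job.
  %{id: 1, attempt: 1, errors: []},
  # Ran twice, no errors: rescued after being orphaned.
  %{id: 2, attempt: 2, errors: []},
  # Ran twice with a recorded error: an ordinary retry.
  %{id: 3, attempt: 2, errors: [%{"error" => "timeout"}]}
]

[2] = jobs |> Enum.filter(&OrphanReport.rescued?/1) |> Enum.map(& &1.id)
```

Against the real table the same condition would be a single WHERE clause, which is only possible because completed rows are kept.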
If a job takes a lock when it starts how is the lock established for retrying a failed job (not a node death)? Wouldn’t you need to release the lock beforehand?
That’s great to hear! Hope you can make use of it.
While working on the pg_notify payloads I discovered that the Base.encode64 part here is crucial. With some benchmarking I discovered that Jason encoding is just as fast as :erlang.term_to_binary + Base.encode64.
Wow. I knew that it did the deduping (discovered while trying to work around notifications only broadcasting after a transaction), but I didn’t know that it was O(n^2). Ouch.
That’s true, the notification from a trigger has a very limited payload. Originally it was just the queue name but I expanded it to aid in tracking runtime metrics.
The advisory lock is only used to indicate which jobs are actively executing. Row level locks are handled by FOR UPDATE SKIP LOCKED, so it isn’t an issue if the advisory lock is already taken when the job is retried. Note that the query uses pg_try_advisory_lock_shared to account for duplicate locks.
It does attempt to release the lock when marking a job complete, retrying it or discarding it. That doesn’t always work though, due to connection ownership. From a comment in the query module:
Dropping a lock uses the same format as taking a lock. Only the session that originally took the lock can drop it, but we can’t guarantee that the connection used to complete a job was the same connection that fetched the job. Therefore, not all locks will be released immediately after a job is finished. That’s ok, as the locks are guaranteed to be released when the connection closes.
Cleanup is best effort, mainly to ensure we free locks and keep advisory locks below the limit.
What are you using to manage job locking at the database level? I ask because PG advisory locks are extremely fast for this type of workload. There’s a Ruby queue called Que designed around them, with some very compelling throughput numbers.
This looks like a really cool library! I really like the idea of retaining the job history. But I do have a question about that. Based on reading this it seems like the jobs table is essentially unbounded. I’m assuming that it’s just up to operations to remove old jobs or migrate those to a different table, do rollups, etc.? Or is there some other mitigation strategy you have for this?
That’s interesting, I’ve never seen tcn before. It could have worked for the first implementation of triggers/notifications because it wasn’t forced to be JSON. Oban uses four different notification channels to manage various things (insert trigger, update trigger, node/queue gossip and signaling), and they all use JSON for uniformity.
Considering what @benwilson512 posted regarding the pg_notify deduplication it seems like a good thing that the full contents of each row aren’t broadcast.
I’m using FOR UPDATE SKIP LOCKED for row level locking. It is a more recent addition to PG and doesn’t have the drawbacks that are mentioned in the Que documentation, i.e. there isn’t any lock contention. Oban also uses advisory locks, but as a guarantee for “at least once” job execution instead of a real lock mechanism.
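For anyone who hasn’t seen the pattern, a fetch query built on FOR UPDATE SKIP LOCKED generally looks something like the sketch below. The table name, column names and module are hypothetical, and this is not the query Oban actually runs:

```elixir
defmodule FetchSketch do
  # Hypothetical schema: a jobs table with id, queue, state and attempt columns.
  # $1 is the queue name and $2 is how many jobs to claim.
  def fetch_sql do
    """
    WITH picked AS (
      SELECT id
      FROM jobs
      WHERE state = 'available' AND queue = $1
      ORDER BY id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE jobs
    SET state = 'executing', attempt = attempt + 1
    FROM picked
    WHERE jobs.id = picked.id
    RETURNING jobs.id
    """
  end
end
```

Rows that another session has already locked are skipped rather than waited on, so concurrent fetchers each walk away with a different set of jobs. The statement could be run through something like Ecto.Adapters.SQL.query/3 with the queue name and limit as parameters.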
As mentioned by @engineeringdept below, there is historic job pruning. There are two modes: time based and limit based. As the system runs it clears out jobs beyond the configured time or number of rows. An alternative that I shied away from, due to operational overhead and limited flexibility, was partitioning.
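The two modes amount to different retention rules. Here is a minimal in-memory sketch of each, with invented module, function and field names; the real pruning would happen in the database:

```elixir
defmodule PruneSketch do
  # Limit based: keep only the `limit` most recent jobs (highest ids).
  def by_limit(jobs, limit) do
    jobs |> Enum.sort_by(& &1.id, &>=/2) |> Enum.take(limit)
  end

  # Time based: keep jobs that completed within the last `max_age` seconds.
  def by_age(jobs, now, max_age) do
    Enum.filter(jobs, fn job -> DateTime.diff(now, job.completed_at) <= max_age end)
  end
end

now = DateTime.from_unix!(10_000)

jobs = [
  %{id: 1, completed_at: DateTime.from_unix!(1_000)},
  %{id: 2, completed_at: DateTime.from_unix!(9_000)},
  %{id: 3, completed_at: DateTime.from_unix!(9_900)}
]

# Keep the two newest rows.
[3, 2] = jobs |> PruneSketch.by_limit(2) |> Enum.map(& &1.id)

# Keep rows completed in the last 500 seconds.
[3] = jobs |> PruneSketch.by_age(now, 500) |> Enum.map(& &1.id)
```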
For the UI project I’m adding rollups for added historic information and visualization, but that isn’t ready yet.
Seems on topic to me! I’m a big fan of HLL and would love to use it for the roll up table. My primary concern is portability and whether it will be available on most cloud platforms. For instance, to my knowledge it isn’t available on Heroku and they don’t provide any way to install outside extensions.
Other cloud database providers may be more flexible, I haven’t looked into many of them lately.