How to best handle tasks when pulling from a job queue

What is the reason you are building a postgres-backed queue?

Because:

  1. In systems in which it is not a problem that tasks disappear when the system does a full-system restart, you don’t need any persistence.
  2. If you do need persistent tasks, you could use (D)ETS or Mnesia to implement this, without needing the external Postgres dependency.

I believe there already are quite a few packages doing either of these:

  • transient background jobs: the built-in Task module or ex_job,
  • transient scheduled tasks: quantum,
  • persistent backround tasks using mnesia: que,
  • persistent background tasks using redis: toniq, exq,
  • persistent background tasks using RabbitMQ: task_bunny, roger
  • persistent background tasks using Postgres: backy (note: still in early development)
4 Likes

Our requirements are:

  • Persists jobs across VM restarts or crashes
  • Transactional and durable (i.e. backed by a SQL database)
  • Allows nodes to be stateless (i.e. no mnesia)
  • Does not need any additional services (i.e. no Redis, RabbitMQ etc)
  • Does not require Ecto
  • Is highly performant and well maintained
  • Does not require arguments to be serializable to JSON

I have been looking at existing solutions for some time, and none of them suit our needs, so I opted to build one.

2 Likes

All right, then let me give you a slightly more in-depth answer to your original question :slight_smile: .

Both approaches (working with a worker pool vs a new task for each job) are valid: The main idea behind a worker pool is that frequently you don’t want too many background jobs running at the same time, because some OS resources (like the outgoing internet connection used to fetch data from elsewhere, or like the email handler) can be easily flooded if not rate-limited in some way.

You should not care about (garbage collection) efficiency at this time, because this would be a premature optimization (and the BEAM handles its garbage quite well). I believe using one or more worker pools is the cleaner solution.

As for your requirements:

I gather you already are using an SQL database for something else in your application, so it is not considered ‘additional’?

I’d like to note that Mnesia is able to write all changes to disk just like an SQL database does, and you can obviously have one (or multiple) nodes that just run Mnesia just like you’d run the SQL database in a separate location, besides replicating its data across all nodes. Obviously Mnesia is going to be faster because it is in-memory and lives right next to your application, using the same datatypes that the application uses.

Could you explain the reasoning behind this one? Ecto is not that heavyweight of a dependency, and it abstracts the database-specific details away from you, allowing you to swith between a whole slew of DBs.

Either use something that can handle erlang-terms natively like Mnesia, or use Erlang’s external term format to convert to/from a binary format that you could store on disk (or in a binary column in a database).

1 Like

Genstage would be ideal for this use case, You’d have a producer hooked up via a postgres pubsub mechanism which knows how many jobs are available to be processed at any given time (no polling). And a bunch of consumers which would try to get 1 job at a time and process it.

1 Like

This is how I usually end up doing it. Based on your description, I’d have a GenServer dispatcher process, which is responsible for the lifecycle of jobs. I’d implement polling in a separate process (possibly also a Task). That way, a crash when reading from the database would not affect the dispatcher, and therefore would not cause currently running jobs to fail.

Instead of having a separate Task supervisor, I tend to use the dispatcher as the parent of the job processes. I setup the dispatcher to trap exits, and start each new task with Task.async. With such setup, the dispatcher process receives all the necessary events (task result, task failure) in form of messages.

Using a separate Task.Supervisor should also work. I usually don’t do that, since the supervisor wouldn’t really lift a lot of responsibilities from the dispatcher. The dispatcher would have to setup a monitor to each task, and respond to :DOWN messages, and keep monitors and running tasks in its state. So it doesn’t seem that a dedicated supervisor would bring anything useful to the table.

The supervision subtree could look something like:

                Queue
        (rest_for_one supervisor)
                  /\
                 /  \
Queue.Dispatcher      Queue.DbPoller
  (GenServer)       (Task or GenServer)
      /\
     /  \
Task1 ... TaskN
4 Likes

take a look at

All though it’s called ectojob - I don’t see that ecto should be a requirement for writing the job to the DB or that anything prevents you from wrapping it in an ecto-less api…
(although jobs are often tied to DB insert/update and the ecto multi job hook looks pretty darn awesome)

your “no ecto” requirement is the only one that sticks out to me… what is the reason for “no ecto” ?

1 Like

Sounds costly, why not use a PostgreSQL Stream and get results streamed back to you in real-time whenever a table gets a new entry?

You could then just toss it into GenStage/Flow and let it all handle the concurrency as well. :slight_smile:

For note, the PostgreSQL Library that Ecto uses (and thus you’d already have it if you use Ecto) has the Stream calls wrapped up in a nice API for you to use. :slight_smile:
/me hopes Ecto gets such interfaces too so we get the auto-conversions and such

Yeah Ecto is quite lightweight and does a lot of caching for you and so forth, really useful to have.

Yep! This is it!

4 Likes

Only other thing I’d suggest looking into would be how you plan to lock jobs. There’s several options from using a lock column to advisory locks and a couple of others that can have a significant performance impact.

Correct. And I would guess that a large majority of Elixir deployments in the wild are also backed by a Postgres DB.

Sure. But mnesia requires your nodes to have state, and it is a PITA to manage (I have used it in production for a while).

It’s not that heavyweight, but it doesn’t make sense for this application for several reasons:

  1. Not everybody likes or wants to use it, they shouldn’t be forced to bring it in for a dependency like this
  2. This queue only works with Postgres anyway so the database abstraction layer is moot
  3. Using the Postgres driver directly gives you better control over how you manage connections

Sure, I looked into Postgres pubsub and it seems interesting, but adds a little extra complexity. It’s not clear-cut whether it would be more performant than a poll-based approach. My current polling implementation is simple and fast enough, once that is stable in production I’ll look into pubsub.

That supervision tree looks interesting… but why separate the Dispatcher and the Poller? Couldn’t you have the dispatcher set a timer to poll itself and fetch/dispatch the jobs synchronously itself inside the handle_info function?

This library aims to solve a slightly different problem than ecto_job. Note that ecto_job uses SELECT FOR UPDATE and holds transactions open for the duration of the job, this limits your concurrency to one job per connection. For some use-cases this is fine, but I want this to be more of a general purpose job queue.

Polling is surprisingly very performant. The problem with streaming events is lock contention since every Dispatcher will try to get a lock at the same time. While I think you could make this work with advisory locks which are very lightweight, you’d always need a polling system as a failsafe so I’d prefer to get that really rock solid first then consider Postgres pubsub as a potential optimisation.

In addition, you don’t need GenStage at all with a polling system since it’s purely pull-based - even simpler.

I am using a combination of advisory locking and SELECT FOR UPDATE SKIP LOCKED.

Initial benchmarks show a throughput of about 1.5k jobs per second on my local machine which is plenty fast enough for our needs.

3 Likes

This separation allows the dispatcher to do its work without any significant delays or interrupts if database polling becomes extremely slow and/or it crashes.

1 Like

So is the poller sending the jobs to the dispatcher? How does it know if the dispatcher has capacity to accept any new jobs?

Yes.

There are various ways to handle this

The approach I used recently for a low job rate was to always start a new task (so no limit), and then from the task process use jobs to ensure that most N of them are actually performing the work. That solution is in my opinion quite simple, though it’s probably not optimal for a large incoming rate with a possible periods of large backlog, because you might end up with a very large number of processes, which might end up consuming too much memory.

Another approach is to have the dispatcher ask the poller for more work when it’s ready. I’d have the poller preload at least one batch upfront, so it can immediately send it when the dispatcher asks for more work. This would keep the memory usage constant (you’d have at most one extra batch loaded), while giving you the same properties as I’ve mentioned in my previous post (separation of latencies and failures). You should be able to do this with GenStage (I think - I never used it so far), or you could hand-code it with GenServer.

The first approach is IMO fairly lightweight, though it doesn’t necessarily work well with larger bursts. The GenStage approach is more resilient with respect to overloads, and should keep your memory usage stable, but it might be a bit more complex.

1 Like

Not quite. Ecto_job uses select for update skip locked only for a worker to claim a batch of jobs.

The work of executing the job done using optimistic concurrency - the transaction is only open for the final call to Repo.transaction Where updates are submitted. All jobs in the batch execute concurrently.

3 Likes

I’ve just released Rihanna as an initial implementation of the concepts discussed here. It uses a very simple single dispatcher with a poll-based approach, which nonetheless has a pretty high throughput of around 1.5k jobs/s due to it’s use of advisory locks.

The supervision subtree looks like this:

          Supervisor
    (one_for_one supervisor)
              |
              |
JobDispatcher (polls and fires off jobs)    
          (GenServer)       
              /\
             /  \
        Task1 ... TaskN

It also has a GUI that can be used to see work in progress and retry failed jobs:

rihanna_ui_screenshot

This first version is well-tested and stable at high concurrency levels and should be good enough for most use-cases of a simple, durable, database-backed distributed queueing system.

I believe that significantly higher performance with less load on the DB is possible by taking advantage of pg_notify and communication between nodes of an Erlang cluster to avoid needless contention on the DB, but this is not ready for release yet.

Thoughts/feedback/questions/bug reports are welcome!

8 Likes

Congrats! Isn’t Elixir a wonderful language for implementing concurrent job queues.

I didn’t see a LICENSE file in the repo, is it MIT?

3 Likes

Yup, I just added it. Both Rihanna and Rihanna UI are released under MIT license.

1 Like

If something goes wrong in pulling and the genserver crashes you also lose all currently being executed jobs correct?

Jobs are spawned underneath a Task.Supervisor so they will run to completion regardless of the state of the Dispatcher that spawned them. Since the Dispatcher handles marking jobs as failed/completed, these jobs will never register their status with the database so the lock will be lost and the respawned dispatcher will try them over again.

The result will be that some jobs are run twice.

2 Likes

Ah I see. admittedly I did not look at the code because I was on my phone and your diagram above looks like the job tasks are linked to the JobDispatcher. Pretty cool though I will take a look later!

1 Like

Wonderful! :smiley:

:+1: A great way to handle this edge case. Writing jobs to ensure that they will result in a no-op when ran again if required is usually not hard, and a great solution.

1 Like