EctoJob: A transactional job queue built with Ecto, PostgreSQL and GenStage

#1

EctoJob

A transactional job queue built with Ecto, PostgreSQL and GenStage

Available on Hex.pm: https://hex.pm/packages/ecto_job
Docs: https://hexdocs.pm/ecto_job/api-reference.html
GitHub: https://github.com/mbuhot/ecto_job

Goals

  • Transactional job processing
  • Retries
  • Scheduled jobs
  • Multiple queues
  • Low latency concurrent processing
  • Avoid frequent database polling
  • Library of functions, not a full OTP application

Details

One of the distinguishing features of ecto_job is that the API encourages transactional job processing through Ecto.Multi. Job handlers are passed an Ecto.Multi parameter that must be passed to Repo.transaction to complete the job.

def perform(multi = %Ecto.Multi{}, job = %{}) do
  multi
  |> do_first_thing(job["customer_id"])
  |> do_second_thing(job["product_id"])
  |> MyApp.Repo.transaction()
end

Similarly, there are helpers that encourage transactional job creation by adding the job to an Ecto.Multi, so the job is enqueued only if the rest of the transaction commits:

alias Ecto.Multi

# The user row and the email job are inserted in the same transaction.
Multi.new()
|> Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "joe@gmail.com"}))
|> JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "joe@gmail.com", "body" => "Welcome!"})
|> MyApp.Repo.transaction()

Jobs are processed using a GenStage producer and ConsumerSupervisor that will execute jobs concurrently up to a configurable max_demand.
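To illustrate what "concurrently up to a configurable max_demand" means in practice, here is a minimal sketch of bounded concurrency. It deliberately uses Task.async_stream from the standard library as a stand-in, not the GenStage producer and ConsumerSupervisor that EctoJob itself uses. The module name BoundedWorkers and its run/3 function are hypothetical and exist only for this example.

```elixir
defmodule BoundedWorkers do
  @moduledoc """
  Hypothetical stand-in for illustration only. This is NOT EctoJob's
  implementation; it only shows the idea of capping in-flight jobs.
  """

  # Runs `handler` once per job, with at most `max_demand` handlers
  # executing at the same time. Results are returned in completion order.
  def run(jobs, handler, max_demand) when is_integer(max_demand) and max_demand > 0 do
    jobs
    |> Task.async_stream(handler, max_concurrency: max_demand, ordered: false)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Four jobs, but never more than two handlers running at once.
results = BoundedWorkers.run([1, 2, 3, 4], fn id -> {:done, id} end, 2)
IO.inspect(Enum.sort(results))
```

Raising the limit trades database and CPU load for throughput, which is the same trade-off max_demand controls in the real queue.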

Postgrex.Notifications is used to trigger job processors as soon as the transaction that adds a job is committed.

Job workers can run on a separate node without any BEAM clustering; PostgreSQL listen/notify is the only link between nodes.

Job completion is also signalled using listen/notify, so a websocket or chunked HTTP response (possibly on another node) can be triggered as soon as a job completes.
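The listen/notify flow can be sketched as a process that reacts to notification messages. Postgrex.Notifications delivers messages of the form {:notification, connection_pid, listen_ref, channel, payload} to the process that called Postgrex.Notifications.listen/2. The JobWakeup module, the :wake_up message and the "jobs_channel" channel name below are hypothetical, and the sketch simulates the notification with send/2 so that it runs without a database. It is not EctoJob's internal code.

```elixir
defmodule JobWakeup do
  @moduledoc """
  Hypothetical sketch: forwards Postgrex.Notifications-shaped messages
  to a subscriber, e.g. a job producer or a waiting HTTP handler.
  """
  use GenServer

  def start_link(subscriber), do: GenServer.start_link(__MODULE__, subscriber)

  @impl true
  def init(subscriber) do
    # In a real system this process would call
    # Postgrex.Notifications.listen(conn, channel) here, so that the
    # notifications are delivered to this GenServer.
    {:ok, subscriber}
  end

  # Message shape sent by Postgrex.Notifications to the listening process.
  @impl true
  def handle_info({:notification, _conn, _ref, channel, payload}, subscriber) do
    send(subscriber, {:wake_up, channel, payload})
    {:noreply, subscriber}
  end

  # Ignore anything that is not a notification.
  def handle_info(_other, subscriber), do: {:noreply, subscriber}
end

{:ok, listener} = JobWakeup.start_link(self())

# Simulate the message PostgreSQL would trigger after a commit.
send(listener, {:notification, self(), make_ref(), "jobs_channel", "42"})

receive do
  {:wake_up, channel, payload} -> IO.puts("woken by #{channel}: #{payload}")
after
  1_000 -> IO.puts("no notification received")
end
```

Because PostgreSQL only delivers a NOTIFY once the issuing transaction commits, a listener like this never wakes up for a job that was rolled back.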

#2

Awesome, looking forward to battle-testing it.


#3

Sounds like an interesting library! Did you design the library to work on Heroku? It seems like relying on listen/notify instead of clustering is partially to fit Heroku’s constraints.


#4

Not quite. I noticed that most of the Exq jobs in our system were enqueued alongside changes to the database, and the workers updated the database as well.

Guaranteeing that jobs are enqueued consistently and worked idempotently requires bookkeeping that all goes away if you let a database transaction take care of it :smile:


#5

Many thanks for making this available to use, @mbuhot. I’m now using it in my first client Elixir project. :slight_smile:


#6

Glad to hear it :smile:
Issues and PRs are most welcome. I’m currently on a .NET project at work, so I only get my Elixir fix doing open source these days!


#7

Interesting library! Another advantage to using something like this is a simpler infrastructure. No need to add another database like Redis.


#8

Version 0.3.0 released, now with configurable reservation and execution timeouts.

https://hexdocs.pm/ecto_job/0.3.0/readme.html


#9

EctoJob version 2.0.0 has been published to Hex.pm.

EctoJob 2.0 depends on Ecto 3.0, but is otherwise backwards compatible with existing code.

Other changes in this release:

  • Timestamp options on job tables can be customized.
  • Job tables can be declared with custom schema_prefix.
  • EctoJob will always use a :bigserial primary key instead of relying on the ecto default.
