EctoJob: A transactional job queue built with Ecto, PostgreSQL and GenStage

ectojob
job-queue
#1

Available on Hex.pm: https://hex.pm/packages/ecto_job
Docs: https://hexdocs.pm/ecto_job/api-reference.html
GitHub: https://github.com/mbuhot/ecto_job

Goals

  • Transactional job processing
  • Retries
  • Scheduled jobs
  • Multiple queues
  • Low latency concurrent processing
  • Avoid frequent database polling
  • Library of functions, not a full OTP application

Details

One of the distinguishing features of ecto_job is that the API encourages transactional job processing through Ecto.Multi. Job handlers are passed an Ecto.Multi parameter that must be passed to Repo.transaction to complete the job.

def perform(multi = %Ecto.Multi{}, job = %{}) do
  multi
  |> do_first_thing(job["customer_id"])
  |> do_second_thing(job["product_id"])
  |> MyApp.Repo.transaction()
end

Similarly, there are helpers that encourage transactional job creation by adding the job to an Ecto.Multi:

alias Ecto.Multi

# The user insert and the job insert commit (or roll back) together.
Multi.new()
|> Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "joe@gmail.com"}))
|> JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "joe@gmail.com", "body" => "Welcome!"})
|> MyApp.Repo.transaction()
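
Both snippets above presuppose a queue module that owns perform/2 and enqueue/3. Here is a minimal sketch, assuming the `use EctoJob.JobQueue, table_name: ...` macro as I recall it from the project README (check the docs for your version). The table name, the job fields and `MyApp.Mailer.deliver/2` are hypothetical, and the `Multi.run` callback uses the two-argument Ecto 3 form:

```elixir
defmodule MyApp.JobQueue do
  # Assumed from the ecto_job README: sets up the job schema for the "jobs"
  # table and provides helpers such as enqueue/3.
  use EctoJob.JobQueue, table_name: "jobs"

  alias Ecto.Multi

  # One clause per job type, selected by pattern matching on the params map.
  def perform(multi = %Multi{}, %{"type" => "SendEmail", "address" => address, "body" => body}) do
    multi
    |> Multi.run(:send_email, fn _repo, _changes ->
      # Hypothetical mailer; it must return {:ok, value} or {:error, reason}.
      MyApp.Mailer.deliver(address, body)
    end)
    |> MyApp.Repo.transaction()
  end
end
```

The job only counts as complete once that transaction commits, so an `{:error, reason}` from any step leaves it unfinished.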

Jobs are processed using a GenStage producer and ConsumerSupervisor that will execute jobs concurrently up to a configurable max_demand.
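
To make "concurrently up to a configurable max_demand" concrete, here is a standard-library illustration of bounded concurrency. It deliberately swaps in `Task.async_stream/3` for the GenStage producer and ConsumerSupervisor, so it shows the idea of a concurrency cap rather than how ecto_job is actually wired. `BoundedWorkers` and its arguments are names made up for this sketch:

```elixir
defmodule BoundedWorkers do
  # Runs `handler` over `jobs` with at most `max_demand` jobs in flight.
  # Results come back in input order (the Task.async_stream/3 default).
  def run(jobs, max_demand, handler) do
    jobs
    |> Task.async_stream(handler, max_concurrency: max_demand, timeout: 5_000)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end

# Five jobs, never more than two running at once.
results = BoundedWorkers.run(1..5, 2, fn id -> {:done, id} end)
IO.inspect(results)
```

The real pipeline differs in that demand is pulled by consumers as they finish, rather than pushed over a fixed list.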

Postgrex.Notifications is used to trigger job processors as soon as the transaction that adds a job is committed.

Job workers can be run on a separate node without requiring any BEAM clustering, just PostgreSQL listen/notify.

Job completion is also signalled using listen/notify, enabling websocket or chunked HTTP responses (on another node) to be triggered once a job completes.
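
As a rough sketch of how another node might wait for such a signal, the module below uses `Postgrex.Notifications.start_link/1` and `listen/2`, which deliver `{:notification, pid, ref, channel, payload}` messages to the subscribing process. The module name, the channel name and the payload format are assumptions for illustration; the channel and payload ecto_job actually uses are not stated here.

```elixir
defmodule CompletionWaiter do
  # Opens a dedicated notifications connection and subscribes the calling
  # process to `channel`. Needs the :postgrex dependency and a reachable
  # database; `conn_opts` are ordinary Postgrex connection options.
  def subscribe(conn_opts, channel) do
    {:ok, pid} = Postgrex.Notifications.start_link(conn_opts)
    {:ok, ref} = Postgrex.Notifications.listen(pid, channel)
    {pid, ref}
  end

  # Blocks until a notification arrives on `channel` or `timeout` ms elapse.
  def await(channel, timeout \\ 5_000) do
    receive do
      {:notification, _conn_pid, _ref, ^channel, payload} -> {:ok, payload}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

A websocket or chunked-response process would call `subscribe/2` once and then `await/2` before pushing the result to the client.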

Similar libraries:

15 Likes

#2

Awesome, looking forward to battle-testing it.

0 Likes

#3

Sounds like an interesting library! Did you design the library to work on Heroku? It seems like relying on listen/notify instead of clustering is partially to fit Heroku’s constraints.

0 Likes

#4

Not quite. I noticed that most of the Exq jobs in our system were enqueued along with changes to the database, and workers updated the database also.

Guaranteeing that jobs are enqueued consistently and worked idempotently requires bookkeeping, all of which goes away if you let a DB transaction take care of it :smile:

2 Likes

#5

Many thanks for making this available to use, @mbuhot. I’m now using it in my first client Elixir project. :slight_smile:

2 Likes

#6

Glad to hear it :smile:
Issues and PRs are most welcome. I’m currently on a .NET project at work, so I only get my Elixir fix doing open source these days!

3 Likes

#7

Interesting library! Another advantage to using something like this is a simpler infrastructure. No need to add another database like Redis.

2 Likes

#8

Version 0.3.0 released, now with configurable reservation and execution timeouts.

https://hexdocs.pm/ecto_job/0.3.0/readme.html

3 Likes

#9

EctoJob version 2.0.0 has been published to Hex.pm.

EctoJob 2.0 depends on Ecto 3.0, but is otherwise backwards compatible with existing code.

Other changes in this release:

  • Timestamp options on job tables can be customized.
  • Job tables can be declared with custom schema_prefix.
  • EctoJob will always use a :bigserial primary key instead of relying on the Ecto default.


8 Likes

#10

Hi, thanks for the great library. I am currently testing it in my new project.

My only question is: is there a way, or at least a plan, to implement priorities in the job queue?
It’s important for a SaaS app to make sure some clients are not left waiting while someone else pushes loads of requests to the server.

Thank you.

0 Likes

#11

The closest thing right now would be to use separate queues for high priority and low priority jobs.

  children = [
    MyApp.Repo,
    {MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100},
    {MyApp.ImportantJobQueue, repo: MyApp.Repo, max_demand: 100}
  ]
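
To go with that supervision tree, each queue needs its own module and table. This is a sketch under assumptions: the `use EctoJob.JobQueue, table_name: ...` form is as I remember it from the README, while the table names, the pass-through `perform/2` bodies and the `MyApp.Jobs` routing helper are hypothetical.

```elixir
defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  # Placeholder handler: commits the multi without doing extra work.
  def perform(multi = %Ecto.Multi{}, _job = %{}), do: MyApp.Repo.transaction(multi)
end

defmodule MyApp.ImportantJobQueue do
  use EctoJob.JobQueue, table_name: "important_jobs"

  # Placeholder handler, same shape as above.
  def perform(multi = %Ecto.Multi{}, _job = %{}), do: MyApp.Repo.transaction(multi)
end

defmodule MyApp.Jobs do
  # Hypothetical helper: picks the queue from a priority atom, so callers
  # do not need to know which module backs which priority.
  def enqueue(multi, name, params, :high),
    do: MyApp.ImportantJobQueue.enqueue(multi, name, params)

  def enqueue(multi, name, params, :normal),
    do: MyApp.JobQueue.enqueue(multi, name, params)
end
```

Since each queue has its own workers, a backlog in "jobs" does not hold up "important_jobs". Giving the two queues different max_demand values is one way to tune how much capacity each gets.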

I’ve tried to keep the code fairly simple and well documented, so if you’d like to contribute support for priorities it would be welcome :+1:

1 Like

#12

@mbuhot This is awesome. Thank you!

1 Like

#13

Very cool. Thanks for sharing this project.

0 Likes