EctoJob: A transactional job queue built with Ecto, PostgreSQL and GenStage

Available on Hex.pm: https://hex.pm/packages/ecto_job
Docs: https://hexdocs.pm/ecto_job/api-reference.html
Github: https://github.com/mbuhot/ecto_job

Goals

  • Transactional job processing
  • Retries
  • Scheduled jobs
  • Multiple queues
  • Low latency concurrent processing
  • Avoid frequent database polling
  • Library of functions, not a full OTP application

Details

One of the distinguishing features of ecto_job is that its API encourages transactional job processing through Ecto.Multi. Job handlers receive an Ecto.Multi, which must be passed to Repo.transaction to complete the job:

def perform(multi = %Ecto.Multi{}, job = %{}) do
  multi
  |> do_first_thing(job["customer_id"])
  |> do_second_thing(job["product_id"])
  |> MyApp.Repo.transaction()
end

Similarly, there are helpers to encourage transactional job creation by adding the new job to an Ecto.Multi:

Multi.new()
|> Multi.insert(:add_user, User.insert_changeset(%{name: "Joe", email: "joe@gmail.com"}))
|> JobQueue.enqueue(:email_job, %{"type" => "SendEmail", "address" => "joe@gmail.com", "body" => "Welcome!"})
|> MyApp.Repo.transaction()

Jobs are processed using a GenStage producer and ConsumerSupervisor that will execute jobs concurrently up to a configurable max_demand.

Postgrex.Notifications is used to trigger job processors as soon as the transaction that adds a job is committed.

Job workers can be run on a separate node, without requiring any BEAM clustering - just PostgreSQL listen/notify.

Job completion is also signalled using listen/notify, enabling websocket or chunked HTTP responses (on another node) to be triggered once a job completes.
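
Putting it together, a queue is just a module that uses EctoJob.JobQueue plus an entry in the application's supervision tree (module names here are illustrative):

defmodule MyApp.JobQueue do
  use EctoJob.JobQueue, table_name: "jobs"

  # perform/2 as in the example above
  def perform(multi = %Ecto.Multi{}, job = %{}) do
    multi
    |> do_first_thing(job["customer_id"])
    |> do_second_thing(job["product_id"])
    |> MyApp.Repo.transaction()
  end
end

# In the application supervision tree:
children = [
  MyApp.Repo,
  {MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100}
]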

19 Likes

Awesome - looking forward to battle-testing it.

Sounds like an interesting library! Did you design the library to work on Heroku? It seems like relying on listen/notify instead of clustering is partially to fit Heroku’s constraints.

Not quite. I noticed that most of the Exq jobs in our system were enqueued along with changes to the database, and the workers also updated the database.

Guaranteeing that jobs are enqueued consistently and worked idempotently requires bookkeeping, and all of that goes away if you let a DB transaction take care of it :smile:

2 Likes

Many thanks for making this available to use, @mbuhot. I’m now using it in my first client Elixir project. :slight_smile:

2 Likes

Glad to hear it :smile:
Issues and PRs most welcome. I’m currently on a .NET project at work, so I only get my Elixir fix doing open source these days!

3 Likes

Interesting library! Another advantage to using something like this is a simpler infrastructure. No need to add another database like Redis.

2 Likes

Version 0.3.0 released, now with configurable reservation and execution timeouts.
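
For example, something like this on the queue's child spec (the option names and values here are a sketch; see the 0.3.0 docs for the exact names and defaults):

{MyApp.JobQueue,
 repo: MyApp.Repo,
 max_demand: 100,
 # assumed option names based on the release notes; values illustrative (ms)
 reservation_timeout: 60_000,
 execution_timeout: 300_000}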

https://hexdocs.pm/ecto_job/0.3.0/readme.html

3 Likes

EctoJob version 2.0.0 has been published to Hex.pm.

EctoJob 2.0 depends on Ecto 3.0, but is otherwise backwards compatible with existing code.

Other changes in this release:

  • Timestamp options on job tables can be customized.
  • Job tables can be declared with custom schema_prefix.
  • EctoJob will always use a :bigserial primary key instead of relying on the Ecto default.
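
For illustration, declaring a queue with the new options might look like this (schema_prefix and timestamps are assumed option names based on the notes above; check the 2.0 docs for the exact spelling):

defmodule MyApp.JobQueue do
  # assumed option names -- see the 2.0 docs
  use EctoJob.JobQueue,
    table_name: "jobs",
    schema_prefix: "private",
    timestamps: [type: :utc_datetime_usec]
end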

Hex Package
Docs
Source

9 Likes

Hi, thanks for the great library. I am currently testing it inside my new project.

My only question: is there a way, or at least a plan, to implement priorities in the job queue?
It’s important for a SaaS app to make sure some clients aren’t left waiting while someone else pushes loads of requests to the server.

Thank you.

The closest thing right now would be to use separate queues for high-priority and low-priority jobs:

  children = [
    MyApp.Repo,
    {MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100},
    {MyApp.ImportantJobQueue, repo: MyApp.Repo, max_demand: 100}
  ]

I’ve tried to keep the code fairly simple and well documented, so if you’d like to contribute support for priorities it would be welcome :+1:

1 Like

@mbuhot This is awesome. Thank you!

1 Like

Very cool. Thanks for sharing this project.

EctoJob version 2.1.0 has been released.

Version 2.1.0 adds support for requeueing jobs, along with fixes to the job reservation algorithm and to dialyzer warnings.

Changelog
Hex Package
Docs

Thank you to everyone that contributed to this version!

I’d also like to invite any users of EctoJob to raise issues on the GitHub page for any features you’d like to see added or would be willing to work on (admin UI, Telemetry, pre/post job hooks, etc.).

6 Likes

Hi,

The library is very good and works fine for my delayed email sending functionality. But I am facing a problem when I add the child worker to the supervision tree, like this:

def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Start the Ecto repository
      MyApp.Repo,
      # Start the Endpoint (http/https)
      MyAppWeb.Endpoint,
      {MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100},
      {Task.Supervisor, [name: MyApp.TaskSupervisor]}
    ]

Now I am unable to run any tests. I get the following error:

** (DBConnection.OwnershipError) cannot find ownership process for #PID<0.847.0>.

When using ownership, you must manage connections in one
of the four ways:

  • By explicitly checking out a connection
  • By explicitly allowing a spawned process
  • By running the pool in shared mode
  • By using :caller option with allowed process

The first two options require every new process to explicitly
check a connection out or be allowed by calling checkout or
allow respectively.

The third option requires a {:shared, pid} mode to be set.
If using shared mode in tests, make sure your tests are not
async.

The fourth option requires [caller: pid] to be used when
checking out a connection from the pool. The caller process
should already be allowed on a connection.

If you are reading this error, it means you have not done one
of the steps above or that the owner process has crashed.

See Ecto.Adapters.SQL.Sandbox docs for more information.
(ecto_sql 3.3.3) lib/ecto/adapters/sql.ex:609: Ecto.Adapters.SQL.raise_sql_call_error/1
(ecto_sql 3.3.3) lib/ecto/adapters/sql.ex:545: Ecto.Adapters.SQL.execute/5
(ecto 3.3.2) lib/ecto/repo/queryable.ex:192: Ecto.Repo.Queryable.execute/4
(ecto_job 3.0.0) lib/ecto_job/producer.ex:184: EctoJob.Producer.dispatch_jobs/2
(gen_stage 0.14.3) lib/gen_stage.ex:2103: GenStage.noreply_callback/3
(stdlib 3.11.1) gen_server.erl:637: :gen_server.try_dispatch/4
(stdlib 3.11.1) gen_server.erl:711: :gen_server.handle_msg/6
(stdlib 3.11.1) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Last message: {:DOWN, #Reference<0.1630123645.3685482498.192412>, :process, #PID<0.847.0>, {%DBConnection.OwnershipError{...}, [...]}} (same ownership error and stacktrace as above)

I have tried many ways to resolve this, but without success.

Please help me.

Hi @santoshbt

Take a look at this github issue: https://github.com/mbuhot/ecto_job/issues/36
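
The short version: the queue's producer polls the database from its own process, outside your test's sandbox owner. One common workaround (a sketch, using a made-up config key) is to avoid starting the queue in the test environment:

# config/test.exs -- :start_job_queue? is a made-up key for this sketch
config :my_app, :start_job_queue?, false

# lib/my_app/application.ex
def start(_type, _args) do
  children = [MyApp.Repo, MyAppWeb.Endpoint] ++ job_queue_spec()
  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

defp job_queue_spec do
  if Application.get_env(:my_app, :start_job_queue?, true) do
    [{MyApp.JobQueue, repo: MyApp.Repo, max_demand: 100}]
  else
    []
  end
end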

Hi,

The jobs table was accidentally deleted from my local database, and now I am unable to re-run the migration.
It always fails with this error:
execute "CREATE FUNCTION fn_notify_inserted()\n  RETURNS trigger AS $$\nDECLARE\nBEGIN\n  PERFORM pg_notify(TG_TABLE_NAME, '');\n  RETURN NEW;\nEND;\n$$ LANGUAGE plpgsql\n"
** (Postgrex.Error) ERROR 42723 (duplicate_function) function "fn_notify_inserted" already exists with same argument types.

Please let me know how to handle it.

@santoshbt Use the PostgreSQL DROP FUNCTION command to remove the function before re-running the migration.
The CASCADE option will also remove any triggers that depend on that function.
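
For example:

DROP FUNCTION fn_notify_inserted() CASCADE;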

1 Like

EctoJob version 3.1 has been released :tada:

Changelog
Hex Package
Docs

Version 3.1.0 adds support for MySQL 8 and for storing job params as an Elixir/Erlang term.
You can insert an arbitrary Elixir/Erlang term into the queue:

{"SendEmail", "jonas@gmail.com", "Welcome!"} 
|> MyApp.JobQueue.new() 
|> MyApp.Repo.insert()

You should use the option :params_type when defining your queue module:

defmodule MyJobQueue do
  use EctoJob.JobQueue, table_name: "jobs", params_type: :binary
  # ...
end

Possible values of the option are: :map (default) and :binary (for storing Elixir/Erlang terms).
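
On the consuming side, perform/2 then receives the same term back once the job is deserialized, e.g. (a sketch; send_email/2 is an illustrative helper returning {:ok, _} or {:error, _}):

def perform(multi = %Ecto.Multi{}, {"SendEmail", address, body}) do
  multi
  |> Ecto.Multi.run(:send, fn _repo, _changes -> send_email(address, body) end)
  |> MyApp.Repo.transaction()
end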

You should use the same option when setting up the migration:

@ecto_job_version 3

def up do
  EctoJob.Migrations.Install.up()
  EctoJob.Migrations.up("jobs", version: @ecto_job_version, params_type: :binary)
end

We are looking forward to releasing a new major version soon with support for completed job retention and job idempotency. Stay tuned :slight_smile:

5 Likes