Pooling and queue libraries in Elixir

Hi, I’ll definitely be needing pooling in my next small project at work, because I don’t want to kill our Elasticsearch cluster, and I was wondering what libraries are out there to use. One obvious choice is to simply use Poolboy, and from what I see it will fit my use case (it’s not that complicated). Should I even consider other libraries, and if so, which ones?

The other topic is queues; I’ll need those too. The simplest queue I can think of is just a process mailbox, but I’ll probably want to persist the queues because I would rather not lose them. So the question is: what should I use? I don’t want to use Redis or any other technology like that (if I needed Redis, I could just install it and write the logic in Python, which would be faster given what I already have). Can anyone suggest a good, minimal library with a persistence layer? Bonus points for a lib that can synchronize its queues across at least two nodes (for redundancy in case of a hardware failure, network problem, etc.).

If you can even point me somewhere with a link I’d be grateful, because when I see that some libs haven’t been updated in like 2 years, I’m simply not sure whether they’re stable or just abandoned :wink:

For the simple queue there is backy https://github.com/kuon/backy which persists to a Postgres table… not sure it does distributed, so maybe no bonus points. The readme has links to other queue libs, but they all seem to use Redis.

FYI I’ve only used it for small stuff and not in production.

I don’t really need Postgres either, but I’ll look at that too, thank you.

In the meantime I found https://github.com/sheharyarn/que. Any thoughts?

Pooling

Elasticsearch communication occurs over HTTP. Hackney is, for better or worse, the most supported HTTP client for Elixir right now. It comes with pooling built-in. By using a specific pool for your Elasticsearch HTTP requests, your connections will automatically be reused until they are closed.
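
As a sketch (the pool name and sizing here are made up; tune them for your cluster), a dedicated hackney pool can be declared in your supervision tree and then referenced per request:

# In your application's supervision tree:
children = [
  :hackney_pool.child_spec(:es_pool, timeout: 15_000, max_connections: 50)
]

Supervisor.start_link(children, strategy: :one_for_one)

# Any request passing pool: :es_pool reuses connections from that pool:
{:ok, _status, _headers, ref} =
  :hackney.request(:get, "http://localhost:9200/_cluster/health", [], "", pool: :es_pool)

{:ok, body} = :hackney.body(ref)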

Asynchronous Worker Queue

For an asynchronous worker queue, I would recommend using a library like Verk. Verk is based on Sidekiq and stores its queues in Redis. This makes it tolerant of restarts and failures in your own application.

As I wrote, if I had to use Redis as the medium for the queue, I could just skip Elixir altogether. I don’t need a Sidekiq/Resque clone, because then I could simply use Sidekiq. I want, or rather need, this app to be self-contained, without any third-party software as a dependency. And if the data it keeps can be synchronized across nodes out of the box, even better.

The queries to Elasticsearch are rather long; it takes 10s or more for the Elasticsearch cluster to respond. And each job will need to make many queries to Elastic, so I need some pooling library to keep a sane number of jobs running. So yes, if I use hackney I will use its connection pool to reuse connections, but I also need a pool of workers for jobs that do a little bit more than just query Elasticsearch.

The problem is that Elasticsearch alone is not fast enough. I need to pre-cache complicated queries, do a map-reduce on them, and then some :wink:

For a simple queue take a look at http://erlang.org/doc/man/queue.html. You can persist it to anything with http://erlang.org/doc/man/erlang.html#term_to_binary-1.
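
For example, a minimal round-trip with those two building blocks (the file name is just an example):

# Build an in-memory queue and persist it as a serialized Erlang term.
q = :queue.new()
q = :queue.in({:job, 1}, q)
q = :queue.in({:job, 2}, q)
File.write!("queue.bin", :erlang.term_to_binary(q))

# Restore it later, e.g. after a restart; FIFO order is preserved.
restored = "queue.bin" |> File.read!() |> :erlang.binary_to_term()
{{:value, {:job, 1}}, _rest} = :queue.out(restored)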

I’m not aware of an embedded solution for a distributed queue. I’d likely try RabbitMQ in that scenario.

However, if you want a local persistent queue with absolutely no external dependencies, then the solution would not be overly complex. I don’t know if there are libraries which do this out of the box, but rolling your own simple solution would IMO not be too complicated, and it’s at least a nice exercise in a bit of OTP :slight_smile:

The idea is to have an in-memory queue, and to persist the queue state to disk every time an item is added and every time a job succeeds.

Persisting without an external component can be done in a couple of ways.

In the simplest solution, you’d have one process, a GenServer, which acts as a scheduler. The scheduler reads from the queue file and starts the first job in a separate process. If the job succeeds, the scheduler removes it, persists the new queue state, and starts the next job, if there is one. The scheduler should also monitor the job process (or link to it) to detect if it crashed. Then you need to handle the crash by retrying, deferring the job, or giving up.

Every time you want to add another item, you send a request to the scheduler, which appends it to the in-memory structure (for example :queue), stores the new state to disk, and starts the job if nothing is running.

I’d suggest rolling a simple implementation, perhaps starting with a plain in-memory queue, and then adding naive persistence with File.write(path, :erlang.term_to_binary(queue)).
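
To make the idea concrete, here is a minimal sketch of such a scheduler. Everything in it is illustrative: the module and file names are made up, jobs are stored as {module, function, args} tuples so they serialize cleanly, and retrying/deferring is left out:

defmodule JobQueue do
  use GenServer

  @path "job_queue.bin"  # example persistence path

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Enqueue a job given as a {module, function, args} tuple.
  def push(mfa), do: GenServer.call(__MODULE__, {:push, mfa})

  @impl true
  def init(nil) do
    # Restore the queue from disk, or start empty.
    queue =
      case File.read(@path) do
        {:ok, bin} -> :erlang.binary_to_term(bin)
        {:error, _} -> :queue.new()
      end

    {:ok, maybe_start_job(%{queue: queue, running: nil})}
  end

  @impl true
  def handle_call({:push, mfa}, _from, state) do
    state = %{state | queue: :queue.in(mfa, state.queue)}
    persist(state.queue)
    {:reply, :ok, maybe_start_job(state)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{running: ref} = state) do
    # The running job finished (or crashed). This sketch drops it either
    # way; a real version would inspect the reason and retry or defer.
    {_, queue} = :queue.out(state.queue)
    persist(queue)
    {:noreply, maybe_start_job(%{state | queue: queue, running: nil})}
  end

  # Start the job at the head of the queue, unless one is already running.
  defp maybe_start_job(%{running: nil} = state) do
    case :queue.peek(state.queue) do
      {:value, {mod, fun, args}} ->
        {_pid, ref} = spawn_monitor(mod, fun, args)
        %{state | running: ref}

      :empty ->
        state
    end
  end

  defp maybe_start_job(state), do: state

  defp persist(queue), do: File.write!(@path, :erlang.term_to_binary(queue))
end

A job would then be enqueued as e.g. JobQueue.push({IO, :puts, ["hello"]}). Storing MFA tuples rather than anonymous functions is deliberate: funs serialized with term_to_binary can break once the module that defined them is recompiled.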

If you have more elaborate needs, such as pooled or rate-limited queues, you could take a look at jobs. I’m actually using it myself for these particular cases, and I’m quite happy with it.

The author of Sidekiq has a new project for async work queues called Faktory, which is language agnostic and addresses some of the main design shortcomings of Sidekiq.

There is a fully featured Elixir client for it that is actively being developed:
https://hexdocs.pm/faktory_worker_ex

:v:

Hi Sasa,

I would love to use this jobs lib from within Elixir, but I have no clue how to go about it after installing it. Is there an Elixir wrapper for it?

I wrote Stagger as a simple persistent queue that acts as a GenStage producer. Its usage is targeted more at capturing data when backend processing might not be available but can be resumed later.

I don’t know of any, but you don’t need a wrapper; you can use jobs directly. The examples in Erlang are here, and a comparison of the syntax differences can be found here. So e.g. if we take this example:

jobs:add_queue(q, [{standard_rate,1}]).
...
jobs:run(q, fun() -> io:fwrite("job: ~p~n", [time()]) end).

It could be roughly translated as:

:jobs.add_queue(:q, [standard_rate: 1])
...
:jobs.run(:q, fn -> IO.inspect(DateTime.utc_now()) end)

More generally, to use jobs, you need to first create a named queue with :jobs.add_queue(queue_name, opts), where queue_name is an atom. Typically this can be done in the application start callback.

Then you can run a queued operation from any process as :jobs.run(queue_name, fn -> ... end).

Take a look at the add_queue docs, especially the “regulators” section. To create a rate-limited queue, you can pass rate: [...]. To create a counter queue (to limit the maximum number of simultaneous jobs), you can pass counter: [...].
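
Putting that together, queue setup in the application callback might look like this sketch (the queue names and limits are made up, and do_elasticsearch_query/0 is a placeholder):

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Rate-limited queue: at most 1 job per second.
    :jobs.add_queue(:es_writes, standard_rate: 1)

    # Counter-regulated queue: at most 5 simultaneous jobs.
    :jobs.add_queue(:es_reads, regulators: [counter: [limit: 5]])

    Supervisor.start_link([], strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

# Then, from any process:
# :jobs.run(:es_reads, fn -> do_elasticsearch_query() end)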

Thank you so much for this. With this explanation and references I actually don’t need a wrapper.