In the documentation of Task there is this:
Compatibility with OTP behaviours
It is not recommended to await a long-running task inside an OTP behaviour such as GenServer. Instead, you should match on the message coming from a task inside your GenServer.handle_info/2 callback.
(Task — Elixir v1.16.0)
And I was wondering what the exact problem is with long-running tasks for a GenServer. Is the sole reason that the callbacks handle_call/handle_cast have timeouts that will fail because of the long-running task? Or are there other reasons too?
When you block the GenServer for a long time in a single callback, it can't handle other messages it could potentially handle in the meantime. This also includes system messages for things like Observer or the :sys module.
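A minimal sketch of the pattern the docs recommend (module and function names here are illustrative, not from the thread): start the task from a callback without awaiting it, then match on its reply in handle_info/2 so the GenServer stays responsive in between:

```elixir
defmodule Worker do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def run_job(pid), do: GenServer.cast(pid, :run_job)

  @impl true
  def init(:ok), do: {:ok, %{task: nil}}

  @impl true
  def handle_cast(:run_job, state) do
    # Don't Task.await/2 here: start the task and return immediately,
    # so the GenServer can keep handling other (and system) messages.
    task = Task.async(fn -> do_long_running_work() end)
    {:noreply, %{state | task: task}}
  end

  @impl true
  def handle_info({ref, result}, %{task: %Task{ref: ref}} = state) do
    # The task's reply arrives as a plain message tagged with its ref.
    # Demonitor with :flush so we don't also get the :DOWN message.
    Process.demonitor(ref, [:flush])
    IO.puts("Job finished: #{inspect(result)}")
    {:noreply, %{state | task: nil}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, %{task: %Task{ref: ref}} = state) do
    # The task crashed before replying.
    IO.puts("Job failed: #{inspect(reason)}")
    {:noreply, %{state | task: nil}}
  end

  defp do_long_running_work do
    Process.sleep(5_000)
    :done
  end
end
```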
So let's say I have a GenServer that is a bit slow at processing messages, say 1 second per message.
When I send 100 messages to it, it will still take a very long while to process system messages that come in later?
So I guess that's a bad idea as well then?
The thing is, I have an application that consumes messages from RabbitMQ. RabbitMQ has a simple way of implementing back pressure based purely on how fast the consumer can consume messages (http://www.rabbitmq.com/erlang-client-user-guide.html). In combination with a prefetch count, you say how many messages you want buffered.
If I can conclude that I shouldn't hold up the queue for too long, then I either have to:
- make sure that I can always handle messages fast enough,
- set a low prefetch count, or
- not handle the messages in a GenServer or other OTP behaviour (but, for example, in a long-running task that receives the messages?)
You should spawn a separate process for each message (you get concurrency and error isolation) and control the concurrency using the prefetch_count. This can be achieved with Tasks and a Task.Supervisor, or manually (see this example). Here's some code that I've written that you can use as a starting point.
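For illustration, a sketch of that approach, assuming the amqp hex package (its {:basic_deliver, payload, meta} delivery message and AMQP.Basic.ack/2) and a Task.Supervisor named MyApp.TaskSupervisor started elsewhere in the supervision tree:

```elixir
defmodule Consumer do
  use GenServer
  # Assumes the application supervision tree contains
  # {Task.Supervisor, name: MyApp.TaskSupervisor}.

  @impl true
  def init(channel), do: {:ok, %{channel: channel, tasks: %{}}}

  @impl true
  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do
    # One supervised task per message: concurrency plus error isolation.
    # async_nolink means a crashing task does not take down the consumer.
    task =
      Task.Supervisor.async_nolink(MyApp.TaskSupervisor, fn ->
        process_message(payload)
      end)

    {:noreply, put_in(state.tasks[task.ref], tag)}
  end

  def handle_info({ref, :ok}, state) when is_map_key(state.tasks, ref) do
    Process.demonitor(ref, [:flush])
    {tag, state} = pop_in(state.tasks[ref])
    # Acking frees a prefetch slot, so the broker delivers the next message.
    AMQP.Basic.ack(state.channel, tag)
    {:noreply, state}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state)
      when is_map_key(state.tasks, ref) do
    # The task crashed: reject the message instead of acking it.
    {tag, state} = pop_in(state.tasks[ref])
    AMQP.Basic.reject(state.channel, tag, requeue: false)
    {:noreply, state}
  end

  defp process_message(_payload), do: :ok
end
```

Because each message is only acked after its task finishes, the number of in-flight tasks never exceeds the broker's prefetch_count.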
I'm probably missing something, but I'm not sure how you handle backpressure. Here: https://github.com/stefanchrobot/rabbitmq-sample/blob/master/lib/rabbitmq/client.ex#L83 you basically try to consume messages from the queue as fast as possible and send them to a task.
But let's say you have 1M messages in your queue and you start your code: you will start spawning tasks as fast as possible, and if the tasks do something that takes a while, you'll probably end up killing lots of them because you can't handle the load anymore.
I want to provide backpressure so that I only start as many tasks as I can handle.
I'm going to be consuming at most N messages at once, where N is exactly equal to prefetch_count. When the first message is acked/rejected, the broker will start sending more messages, but it will never go above prefetch_count.
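For example, with the amqp hex package (queue name here is hypothetical), the prefetch limit is set via basic.qos on the channel before starting the consumer:

```elixir
# Limit unacked in-flight messages to 10; the broker stops delivering
# until some of them are acked or rejected.
{:ok, conn} = AMQP.Connection.open()
{:ok, channel} = AMQP.Channel.open(conn)
:ok = AMQP.Basic.qos(channel, prefetch_count: 10)
{:ok, _consumer_tag} = AMQP.Basic.consume(channel, "my_queue")
```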
See https://www.rabbitmq.com/confirms.html
(EDIT: wrong link)
Thanks, that was the information I was missing!