Handling errors in RabbitMQ consumer

I’m trying to set up a RabbitMQ consumer using the amqp library. I want to process multiple messages concurrently and handle errors properly (reject the message if a processing error occurs). How would I go about doing this?

The “Setup a consumer GenServer” example defines a single GenServer that spawns a new process for each incoming message. Paired with prefetch_count > 1, this allows messages to be processed concurrently.

How do I make sure that those processes are cleaned up properly? Is spawn_link enough to shut them down when the parent GenServer is shutting down?
Should those processes live under a supervision tree?
How would I implement a timeout for processing a message (kill it after 5 seconds)?
The example says “You might also want to catch :exit signal in production code.” Why? When would that happen? How do I do that?
Are these consumer processes a use case for Tasks or DynamicSupervisor?
Or should this be set up totally differently, i.e. have a Supervisor with N workers where each one is a consumer on a shared connection?

What I’ve done is the following. I’ve created a wrapper for a Connection and for a Channel.

The wrapper for the Connection starts a new connection in init, traps exits with Process.flag(:trap_exit, true), and links the pid of the connection with Process.link(connection.pid).

So something like this:

{:ok, conn} = AMQP.Connection.open(...)

Process.flag(:trap_exit, true)
Process.link(conn.pid)

Finally, I implement the terminate callback and try to close the Connection/Channel if it is still alive.

For Channel I do the same thing. In my code I put my Connection and Channel wrappers in a supervision tree so that they get killed if needed.
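To make the description above a bit more concrete, here is a minimal sketch of such a Connection wrapper. The module name ConnectionWrapper, the state shape, and the :open, :close and :amqp options are all made up for illustration; the :open/:close options only exist so the sketch can be tried without a broker. By default it calls AMQP.Connection.open/1 and AMQP.Connection.close/1. Treat it as one possible shape, not as the recommended way to use the library.

```elixir
defmodule ConnectionWrapper do
  # Hypothetical wrapper module; names and options are illustrative only.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # Trapping exits turns a crash of the linked connection into an
    # {:EXIT, pid, reason} message and lets terminate/2 run on shutdown.
    Process.flag(:trap_exit, true)

    # Assumption: :open and :close are injectable so the sketch runs without RabbitMQ.
    open = Keyword.get(opts, :open, &AMQP.Connection.open/1)
    close = Keyword.get(opts, :close, &AMQP.Connection.close/1)

    case open.(Keyword.get(opts, :amqp, [])) do
      {:ok, conn} ->
        Process.link(conn.pid)
        {:ok, %{conn: conn, close: close}}

      {:error, reason} ->
        {:stop, reason}
    end
  end

  @impl true
  def handle_info({:EXIT, pid, reason}, %{conn: %{pid: pid}} = state) do
    # The connection process died: stop, so a supervisor can restart the wrapper.
    {:stop, {:connection_down, reason}, %{state | conn: nil}}
  end

  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{conn: nil}), do: :ok

  def terminate(_reason, %{conn: conn, close: close}) do
    # Close the connection only if its process is still alive.
    if Process.alive?(conn.pid), do: close.(conn)
    :ok
  end
end
```

Placed in a supervision tree, the wrapper is restarted when the connection dies, and the connection is closed when the wrapper is shut down.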

I’m not sure that this is the correct way, but it does prevent zombie channels when something goes wrong in my code.
I was thinking about asking in the amqp repository why the Connection and Channel are not OTP behaviours, because if they were, the wrappers wouldn’t be necessary.

See this topic as well: Processing multiple messages in parallel with AMQP

You basically set up one connection and multiple channels.
For error handling and timeouts: in the handle_info that receives the message, I start a task and await it with a sensible timeout. If it’s too slow, or something fails, you can do your error handling there.
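The task-with-timeout idea can be sketched roughly like this. TimedWork and run/3 are hypothetical names, not part of amqp. The sketch uses Task.Supervisor.async_nolink instead of a plain Task.async so that a crash inside the task does not take the calling process down with it, and Task.yield plus Task.shutdown so that a slow task is killed rather than left running.

```elixir
defmodule TimedWork do
  # Hypothetical helper, not part of the amqp library.
  # Runs `fun` under the Task.Supervisor `sup`, unlinked from the caller,
  # and waits at most `timeout` milliseconds for the result.
  def run(sup, fun, timeout \\ 5_000) do
    task = Task.Supervisor.async_nolink(sup, fun)

    # Task.yield returns nil on timeout; Task.shutdown then kills the task.
    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end
end

# In a real application the Task.Supervisor would live in the supervision tree.
{:ok, sup} = Task.Supervisor.start_link()
```

In a consumer, an {:ok, _} result would typically be followed by AMQP.Basic.ack(chan, tag) and an {:error, _} result by something like AMQP.Basic.reject(chan, tag, requeue: false). Note that the calling process blocks while it waits, so concurrency here comes from having several channels/consumers rather than from a single one.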

You might find my library useful for this. It handles all the process and error stuff for you.

These slides give you an idea of the code you would write.

Even if you don’t use it, you can see how those types of things are handled here.

EDIT: If you do use it and discover something that’s not well documented, please file a ticket or a PR for it.
