Plug_amqp: a Plug adapter for AMQP

We have a complex RPC system that uses RabbitMQ as transport, so we implemented the pattern described in

We also found that managing the publishing of the response in each service was too much to keep track of, so we chose Plug as the framework for transforming requests into responses. The adapter then handles consuming requests, publishing responses, and all the related bookkeeping.

So now we can expose an RPC endpoint with standard Plug modules (router, parsers, other middleware) and the standard Plug.Conn API (resp(conn, 200, "ok")).
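To make this concrete, a service router under this adapter could look like any ordinary Plug router. This is a hedged sketch: the module name and route are illustrative, and how the router is handed to the adapter is not shown here.

```elixir
defmodule MyService.Router do
  use Plug.Router

  # Standard Plug middleware works unchanged.
  plug Plug.Parsers, parsers: [:json], json_decoder: Jason
  plug :match
  plug :dispatch

  # Matched against request_path, i.e. the routing key with
  # dots replaced by slashes (e.g. routing key "users.create").
  post "/users/create" do
    resp(conn, 200, "ok")
  end

  match _ do
    resp(conn, 404, "not found")
  end
end
```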

The Conn struct

It is mostly a standard Plug.Conn, but some fields are specific to AMQP:

  • path_info is set to the routing key split on dots.
  • request_path is set to the routing key with slashes instead of dots
  • method is POST by default. It can be overridden using the x-method-override header. The common Plug.MethodOverride plug cannot be used here because it requires the body to be parsed first, and it won’t override to GET.
  • body is always available
  • query_params are always empty
  • chunks and streaming responses are not available
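The routing-key translation in the first two bullets is a simple string transformation. The snippet below illustrates it with plain Elixir string functions (this is the described behavior, not the adapter's actual code):

```elixir
routing_key = "users.create"

# path_info: the routing key split on dots
path_info = String.split(routing_key, ".")
# => ["users", "create"]

# request_path: the routing key with dots replaced by slashes
request_path = "/" <> String.replace(routing_key, ".", "/")
# => "/users/create"
```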

The request handler

The handler is run as a task, monitored by the main process. It is responsible for creating the Conn struct and emitting telemetry.
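A minimal sketch of what launching such a monitored task might look like. The supervisor name, the build_conn/2 helper, and the telemetry event name are all assumptions for illustration:

```elixir
# async_nolink starts a monitored task without linking, so a crash
# in one request does not take down the main consumer process.
task =
  Task.Supervisor.async_nolink(PlugAmqp.TaskSupervisor, fn ->
    start = System.monotonic_time()

    conn = build_conn(payload, meta)  # hypothetical: builds the Conn struct
    conn = MyService.Router.call(conn, MyService.Router.init([]))

    # Hypothetical telemetry event for the request duration.
    :telemetry.execute(
      [:plug_amqp, :request, :stop],
      %{duration: System.monotonic_time() - start},
      %{conn: conn}
    )

    conn
  end)
```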

The connection

The main process (which receives basic_deliver messages from rabbit) handles reconnections and outages, and starts a task for each message.

It stores a map of task_ref => metadata, so that when a task asks for the response to be published back, it can retrieve the reply_to metadata item and use it as the routing key for the response.
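The bookkeeping described above could be sketched as two GenServer callbacks. The message shapes follow the amqp Hex library (basic_deliver, Basic.publish), but the state layout and handle_request/2 helper are hypothetical:

```elixir
# On delivery: start a task and remember its metadata under the task ref.
def handle_info({:basic_deliver, payload, meta}, state) do
  task =
    Task.Supervisor.async_nolink(state.task_sup, fn ->
      handle_request(payload, meta)  # hypothetical: runs the plug pipeline
    end)

  {:noreply, put_in(state.tasks[task.ref], meta)}
end

# On task completion: look up the metadata by ref and publish the
# response to the reply_to queue via the default exchange.
def handle_info({ref, response}, state) when is_map_key(state.tasks, ref) do
  {meta, state} = pop_in(state.tasks[ref])

  AMQP.Basic.publish(state.channel, "", meta.reply_to, response,
    correlation_id: meta.correlation_id)

  Process.demonitor(ref, [:flush])
  {:noreply, state}
end
```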

Backpressure is handled by standard rabbit QoS: you can define the maximum number of unacknowledged messages per connection or per queue, and rabbit will enforce it for you.
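With the amqp Hex library, this is a one-line channel setting; the prefetch value here is just an example:

```elixir
# Rabbit will deliver at most 30 unacknowledged messages on this channel
# before waiting for acks, which caps the number of in-flight tasks.
:ok = AMQP.Basic.qos(channel, prefetch_count: 30)
```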