https://hexdocs.pm/plug_amqp/Plug.AMQP.html
We have a complex RPC system, which uses rabbitmq as transport. So we have implemented the pattern described in https://www.rabbitmq.com/tutorials/tutorial-six-elixir.html
We have also found that having to manage the publishing of the response in each service was a bit too much to keep track of, so we have chosen to use plug as the framework to transform requests to responses. Then the adapter would handle consuming requests, publishing responses and keeping track of all the fizzbuzz.
So now we can expose an RPC with standard plug modules (router, parsers, other middleware) and standard plug.conn API (resp(conn, 200, "ok")
).
The Conn struct
It is pretty much a standard plug conn, but some parts of it are kind of specific to amqp:
-
path_info
is set to the routing key split on dots. -
request_path
is set to the routing key with slashes instead of dots -
method
is POST by default. It can be overridden using thex-method-override
header. The common Plug.MethodOverride plug cannot be used because it needs to parse body first, and it won’t override to GET. - body is always available
- query_params are always empty
- chunks and streaming responses are not available
The request handler
The handler is run as a task, monitored by the main process. It is responsible of creating the Conn struct and doing telemetry.
The connection
The main process (that receives basic_deliver
messages from rabbit) handles reconnections and outages, and to each message starts a task.
It stores a map task_ref => metadata
so when a task asks for the response to be published back, it can retrieve the reply_to
metadata item, and use it as routing key for the response.
Backpressure is handled by standard rabbit QoS, you can define the maximum number of unhandled messages per conection or per queue, and rabbit will enforce it for you.