What is the proper way to handle messages when the connection is not ready?

I’m working on an app that receives messages via HTTP and sends them to RabbitMQ, so I have a GenServer that handles the connection and publishes to RabbitMQ.

I’m curious about the proper way to implement a buffer/message queue that holds messages while the connection to RabbitMQ is not ready or is in a reconnection loop.

To implement the RabbitMQ handler I use the https://github.com/fishcakez/connection behaviour and amqp:

defmodule Rabbit do
  use Connection
  ...
end

One way to handle messages while the connection is not ready is to use the Erlang :queue module inside the Rabbit module, like this:

def handle_call({:publish, message}, _from, %{conn: nil, queue: queue} = state) do
  new_queue = :queue.in(message, queue)
  {:reply, :ok, %{state | queue: new_queue}}
end

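def connect(_, state) do
  # Assumes make_connection/0 returns {:ok, conn} or {:error, reason}.
  with {:ok, conn} <- make_connection() do
    state = %{state | conn: conn}
    # connect/2 must return {:ok, state}, not the bare state.
    {:ok, dequeue(state)}
  else
    # error handling; as one option, let Connection retry after a backoff
    {:error, _reason} -> {:backoff, 1_000, state}
  end
end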

def dequeue(state) do
  case :queue.out(state.queue) do
    {{:value, message}, new_queue} ->
      publish_message(state.conn, message)
      dequeue(%{state | queue: new_queue})
    {:empty, _} -> state
  end
end

So, is this a good approach? Or should I use a dedicated GenServer for the queue? Or maybe something else?

Probably a bad idea, but you can use the process’s mailbox for queuing incoming messages. Just don’t handle_* them and wait for some condition (connection’s readiness in your case). I usually do it this way, but one day it will surely bite me.

This will have the disadvantage of blocking the caller until a connection is established. However, if it’s needed, you can use this to your advantage by implementing a form of back-pressure: for the first X calls, handle them with the internal queue, but if that gets too large, stop replying to the calls right away (return :noreply) and only reply once each request has been processed, so that those callers cannot submit more requests until then. Watch out for GenServer.call timeouts there, though.
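
A minimal sketch of that back-pressure idea, assuming a stand-in process with no real RabbitMQ connection. The module name BackPressureBuffer, the connected/1 function, and the limit of 2 are all hypothetical and chosen only for illustration; a real flush would publish the messages instead of returning them.

```elixir
defmodule BackPressureBuffer do
  # Hypothetical module: illustrates the idea only, no real RabbitMQ connection.
  use GenServer

  # Assumed limit, kept small for demonstration.
  @max_buffered 2

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # The timeout is explicit because a parked caller waits until the flush.
  def publish(server, message, timeout \\ 5_000),
    do: GenServer.call(server, {:publish, message}, timeout)

  # Stands in for "the connection became ready"; returns the flushed messages.
  def connected(server), do: GenServer.call(server, :connected)

  @impl true
  def init(:ok), do: {:ok, %{queue: :queue.new(), size: 0, waiting: []}}

  @impl true
  def handle_call({:publish, message}, _from, %{size: size} = state)
      when size < @max_buffered do
    # Below the limit: buffer and acknowledge immediately.
    {:reply, :ok, %{state | queue: :queue.in(message, state.queue), size: size + 1}}
  end

  def handle_call({:publish, message}, from, state) do
    # Over the limit: keep the message but park the caller without replying.
    {:noreply, %{state | waiting: [{from, message} | state.waiting]}}
  end

  def handle_call(:connected, _from, state) do
    buffered = :queue.to_list(state.queue)
    parked = Enum.reverse(state.waiting)
    # Release the parked callers now that their messages are being flushed.
    Enum.each(parked, fn {from, _message} -> GenServer.reply(from, :ok) end)
    flushed = buffered ++ Enum.map(parked, fn {_from, message} -> message end)
    {:reply, flushed, %{state | queue: :queue.new(), size: 0, waiting: []}}
  end
end
```

A parked caller stays blocked inside publish/3 until connected/1 runs, so the call timeout has to be longer than the longest reconnect you are willing to wait for.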

:queue is fine for things like this, and generally how I’d implement it. For anything more sophisticated I’d take a look at GenStage.

…you can use the process’s mailbox for queuing incoming messages. Just don’t handle_* them and wait for some condition (connection’s readiness in your case)…

This is an interesting idea, but as far as I know the mailbox approach works with a plain receive block; when a GenServer is used, an unhandled message raises an error and the process terminates.

for the first X calls, handle with the internal queue, but if that gets too large, refuse to handle the calls (:noreply) and only reply to them once their request is processed so that they cannot submit more requests until then

In my case, there is no need to reply to the caller because the messages are notifications; that’s why I want to receive and handle messages even when there is no connection to RabbitMQ.

It’s probably a good idea to implement back-pressure and discard messages when the queue gets too large.
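
One way to discard messages once the queue is large, sketched under the assumption that dropping the oldest notification is acceptable. BoundedBuffer and its functions are hypothetical names; the size is tracked next to the :queue so the check does not have to walk the queue.

```elixir
defmodule BoundedBuffer do
  # Hypothetical helper: a :queue plus an explicit size counter.
  defstruct queue: :queue.new(), size: 0, max: 1_000

  def new(max) when is_integer(max) and max > 0, do: %__MODULE__{max: max}

  # Adds a message; when the buffer is full the oldest message is discarded.
  def push(%__MODULE__{size: size, max: max} = buffer, message) when size >= max do
    {_dropped, rest} = :queue.out(buffer.queue)
    %{buffer | queue: :queue.in(message, rest)}
  end

  def push(%__MODULE__{} = buffer, message) do
    %{buffer | queue: :queue.in(message, buffer.queue), size: buffer.size + 1}
  end

  # Returns the buffered messages oldest-first.
  def to_list(%__MODULE__{queue: queue}), do: :queue.to_list(queue)
end
```

Dropping the newest message instead would only require returning the buffer unchanged in the first push/2 clause.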

In that case, you could also use casts instead of calls.

Let the sender handle retries or drop the message completely.

Or if you really want to keep them centralised and send them in a huge pile, just put them in a list and reverse before sending…

An Okasaki queue wouldn’t do much more than that, since you have continuous writes at one end followed by continuous reads from the other end. If, however, reads and writes on the two ends are heavily interleaved, then Okasaki queues make sense.
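
The list-and-reverse variant could look roughly like this; ListBuffer is a hypothetical name, and the sketch only covers buffering, not the actual publishing.

```elixir
defmodule ListBuffer do
  # Hypothetical helper: prepend while disconnected, restore order once at flush time.
  def new, do: []

  # Prepending to a list is O(1).
  def push(buffer, message), do: [message | buffer]

  # Returns the messages oldest-first together with an empty buffer.
  def flush(buffer), do: {Enum.reverse(buffer), new()}
end
```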

You don’t need to use receive to block a GenServer.

I mean that in such a situation the GenServer just throws an error.

I have a few suggestions based on what you’ve said so far, and I hope they help. Reading through your description, you are first sending all messages to a GenServer, Rabbit, which is responsible for maintaining a connection to a RabbitMQ server and acting as a message buffer for the upstream service. You may have done this intentionally, but this raises two concerns for me:

  • Your message buffer is directly associated with the health of the connection. If the connection encounters an error, you lose your message buffer.
  • You’ve serialized the message dispatch to Rabbit; this creates an artificial bottleneck in your code that you don’t need unless you must send the messages in the order they were received by the GenServer (which is unlikely, given that it would be difficult to guarantee their order in the first place).

To put this more bluntly, Rabbit has two responsibilities in its state: the connection and the message queue.

The Connection behaviour is specifically designed to help manage a connection to a remote service, and the documentation is very useful in understanding how to start a connection when the process is started (by returning {:connect, info, state} inside of init/1). This is a good use of a GenServer process because the connection and connection state are held in the GenServer state. The GenServer can then give informative responses to callers.

However, I don’t see any reason in your description to have a buffer. I would remove the buffer part completely and make the processes handling the HTTP requests responsible for calling Rabbit. Because Rabbit uses Connection, it can handle messages from callers even when the connection to the RabbitMQ server isn’t available, and inform them with a response like {:reply, :noconnect, state}. The caller can then decide whether to retry or to inform the downstream client that there was an error.
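
A rough sketch of that caller-driven flow, using a plain GenServer as a stand-in so it runs without the Connection and amqp libraries. RabbitStub, HttpHandler, set_connected/1, and the retry count and delay are all hypothetical.

```elixir
defmodule RabbitStub do
  # Hypothetical stand-in for the Rabbit process: no real AMQP connection is opened.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def publish(server, message), do: GenServer.call(server, {:publish, message})

  # Simulates the connection coming up; the conn value is a placeholder.
  def set_connected(server), do: GenServer.call(server, :set_connected)

  @impl true
  def init(:ok), do: {:ok, %{conn: nil, sent: []}}

  @impl true
  def handle_call({:publish, _message}, _from, %{conn: nil} = state) do
    # No connection: tell the caller instead of buffering.
    {:reply, :noconnect, state}
  end

  def handle_call({:publish, message}, _from, state) do
    {:reply, :ok, %{state | sent: [message | state.sent]}}
  end

  def handle_call(:set_connected, _from, state) do
    {:reply, :ok, %{state | conn: :fake_conn}}
  end
end

defmodule HttpHandler do
  # Hypothetical caller-side policy: retry a few times, then report the failure.
  def forward(server, message, attempts \\ 3, delay_ms \\ 100)

  def forward(_server, _message, 0, _delay_ms), do: {:error, :noconnect}

  def forward(server, message, attempts, delay_ms) do
    case RabbitStub.publish(server, message) do
      :ok ->
        :ok

      :noconnect ->
        Process.sleep(delay_ms)
        forward(server, message, attempts - 1, delay_ms)
    end
  end
end
```

The HTTP layer can then map {:error, :noconnect} to whatever status it wants to return to the client.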

The model I described above has a few benefits:

  • There is no longer any message queue to deal with, so you aren’t dealing with the potential to “lose everything” that was in a GenServer state because of minor errors
  • The success state is pushed farther downstream, towards the client, and the client is now in agreement about the state of the message. This empowers the end user to make a decision to try again or not, and it also avoids a situation where the end user was informed something succeeded only for it to fail silently somewhere else.

For the model I described above, in the supervision tree, Rabbit should be at the far left, and the HTTP processes should be to the right of Rabbit (possibly as children of an HTTP process supervisor). This ensures Rabbit is started and allowed to connect before HTTP requests are handled. Also, in the case that Rabbit suffers irrecoverable issues and isn’t able to maintain a connection, this will force the HTTP processes to restart.
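
As a sketch, this ordering could be expressed with a :rest_for_one supervisor, which is one strategy that restarts the children started after a crashed child. The strategy choice, the IngestSupervisor name, and the Placeholder workers standing in for Rabbit and the HTTP side are assumptions, not something taken from the original code.

```elixir
defmodule Placeholder do
  # Hypothetical worker used in place of the real Rabbit and HTTP processes.
  use Agent

  def start_link(name), do: Agent.start_link(fn -> name end, name: name)
end

defmodule IngestSupervisor do
  use Supervisor

  def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)

  @impl true
  def init(_arg) do
    children = [
      # Started first ("far left"): the connection owner.
      Supervisor.child_spec({Placeholder, :rabbit}, id: :rabbit),
      # Started second: restarted whenever :rabbit is restarted.
      Supervisor.child_spec({Placeholder, :http}, id: :http)
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end
end
```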

The model above still suffers from an inherent flaw, though: the GenServer will only process messages serially, so all HTTP connections will have to share the single Rabbit connection. If your HTTP requests pile up faster than your connection can handle them, the calls will time out. As your service grows, you may instead require a pool of Rabbit connections, in which case the RabbitPool would be at the far left in the same manner as described above, but the HTTP processes must now check out a Rabbit connection from the pool before performing operations. This allows you to service multiple HTTP requests concurrently.

Overall, I think that would move you towards a more robust service and avoid headaches about losing state. I may have missed something in your description that makes what I described above inadequate, though. If that’s the case, let me know what it is, and I can try to provide a better answer.

Thanks for the detailed answer.

I created a gist with the code of this Rabbit module to make my idea easier to follow:

I totally agree about the separation of concerns.
But in this situation, I need to know when the connection is ready so that I can start dequeuing messages from the buffer immediately.

Your message buffer is directly associated with the health of the connection. If the connection encounters an error, you lose your message buffer.

As I understand it, the current code only loses messages when the whole process terminates, or maybe I’m missing something.

However, I don’t see any reason in your description to have a buffer. I would remove the buffer part completely and have the processes handling the HTTP processes responsible for calling Rabbit.

I agree, but in my case clients send requests without checking the response status (fire and forget); that’s why I got the idea of using an internal message queue instead of handling {:reply, :noconnect, state} in the HTTP processes as you suggested.

As your service grows, you may instead require a pool of Rabbit connections

It’s a good point and thanks again.
