sprql

What is the proper way to handle messages when the connection is not ready?

I’m working on an app that receives messages via HTTP and sends them to RabbitMQ, so I have a GenServer that handles the connection and publishing to RabbitMQ.

I’m curious about the proper approach to implementing a buffer/message queue that holds messages while the connection to RabbitMQ is not ready or is in a reconnection loop.

To implement the Rabbit handler I use the Connection behaviour (elixir-ecto/connection on GitHub) and amqp:

```elixir
defmodule Rabbit do
  use Connection
  # ...
end
```

One way to handle messages while the connection is not ready is to use Erlang’s :queue module inside the Rabbit module, like this:

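```elixir
# While there is no connection, buffer each publish in an Erlang :queue.
def handle_call({:publish, message}, _from, %{conn: nil, queue: queue} = state) do
  {:reply, :ok, %{state | queue: :queue.in(message, queue)}}
end

# Connection callback: must return {:ok, state} or {:backoff, timeout, state}.
def connect(_info, state) do
  # Assumes make_connection/0 returns {:ok, conn} or {:error, reason}.
  case make_connection() do
    {:ok, conn} ->
      {:ok, dequeue(%{state | conn: conn})}

    {:error, _reason} ->
      # error handling, e.g. retry the connection in one second
      {:backoff, 1_000, state}
  end
end

# Publish buffered messages in FIFO order until the queue is empty.
defp dequeue(state) do
  case :queue.out(state.queue) do
    {{:value, message}, new_queue} ->
      publish_message(state.conn, message)
      dequeue(%{state | queue: new_queue})

    {:empty, _} ->
      state
  end
end
```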

So, is this a good approach? Or should I use a dedicated GenServer for the queue? Or maybe something else?


DavidAntaramian

I have a few suggestions based on what you’ve said so far, and I hope they help. Reading through your description, you are first sending all messages to a GenServer, Rabbit, which is responsible for maintaining a connection to a RabbitMQ server and acting as a message buffer for the upstream service. You may have done this intentionally, but this raises two concerns for me:

  • Your message buffer is directly associated with the health of the connection. If the connection encounters an error, you lose your message buffer.
  • You’ve serialized the message dispatch to Rabbit; this creates an artificial bottleneck in your code that you don’t need unless you must send the messages in the order they were received by the GenServer (which is unlikely, given that it would be difficult to guarantee their order in the first place).

To put this more bluntly, Rabbit has two responsibilities in its state: the connection and the message queue.
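If you do keep a buffer (the "dedicated GenServer for the queue" option from the question), one way to split the two responsibilities is to hold the queue in its own process, so a crash in the connection process does not discard it. Below is a minimal, hypothetical `MessageBuffer` built on an `Agent` and an Erlang `:queue`; the idea is that the connection process calls `push/2` while disconnected and `drain/1` after reconnecting. It is a sketch under those assumptions, not a drop-in implementation.

```elixir
defmodule MessageBuffer do
  use Agent

  # Hypothetical dedicated buffer process. It lives apart from the
  # connection process, so a connection crash does not wipe this queue.
  def start_link(opts \\ []) do
    Agent.start_link(fn -> :queue.new() end, opts)
  end

  # Append a message to the back of the queue.
  def push(buffer, msg), do: Agent.update(buffer, &:queue.in(msg, &1))

  # Atomically take every buffered message, oldest first, leaving it empty.
  def drain(buffer) do
    Agent.get_and_update(buffer, fn q -> {:queue.to_list(q), :queue.new()} end)
  end
end
```

Note the trade-off: messages that have been drained but not yet published are still lost if the connection process crashes mid-flush, so this narrows the window for losing messages rather than closing it.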

The Connection behaviour is specifically designed to help manage a connection to a remote service, and the documentation is very useful in understanding how to start a connection when the process is started (by returning {:connect, info, state} inside of init/1). This is a good use of a GenServer process because the connection and connection state are held in the GenServer state. The GenServer can then give informative responses to callers.
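For comparison, here is a rough equivalent of connect-on-start with a plain GenServer, swapping in `handle_continue/2` and `Process.send_after/3` for Connection’s `{:connect, info, state}` and `{:backoff, timeout, state}`. The module name, the `connect_fun` argument, and the fixed retry delay are assumptions for illustration; a real client would open an actual AMQP connection where `connect_fun` is called.

```elixir
defmodule ReconnectingClient do
  use GenServer

  # connect_fun is a hypothetical zero-arity function standing in for a real
  # AMQP connect call; it must return {:ok, conn} or {:error, reason}.
  def start_link(connect_fun, backoff_ms \\ 50) do
    GenServer.start_link(__MODULE__, {connect_fun, backoff_ms})
  end

  def status(pid), do: GenServer.call(pid, :status)

  @impl true
  def init({connect_fun, backoff_ms}) do
    state = %{conn: nil, connect_fun: connect_fun, backoff_ms: backoff_ms}
    # Return at once so start_link does not block on the network; the
    # first attempt runs in handle_continue/2 (cf. {:connect, info, state}).
    {:ok, state, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state), do: try_connect(state)

  @impl true
  def handle_info(:reconnect, state), do: try_connect(state)

  @impl true
  def handle_call(:status, _from, %{conn: nil} = state), do: {:reply, :disconnected, state}
  def handle_call(:status, _from, state), do: {:reply, :connected, state}

  defp try_connect(state) do
    case state.connect_fun.() do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn}}

      {:error, _reason} ->
        # Fixed delay, roughly like returning {:backoff, timeout, state} from
        # Connection; real code would usually back off exponentially.
        Process.send_after(self(), :reconnect, state.backoff_ms)
        {:noreply, state}
    end
  end
end
```

While it is retrying, the process still answers calls (here `status/1`), which is the property the Connection behaviour gives you out of the box.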

However, I don’t see any reason in your description to have a buffer. I would remove the buffer part completely and make the processes handling the HTTP requests responsible for calling Rabbit. Because Rabbit uses Connection, even when the connection to the RabbitMQ server isn’t available, it can still handle messages from callers and inform them with a response like {:reply, :noconnect, state}. The caller can then decide whether to retry or to inform the downstream client that there was an error.
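A minimal sketch of this no-buffer model: while disconnected, the server replies `:noconnect`, and the caller (for example, an HTTP handler) decides whether to retry. `NoBufferPublisher`, `set_conn/2`, and the retry count and delay are hypothetical, and a plain pid stands in for the AMQP channel so the example runs without RabbitMQ.

```elixir
defmodule NoBufferPublisher do
  use GenServer

  # Hypothetical stand-in for the Rabbit process: the state is nil while
  # disconnected, otherwise a pid that receives {:published, msg}.
  def start_link, do: GenServer.start_link(__MODULE__, nil)
  def set_conn(pid, conn), do: GenServer.cast(pid, {:set_conn, conn})
  def publish(pid, msg), do: GenServer.call(pid, {:publish, msg})

  # Caller-side policy: retry a few times, then give up so the HTTP
  # handler can report the error to its client.
  def publish_with_retry(pid, msg, attempts \\ 3, delay_ms \\ 100) do
    case publish(pid, msg) do
      :ok ->
        :ok

      :noconnect when attempts > 1 ->
        Process.sleep(delay_ms)
        publish_with_retry(pid, msg, attempts - 1, delay_ms)

      :noconnect ->
        {:error, :noconnect}
    end
  end

  @impl true
  def init(conn), do: {:ok, conn}

  @impl true
  def handle_call({:publish, _msg}, _from, nil), do: {:reply, :noconnect, nil}

  def handle_call({:publish, msg}, _from, conn) do
    send(conn, {:published, msg})
    {:reply, :ok, conn}
  end

  @impl true
  def handle_cast({:set_conn, conn}, _state), do: {:noreply, conn}
end
```

Nothing is held in the server between calls, so there is no buffered state to lose; the retry decision lives with the caller, which knows whether its client is still waiting.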

The model I described above has a few benefits:

  • There is no longer any message queue to deal with, so you aren’t exposed to the potential to “lose everything” that was in a GenServer’s state because of minor errors.
  • The success state is pushed farther downstream, towards the client, and the client is now in agreement about the state of the message. This empowers the end user to make a decision to try again or not, and it also avoids a situation where the end user was informed something succeeded only for it to fail silently somewhere else.

For the model I described above, Rabbit should be at the far left of the supervision tree, and the HTTP processes should be to the right of Rabbit (possibly as children of an HTTP process supervisor). This ensures Rabbit is started and allowed to connect before HTTP requests are handled. Also, with a :rest_for_one strategy, if Rabbit suffers irrecoverable issues and isn’t able to maintain a connection, its restart will force the HTTP processes to restart as well.
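A sketch of that ordering, assuming a `:rest_for_one` strategy. Two `Agent`s stand in for the Rabbit process and the HTTP supervisor (in a real app these would be your own child specs, which are not shown here); killing the first child restarts the second, while killing the second leaves the first alone.

```elixir
defmodule RestForOneDemo do
  # Agents are placeholders for the real Rabbit (Connection) process and
  # the HTTP supervisor; both ids below are hypothetical.
  def start_link do
    children = [
      # Started first ("far left"): everything after it depends on it.
      Supervisor.child_spec({Agent, fn -> :rabbit end}, id: :rabbit),
      # Started second; restarted whenever :rabbit is restarted.
      Supervisor.child_spec({Agent, fn -> :http end}, id: :http)
    ]

    Supervisor.start_link(children, strategy: :rest_for_one)
  end

  # Look up the current pid of a child by its id.
  def child_pid(sup, id) do
    {^id, pid, _type, _modules} =
      sup |> Supervisor.which_children() |> List.keyfind(id, 0)

    pid
  end
end
```

With `:one_for_one` instead, a Rabbit restart would leave the HTTP processes running against a fresh, possibly not-yet-connected Rabbit process.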

The model above still suffers from an inherent flaw, though: the GenServer will only process messages serially, so all HTTP connections will have to share the single Rabbit connection. If your HTTP requests pile up faster than your connection can handle them, the calls will time out. As your service grows, you may instead require a pool of Rabbit connections, in which case the RabbitPool would be at the far left in the same manner as described above, but the HTTP processes must now check out a Rabbit connection from the pool before performing operations. This allows you to service multiple HTTP requests concurrently.
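To make the checkout/checkin idea concrete, here is a deliberately small, hypothetical pool that hands each idle connection process to one caller at a time. In practice you would likely reach for an existing pooling library such as poolboy; this sketch, for instance, does not monitor callers, so a caller that crashes while holding a connection never returns it.

```elixir
defmodule RabbitPool do
  use GenServer

  # Hypothetical minimal pool: the state is the list of idle workers
  # (e.g. pids of Rabbit connection processes).
  def start_link(workers), do: GenServer.start_link(__MODULE__, workers)

  def checkout(pool), do: GenServer.call(pool, :checkout)
  def checkin(pool, worker), do: GenServer.cast(pool, {:checkin, worker})

  # Check a worker out, run fun with it, and always check it back in.
  def with_worker(pool, fun) do
    case checkout(pool) do
      {:ok, worker} ->
        try do
          {:ok, fun.(worker)}
        after
          checkin(pool, worker)
        end

      {:error, :empty} = error ->
        error
    end
  end

  @impl true
  def init(workers), do: {:ok, workers}

  @impl true
  def handle_call(:checkout, _from, [worker | rest]), do: {:reply, {:ok, worker}, rest}
  def handle_call(:checkout, _from, []), do: {:reply, {:error, :empty}, []}

  @impl true
  def handle_cast({:checkin, worker}, idle), do: {:noreply, [worker | idle]}
end
```

Each HTTP process would wrap its publish in `with_worker/2`, so up to N requests talk to RabbitMQ concurrently instead of queuing behind a single GenServer.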

Overall, I think that would move you towards a more robust service and avoid headaches about losing state. I may have missed something in your description that makes what I described above inadequate, though. If that’s the case, let me know what it is, and I can try to provide a better answer.

mjadczak

This will have the disadvantage of blocking the caller until a connection is established. However, if it’s needed, you can use this to your advantage by implementing a form of back-pressure: handle the first X calls with the internal queue, but if that gets too large, refuse to reply to further calls right away (return :noreply) and only reply to them once their request has been processed, so that callers cannot submit more requests until then. Watch out for GenServer.call timeouts there, though.
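One way to sketch that back-pressure idea: accept and immediately acknowledge up to `max_buffered` messages while disconnected, then stop replying (`{:noreply, state}`) and answer the blocked callers with `GenServer.reply/2` once their messages are flushed. `BackpressurePublisher`, `connected/2`, and the pid-as-connection stand-in are assumptions for illustration, not part of the original code.

```elixir
defmodule BackpressurePublisher do
  use GenServer

  # "conn" is a hypothetical stand-in for a real AMQP channel: publishing
  # just sends {:published, msg} to that pid.
  def start_link(max_buffered), do: GenServer.start_link(__MODULE__, max_buffered)

  def publish(pid, msg, timeout \\ 5_000), do: GenServer.call(pid, {:publish, msg}, timeout)
  def connected(pid, conn), do: GenServer.cast(pid, {:connected, conn})

  @impl true
  def init(max), do: {:ok, %{conn: nil, queue: :queue.new(), max: max}}

  @impl true
  def handle_call({:publish, msg}, _from, %{conn: conn} = state) when not is_nil(conn) do
    send(conn, {:published, msg})
    {:reply, :ok, state}
  end

  def handle_call({:publish, msg}, from, state) do
    if :queue.len(state.queue) < state.max do
      # Room in the buffer: accept the message and release the caller now.
      {:reply, :ok, %{state | queue: :queue.in({msg, nil}, state.queue)}}
    else
      # Buffer full: keep the caller blocked until its message is sent.
      {:noreply, %{state | queue: :queue.in({msg, from}, state.queue)}}
    end
  end

  @impl true
  def handle_cast({:connected, conn}, state) do
    {:noreply, flush(%{state | conn: conn})}
  end

  # Publish buffered messages oldest first, replying to any blocked caller.
  defp flush(state) do
    case :queue.out(state.queue) do
      {{:value, {msg, from}}, rest} ->
        send(state.conn, {:published, msg})
        if from, do: GenServer.reply(from, :ok)
        flush(%{state | queue: rest})

      {:empty, _} ->
        state
    end
  end
end
```

Callers blocked past the buffer limit will hit the default 5-second `GenServer.call` timeout if the connection takes longer than that to come back, which is why `publish/3` takes a timeout argument in this sketch.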

hpopp

:queue is fine for things like this, and it’s generally how I’d implement it. For anything more sophisticated, I’d take a look at GenStage.
