Best practices in handling 3rd party libraries exceptions/exits (i.e. AMQP exits on AMQP.Queue.declare/3)

Hi,

This a question regarding best practices when handling 3rd party exceptions/exits.

I ran into a problem with calling AMQP.Queue.declare/3. The spec says it will return a standard ok/error tuple.

@spec declare(AMQP.Channel.t, AMQP.Basic.queue, keyword) :: {:ok, map} | AMQP.Basic.error
Basic.error() :: {:error, reason :: :blocked | :closing}

However, I ran into a situation where the call exited the process. I don’t think the exact reason matters here, but now I also have to handle an exit when calling the function. Otherwise it will exit the child 5 times in rapid succession, which shuts down the Application/Supervisor due to reached_max_restart_intensity.

I have a reconnect interval for error tuples, which works fine for standard connection issues. But it didn’t catch exits. So now I also have to handle the exit when calling the function and convert it to a tuple, so the standard reconnect interval is used.

My question is: how to avoid this kind of problem resulting in reached_max_restart_intensity?
Do you always have to handle 3 cases (error tuples, exceptions and exits) to be robust? That seems a bit of an overkill.

The code is listed below:

defmodule RabbitmqPublisher.Consumer do
  use GenStage
  require Logger
  use AMQP

  @exchange    "test_exchange"
  @queue       "test"
  @queue_error "#{@queue}_error"
  @reconnect_interval 10_000

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    send(self(), :connect)
    {:consumer, nil}
  end

  def handle_info(:connect, _chan) do
    with {:ok, conn} <- RabbitmqPublisher.AMQP.get_connection(),
         {:ok, chan} <- Channel.open(conn),
         :ok <- setup_queue(chan),
         :ok <- GenStage.async_subscribe(__MODULE__, to: RabbitmqPublisher.Producer, max_demand: 1) do

      Process.monitor(chan.pid)
      {:noreply, [], chan}
    else
      error ->
        Logger.error("Failed to connect #{@queue}: #{inspect error}. Reconnecting later...")
        Process.send_after(self(), :connect, @reconnect_interval)
        {:noreply, [], nil}
    end
  end

  def handle_info({:DOWN, _, :process, _pid, reason}, _) do
    # Stop GenServer. Will be restarted by Supervisor.
    {:stop, {:connection_lost, reason}, nil}
  end

  def handle_events([event], _from, chan) do
    AMQP.Basic.publish(chan, @exchange, "", event)
    {:noreply, [], chan}
  end

  defp setup_queue(chan) do
    with {:ok, _} <- Queue.declare(chan, @queue_error, durable: true),
         {:ok, _} <- Queue.declare(chan, @queue,
           durable: true,
           arguments: [
             {"x-dead-letter-exchange", :longstr, ""},
             {"x-dead-letter-routing-key", :longstr, @queue_error}
           ]
         ),
         :ok <- Exchange.fanout(chan, @exchange, durable: true),
         :ok <- Queue.bind(chan, @queue, @exchange) do
      :ok
    else
      error ->
        Logger.error("Failed to setup queue #{@queue}: #{inspect error}. Propagating error")
        error
    end
  # Queue.declare can also exit, so we need to handle that, otherwise we run into reached_max_restart_intensity
  catch
    :exit, reason -> {:exit, reason}
  end
end
1 Like

Hi, I don’t have a good answer for this type of issue (I do similar handling when met with such an API) but I’d like to say a couple of things

First, a digression: the fact that you could not predict the behaviour of this function from just the type (spec) makes it a partial, not a total function. Functions like these tend to spring unpleasant surprises on the programer during runtime and I try to avoid writing them at all costs. When met in a library I tend to create wrappers to convert exits and throws to error tuples before they get a chance to complicate the rest of the codebase.

Second: I see that you’re declaring queues (and possibly other topology pieces like exchanges perhaps?) inside your code. Even though the AMQP protocol allows for that, I don’t think it’s a good practice for your code to take on extra responsibilities like that. We’ve had a lot of success treating RabbitMQ as a piece of infrastructure (eg like a database) and making ‘migration’ files to be run outside our apps by a separate process. Because of the way RabbitMQ behaves, you may often find yourself declaring the same parts of topology in two or more otherwise separate applications to account for issues like ‘app B went up first before A, so it needs to make sure queues are in place before consuming. Exchanges too, because queues are usually bound to an exchange. But A may go up first, so we must also declare the exchange and/or queues there or we’ll get errors when publishing’. And by doing this it makes it very difficult to remember all the places a piece of topology is declared when you need to make changes (like attributes). And it makes it very hard to deploy the changes because now both apps need deployment simultaneously, or the declaration will go from a no-op (in the case when declaring something that exists already) to an error that you’ll have a hard time recovering from.

So what we do at work is create json files with the declarations and use the http api of RabbitMQ admin to setup the rabbit server. When changes are required there’s a single place to do them, and most of the times can be made without affecting the dependent apps. All of this is hooked into ci/cd which makes it easier to manage.

1 Like

Thanks, that sounds like a better way to do it indeed.
I would solve my particular issue, because it would bring me back from a situation with partials to simple functions.

So as I understand generally: if you encounter partials, make a wrapper that converts then into functions. The problem there is, how do you know? The partial only reveals itself when the system goes down…

Yeah, you’re right of course- bad behavior from partial functions will manifest in runtime/production more often than not, when it’s too late to catch/prevent.

The places were I’ve met such behavior is mostly from using erlang/OTP APIs- they tend to be pretty unforgiving in case of errors. Examples are the :queue, :gb_trees and :ets modules, and I’ve also seen this in libraries (:erlsom for xml parsing comes to mind). So I’m usually more alert when directly consuming erlang code, looking for places in the source code where errors are thrown instead of returned. I’m sorry I don’t have anything more concrete for you other than learning the hard way and relying on intuition and past experience on when to be on your guard :man_shrugging:

We have a production app using this library and since it’s about comminucation with external system, we assume stuff can go wrong anyway (the MQ server is down and such) so we have a system that retries the connection every few seconds, we catch exceptions and treat them as downtime.

Here’s a gist of it (some logger calls removed for brevity):

  defp maybe_refresh_connection(state) do
    if state[:conn] && Process.alive?(state[:conn].pid) do      
      {:ok, state}
    else      
      try do
        with {:ok, conn} <- AMQP.Connection.open(conn_params()),
             {:ok, chan} <- AMQP.Channel.open(conn) do
          setup_state(state, conn, chan)
        else
          err ->            
            {:error, :downtime}
        end
      rescue
        e ->          
          {:error, :downtime}
      end
    end
  end

Inside of the setup_state/3 the exchange and queues are declared in such way that it fails on anything but :ok tuple (eg {:ok, _map} = AMQP.Queue.declare(chan, q_name, durable: true)) and farther outside we use Process.send_after/3 to retry handling of failed events - it’s the producer part, so it would use process mailbox as a temporary queue until the MQ server is up again or we run out of memory which did not happen yet :slight_smile:

Hope it helps!

Hi @yurko,

Technically, I don’t think this approach would catch an :exit (which AMQP.Queue.declare/3 threw for me). Only catch/1 and catch/2 do, rescue/1 does not.

Now, this doesn’t matter as the :exit was quite exotic (remove user rights on an already existing queue in an operational situation, while re-declaring it continuesly) and declare can better be handled like a migration, as @bottlenecked mentioned. But if a consistent reproducible exit would be thrown by one of your AMQP calls, you would end up crashing the process as the rescue doesn’t catch is. The immediate supervised retry would crash immediately again and the Supervisor would give up after a couple of tries due to reached_max_restart_intensity.

So you are not 100% safe here. Which was core of my point: you apparently are never safe, unless you use catch/2 with an exception and exit clause.

1 Like

Thanks for the feedback, you are right here. The code dates back to 2017 and there were never any issues, but using catch there, which is really exotic in the Beam world, might indeed be better in that situation.