Hi,
This is a question about best practices for handling third-party exceptions/exits.
I ran into a problem when calling AMQP.Queue.declare/3. The spec says it returns a standard ok/error tuple:
@spec declare(AMQP.Channel.t, AMQP.Basic.queue, keyword) :: {:ok, map} | AMQP.Basic.error
Basic.error() :: {:error, reason :: :blocked | :closing}
However, I ran into a situation where the call exited the calling process instead of returning an error tuple. I don’t think the exact reason matters here, but it means I now also have to handle an exit when calling the function. Otherwise the child exits 5 times in rapid succession, which shuts down the Application/Supervisor with reached_max_restart_intensity.
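For context, here is a minimal sketch of where that restart limit lives. It assumes a hypothetical wrapper module, RestartIntensitySketch, which is not part of my application. Supervisor.start_link/2 accepts max_restarts and max_seconds, whose documented defaults are 3 and 5. Once more restarts than max_restarts happen within max_seconds, the supervisor itself shuts down.

```elixir
defmodule RestartIntensitySketch do
  # Hypothetical helper, only to illustrate the two options involved.
  # `children` is whatever child spec list the application would supervise.
  def start_link(children) do
    Supervisor.start_link(children,
      strategy: :one_for_one,
      # Allow at most 3 restarts...
      max_restarts: 3,
      # ...within any 5 second window before the supervisor gives up.
      max_seconds: 5
    )
  end
end
```

Raising these numbers would only make the supervisor more tolerant of a child that crashes immediately on every start; it would not slow the restarts down to my reconnect interval.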
I have a reconnect interval for error tuples, which works fine for standard connection issues. But it didn’t catch exits. So now I also have to handle the exit when calling the function and convert it to a tuple, so the standard reconnect interval is used.
My question is: how can I avoid this kind of problem, which ends in reached_max_restart_intensity?
Do you always have to handle 3 cases (error tuples, exceptions and exits) to be robust? That seems like overkill.
The code is listed below:
defmodule RabbitmqPublisher.Consumer do
  use GenStage
  require Logger
  use AMQP

  @exchange "test_exchange"
  @queue "test"
  @queue_error "#{@queue}_error"
  @reconnect_interval 10_000

  def start_link(_args) do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_) do
    send(self(), :connect)
    {:consumer, nil}
  end

  def handle_info(:connect, _chan) do
    with {:ok, conn} <- RabbitmqPublisher.AMQP.get_connection(),
         {:ok, chan} <- Channel.open(conn),
         :ok <- setup_queue(chan),
         :ok <- GenStage.async_subscribe(__MODULE__, to: RabbitmqPublisher.Producer, max_demand: 1) do
      Process.monitor(chan.pid)
      {:noreply, [], chan}
    else
      error ->
        Logger.error("Failed to connect #{@queue}: #{inspect error}. Reconnecting later...")
        Process.send_after(self(), :connect, @reconnect_interval)
        {:noreply, [], nil}
    end
  end

  def handle_info({:DOWN, _, :process, _pid, reason}, _) do
    # Stop GenServer. Will be restarted by Supervisor.
    {:stop, {:connection_lost, reason}, nil}
  end

  def handle_events([event], _from, chan) do
    AMQP.Basic.publish(chan, @exchange, "", event)
    {:noreply, [], chan}
  end

  defp setup_queue(chan) do
    with {:ok, _} <- Queue.declare(chan, @queue_error, durable: true),
         {:ok, _} <- Queue.declare(chan, @queue,
           durable: true,
           arguments: [
             {"x-dead-letter-exchange", :longstr, ""},
             {"x-dead-letter-routing-key", :longstr, @queue_error}
           ]
         ),
         :ok <- Exchange.fanout(chan, @exchange, durable: true),
         :ok <- Queue.bind(chan, @queue, @exchange) do
      :ok
    else
      error ->
        Logger.error("Failed to setup queue #{@queue}: #{inspect error}. Propagating error")
        error
    end
    # Queue.declare can also exit, so we need to handle that, otherwise we run into reached_max_restart_intensity
  catch
    :exit, reason -> {:exit, reason}
  end
end
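
To make the "3 cases" part of the question concrete, here is a minimal sketch of what handling all of them in one place might look like. SafeCall and safe/1 are hypothetical names I made up for illustration; they are not part of AMQP or GenStage, and I am not claiming this is the recommended approach.

```elixir
defmodule SafeCall do
  # Hypothetical helper: runs a zero-arity function and turns anything
  # that is not a normal return into an {:error, _} tuple.
  def safe(fun) when is_function(fun, 0) do
    fun.()
  rescue
    # Raised exceptions (e.g. RuntimeError, MatchError)
    exception -> {:error, {:exception, exception}}
  catch
    # Exits, such as a failed call into another process
    :exit, reason -> {:error, {:exit, reason}}
    # Uncaught throws
    :throw, value -> {:error, {:throw, value}}
  end
end
```

With something like this, the call in handle_info/2 could become `:ok <- SafeCall.safe(fn -> setup_queue(chan) end)`, and the existing else branch would schedule the reconnect for exits and exceptions as well. It still feels like a lot of ceremony around every third-party call, which is why I am asking.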