Catching NO_ROUTE from AMQP.Basic.publish

Hi everyone!

I wrote a unit test where I want to ensure that the app is able to connect to RabbitMQ (using Broadway), so the test will start Broadway, then publish to some exchange, and expect a message from the Broadway processor.

The problem is that it takes time for the Broadway producer to declare the exchange and bind the queue, so in my test when I publish the message it is not routed.

My current solution is to add a small sleep but it is not reliable (and slows down tests).

{:ok, rmq_consumer} = consumer_mod.start_link(job_handler: job_handler, start_mode: :normal)

# Generate a random string to validate that we receive this specific
# message.

randstr = Ecto.UUID.generate()
payload = Jason.encode!(%{"randstr" => randstr})

# Connect to RMQ and deliver the payload to the configured exchange.
# Currently I have not figured out a way to ensure that the queue was declared
# and to republish otherwise, so we sleep for one second.

config = consumer_mod.config()
{:ok, conn} = AMQP.Connection.open(config.connection)
{:ok, chan} = AMQP.Channel.open(conn)
Process.sleep(1000)
:ok = ensure_routed_publish(chan, config.exchange, "", payload)

# We should receive the same random string from our test handler

assert_receive {:rmq_str, ^randstr}, 1000

I tried to fiddle with AMQP.Confirm.select, the :mandatory option, and the wait_for_confirms_or_die function, but it does not work. Every time, AMQP happily tells me that everything is fine even though my message is never routed (with select enabled, I can see the error in the logs).

How can I write the ensure_routed_publish/4 function so it returns {:error, _} when the message is not routed so I can retry it and avoid the sleeping?

Thank you.

You can use AMQP.Basic.return/2 to register a handler that receives returned messages.

If you get this message: {:basic_return, payload, meta}, send it again.

https://hexdocs.pm/amqp/AMQP.Basic.html#return/2

Hi, thank you.

Unfortunately this does not work on its own, because I only receive a message when there is an error. So when everything goes right, I'll have to add a timeout to the receive, which is only slightly better than the sleep.

Edit: It seems that if I also add the following:

    AMQP.Confirm.select(chan)
    AMQP.Confirm.register_handler(chan, self())

the :basic_ack message is always delivered after the :basic_return message, so it may be safe to consider that receiving basic_ack without basic_return means that the message was routed.

Would something like this work?

  def ensure_routed_publish(chan, exchange, routing_key, payload, timeout \\ 5_000) do
    AMQP.Basic.return(chan, self())
    AMQP.Confirm.select(chan)
    AMQP.Confirm.register_handler(chan, self())
    :ok = AMQP.Basic.publish(chan, exchange, routing_key, payload, mandatory: true)

    receive do
      {:basic_ack, _, _} ->
        :ok

      {:basic_return, returned_payload, _} ->
        {:error, returned_payload}
    after
      timeout ->
        {:error, :timeout}
    end
  end

Yes, this is more or less what I did:

  defp ensure_routed_publish(chan, exchange, key, payload) do
    AMQP.Confirm.select(chan)
    AMQP.Confirm.register_handler(chan, self())
    AMQP.Basic.return(chan, self())
    publish_loop(chan, exchange, key, payload)
  end

  defp publish_loop(chan, exchange, key, payload) do
    :ok = AMQP.Basic.publish(chan, exchange, key, payload, mandatory: true)

    receive do
      {:basic_return, _, %{reply_text: "NO_ROUTE"}} ->
        receive do
          {:basic_ack, _, _} ->
            Process.sleep(50)
            publish_loop(chan, exchange, key, payload)
        end

      {:basic_ack, _, _} ->
        :ok
    after
      5000 -> flunk("could not deliver message to RabbitMQ")
    end
  end

(It is in a test so we know the queue will exist at some point.)
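
For use outside a test, the same loop can be given a bounded number of attempts. The following is only a minimal sketch: the module name `RoutedPublish`, the `publish_fun` argument, and the retry parameters are hypothetical. `publish_fun` stands in for a closure that calls `AMQP.Basic.publish(chan, exchange, key, payload, mandatory: true)` on a channel already prepared with `AMQP.Confirm.select/1`, `AMQP.Confirm.register_handler/2`, and `AMQP.Basic.return/2`. It relies on the observation made earlier in this thread that the `:basic_return` message arrives before the `:basic_ack`.

```elixir
defmodule RoutedPublish do
  @moduledoc """
  Hypothetical helper: retries a mandatory publish until the broker acks it
  without returning it first, or until the attempts run out.
  """

  # publish_fun is a zero-arity function that performs the actual publish
  # and returns :ok.
  def publish(publish_fun, attempts \\ 20, timeout \\ 5_000, backoff \\ 50)

  def publish(_publish_fun, 0, _timeout, _backoff), do: {:error, :no_route}

  def publish(publish_fun, attempts, timeout, backoff) do
    :ok = publish_fun.()

    receive do
      # Unroutable: the broker returns the message, then still acks it.
      {:basic_return, _payload, _meta} ->
        # Drain the ack that follows the return so it cannot be mistaken
        # for the ack of the next attempt.
        receive do
          {:basic_ack, _tag, _multiple} -> :ok
        after
          timeout -> :ok
        end

        Process.sleep(backoff)
        publish(publish_fun, attempts - 1, timeout, backoff)

      # Ack with no preceding return: the message was routed.
      {:basic_ack, _tag, _multiple} ->
        :ok
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

Passing the publish as a function keeps the retry logic independent of the channel, so it can be exercised by sending the confirm and return messages to the calling process by hand.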

Hi!

Do you know if there is a way to catch when an exchange does not exist?

operation basic.publish caused a channel exception not_found: no exchange 'does_not_exist' in vhost '/'

I have the following code:

    :ok = AMQP.Confirm.select(chan)
    :ok = AMQP.Confirm.register_handler(chan, self())

I was expecting to receive a :basic_nack.

I also call AMQP.Basic.return(chan, self()), but it does not seem to handle that case either.
Thank you!
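
The log line quoted above reports a channel exception, which suggests the broker closes the channel rather than sending a nack or a return. One possible way to observe that, sketched here under assumptions, is to monitor the channel process (the `pid` field of the `%AMQP.Channel{}` struct) and wait for either the confirm or a `:DOWN` message. The module and function names are hypothetical, and the exact shape of the exit reason is not something this sketch depends on, so it is passed back untouched.

```elixir
defmodule ChannelWatch do
  # Hypothetical helper. `channel_pid` would be `chan.pid` for a channel in
  # confirm mode whose confirm handler is the calling process.
  # `publish_fun` is a zero-arity function that performs the actual publish.
  def publish_or_down(channel_pid, publish_fun, timeout \\ 5_000) do
    ref = Process.monitor(channel_pid)
    publish_fun.()

    result =
      receive do
        {:basic_ack, _tag, _multiple} -> :ok
        {:basic_nack, _tag, _multiple} -> {:error, :nack}
        # The channel process went away, e.g. after a channel exception.
        {:DOWN, ^ref, :process, _pid, reason} -> {:error, {:channel_down, reason}}
      after
        timeout -> {:error, :timeout}
      end

    # Stop monitoring and drop any :DOWN message that raced with the result.
    Process.demonitor(ref, [:flush])
    result
  end
end
```

With a real channel this would be called as `ChannelWatch.publish_or_down(chan.pid, fn -> AMQP.Basic.publish(chan, exchange, key, payload) end)`. After a `{:error, {:channel_down, _}}` result the channel is gone, so a new one has to be opened before publishing again.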