I wrote a unit test to ensure that the app can connect to RabbitMQ (using Broadway). The test starts Broadway, publishes to an exchange, and expects a message from the Broadway processor.
The problem is that the Broadway producer needs some time to declare the exchange and bind the queue, so when my test publishes the message too early, it is not routed.
My current solution is to add a short sleep, but it is not reliable (and it slows down the tests).
{:ok, rmq_consumer} = consumer_mod.start_link(job_handler: job_handler, start_mode: :normal)
# Generate a random string to validate that we receive this specific
# message.
randstr = Ecto.UUID.generate()
payload = Jason.encode!(%{"randstr" => randstr})
# Connect to RMQ and deliver the payload to the configured exchange.
# Currently I have not figured out a way to check that the queue was
# declared and to republish otherwise, so we sleep for one second.
config = consumer_mod.config()
{:ok, conn} = AMQP.Connection.open(config.connection)
{:ok, chan} = AMQP.Channel.open(conn)
Process.sleep(1000)
:ok = ensure_routed_publish(chan, config.exchange, "", payload)
# We should receive the same random string from our test handler
assert_receive {:rmq_str, ^randstr}, 1000
I tried to fiddle with AMQP.Confirm.select, the :mandatory option, and the wait_for_confirms_or_die function, but none of it works. Every time, AMQP happily tells me that everything is fine even though my message is never routed. With select enabled I can see the error in the logs, but nothing is reported back to my code.
How can I write the ensure_routed_publish/4 function so that it returns {:error, _} when the message is not routed? Then I could retry the publish and avoid sleeping.
Unfortunately, this does not work, because I receive a message only when there was an error. So when everything goes right, I'll have to add a timeout to the receive, which is only slightly better than the sleep.
The :basic_ack message is always delivered after the :basic_return message, so it may be safe to consider that receiving :basic_ack without a preceding :basic_return means that the message was routed.
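Based on that observation, here is a minimal sketch of what ensure_routed_publish could look like. It is not a definitive implementation. The module name RoutedPublish, the setup/1 and await_confirm/2 helpers, the error atoms, and the timeouts are all my own assumptions. It also assumes that only one message is in flight at a time, that the calling process is registered as both the confirm handler and the return handler, and that the exchange already exists (publishing to a missing exchange closes the channel, which this sketch does not handle). The timeout is still there, but only as a safety net: in the normal case the function returns as soon as the broker confirms.

```elixir
defmodule RoutedPublish do
  # Hypothetical helper module, not part of AMQP or Broadway.

  # Call once per channel, from the process that will publish.
  # Puts the channel in confirm mode and asks for confirms and
  # returned messages to be sent to the calling process.
  def setup(chan) do
    :ok = AMQP.Confirm.select(chan)
    :ok = AMQP.Confirm.register_handler(chan, self())
    :ok = AMQP.Basic.return(chan, self())
  end

  # Publishes with mandatory: true and waits for the broker's verdict.
  # Returns :ok when confirmed without a return, {:error, reason} otherwise.
  def ensure_routed_publish(chan, exchange, routing_key, payload, timeout \\ 1_000) do
    seqno = AMQP.Confirm.next_publish_seqno(chan)

    case AMQP.Basic.publish(chan, exchange, routing_key, payload, mandatory: true) do
      :ok -> await_confirm(seqno, timeout)
      {:error, _reason} = error -> error
    end
  end

  # Assumption: a :basic_return for an unroutable message reaches our
  # mailbox before the :basic_ack for the same publish.
  def await_confirm(seqno, timeout) do
    receive do
      {:basic_return, _payload, _meta} ->
        # The broker still confirms an unroutable message, so drop that
        # ack to keep it from being mistaken for a later publish.
        receive do
          {:basic_ack, ^seqno, _multiple} -> :ok
        after
          timeout -> :ok
        end

        {:error, :unroutable}

      {:basic_ack, ^seqno, _multiple} ->
        :ok

      {:basic_nack, ^seqno, _multiple} ->
        {:error, :nack}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

In the test, the idea would be to call RoutedPublish.setup(chan) right after opening the channel, drop the Process.sleep(1000), and call ensure_routed_publish in a loop (with a short pause and a maximum number of attempts) until it returns :ok. I have not verified how the library relays the two messages internally, so the ordering assumption should be checked against your version of AMQP before relying on it.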