RabbitMQ/amqp noob: naming connections/channels?

I’m starting to work with RabbitMQ and the amqp package and I have a couple noob questions that I’m hoping someone can set me straight on…

First: is it possible to name the RabbitMQ connection PID? I’m used to Phoenix.PubSub, where you can start one or more processes and refer to them by name so that messages can be broadcast/published to that instance. In amqp, there is a :name option, but the docs specifically state that it is not unique and cannot be used as a connection identifier. I’m guessing that this is intentional due to some specifics of how RabbitMQ operates, but can anyone shed light on this? How can I easily send/publish/broadcast messages? Is the recommended method to open a new connection as needed, instead of holding onto a named connection pid?

Secondly: is there a way to create topics (i.e. open a channel) when the app starts? I’m seeing warnings when parts of the app subscribe to topics that don’t exist yet. I can manually create these channels and the logs quiet down. This is different from Phoenix.PubSub, which lets you subscribe to any topic name you want regardless of whether it has been formally “initialized”. Is it advisable to pre-define these channel names (i.e. topic names) when the app starts? Or is that not recommended?

I’d like sending messages to be simpler, something more like

# wishful thinking pseudo-code? 
Rabbit.send(:instance_name, "my-topic", "my-message", my_options)

Thanks for any pointers! Sorry if my nomenclature and way of thinking about this all come from the Phoenix.PubSub point of view, but that has been the bulk of my experience.

I mean, the following is straight from the front page of the amqp HexDocs:

{:ok, conn} = AMQP.Connection.open()
# {:ok, %AMQP.Connection{pid: #PID<0.165.0>}}

{:ok, chan} = AMQP.Channel.open(conn)
# {:ok, %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.165.0>}, pid: #PID<0.177.0>}}

AMQP.Queue.declare(chan, "test_queue")
# {:ok, %{consumer_count: 0, message_count: 0, queue: "test_queue"}}

AMQP.Exchange.declare(chan, "test_exchange")
# :ok

AMQP.Queue.bind(chan, "test_queue", "test_exchange")
# :ok

AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok

{:ok, payload, meta} = AMQP.Basic.get(chan, "test_queue")
payload
# "Hello, World!"

AMQP.Queue.subscribe(chan, "test_queue", fn payload, _meta -> IO.puts("Received: #{payload}") end)
# {:ok, "amq.ctag-5L8U-n0HU5doEsNTQpaXWg"}

AMQP.Basic.publish(chan, "test_exchange", "", "Hello, World!")
# :ok
# Received: Hello, World!

Does that not cover your use-case?


Not sure if it’s the same thing but does the configuration section not cover this?

You can define a connection and channel in your config and AMQP will automatically…

  • Open the connection and channel at the start of the application
  • Automatically try to reconnect if they are disconnected

config :amqp,
  connections: [
    myconn: [url: "amqp://guest:guest@myhost:12345"],
  ],
  channels: [
    mychan: [connection: :myconn]
  ]
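
To tie this back to the Rabbit.send/4 wish in the question, here is a minimal sketch of publishing through the configured channel. It assumes amqp 2.0 or later, where AMQP.Application.get_channel/1 looks up a channel by the name given in the config. The MyApp.Publisher module and its publish function are hypothetical names used only for illustration.

```elixir
defmodule MyApp.Publisher do
  @moduledoc """
  Hypothetical helper that publishes through the channel registered as
  :mychan in `config :amqp, channels: [...]`.
  """

  # Looks up the named channel on every call, so a channel that was
  # re-opened after a reconnect is picked up automatically.
  def publish(exchange, routing_key, payload, options \\ []) do
    with {:ok, chan} <- AMQP.Application.get_channel(:mychan) do
      AMQP.Basic.publish(chan, exchange, routing_key, payload, options)
    end
  end
end
```

If the channel is not connected yet, the error tuple from get_channel/1 is returned to the caller unchanged, so it is up to the caller to retry or log.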

You can use Broadway RabbitMQ as a subscriber, and I highly recommend you do that. It can declare queues if they don’t exist yet. It’s particularly useful if your consumer is in a separate service and you don’t have control over who creates the queue.
https://hexdocs.pm/broadway_rabbitmq/BroadwayRabbitMQ.Producer.html
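
A rough sketch of what such a Broadway consumer could look like, assuming the broadway and broadway_rabbitmq packages are in your deps and the broker is reachable with the default connection settings. The module name MyApp.QueueConsumer and the queue name "my_queue" are made up for illustration; the declare option asks the producer to declare the queue when it connects.

```elixir
defmodule MyApp.QueueConsumer do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           # Hypothetical queue name; declared on connect if it is missing.
           queue: "my_queue",
           declare: [durable: true]},
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 2]
      ]
    )
  end

  # Called once per message; returning the message marks it as successful.
  @impl true
  def handle_message(_processor, message, _context) do
    IO.puts("Received: #{message.data}")
    message
  end
end
```

Adding MyApp.QueueConsumer to your application's supervision tree starts the consumer with the app.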

If you wrap your connection in a GenServer, you can start it with a name and then address it by that name as well.
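
Something along these lines, as a minimal sketch rather than a production setup: it has no reconnect handling, and the module name MyApp.Rabbit, its options, and the default URL are assumptions for illustration. Only AMQP.Connection.open, AMQP.Channel.open and AMQP.Basic.publish come from the amqp package.

```elixir
defmodule MyApp.Rabbit do
  use GenServer

  # Starts the wrapper and registers it under the given :name,
  # e.g. MyApp.Rabbit.start_link(name: :instance_name).
  def start_link(opts) do
    name = Keyword.fetch!(opts, :name)
    GenServer.start_link(__MODULE__, opts, name: name)
  end

  # Publishes via the channel held by the named process.
  def publish(server, exchange, routing_key, payload, options \\ []) do
    GenServer.call(server, {:publish, exchange, routing_key, payload, options})
  end

  @impl true
  def init(opts) do
    # Assumed default URL; pass :url to override.
    url = Keyword.get(opts, :url, "amqp://guest:guest@localhost")

    with {:ok, conn} <- AMQP.Connection.open(url),
         {:ok, chan} <- AMQP.Channel.open(conn) do
      {:ok, %{conn: conn, chan: chan}}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call({:publish, exchange, routing_key, payload, options}, _from, state) do
    result = AMQP.Basic.publish(state.chan, exchange, routing_key, payload, options)
    {:reply, result, state}
  end
end
```

With that in your supervision tree, MyApp.Rabbit.publish(:instance_name, "test_exchange", "", "Hello, World!") gets close to the Rabbit.send call from the original question.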