Help: How to use AMQP channel from pool for publishing messages?

I set up a small process that maintains an AMQP connection/channel. This script automatically reconnects to RabbitMQ if the connection fails. I built it from an old, deprecated and incomplete guide. I’m still a newbie, so I don’t fully understand how to use it. As far as I can tell, the script connects and reconnects, but how do I actually fetch and use the channel?

For example, if I have a function in a different module, such as chatroom.ex, how do I fetch the existing channel in the pool so that I can publish messages to a queue?

def publish_function(chatmessage) do
  # Receive a chat room message here and publish it to "test_queue" in RabbitMQ.
  # How do I use the existing connection? Something like:
  channel = ???? # the channel held by the consumer module?
  AMQP.Basic.publish(channel, "", "test_queue", chatmessage)
end

Here is what I have so far:

connectionmanager.ex:

defmodule ExchatWeb.AMQPConnectionManager do

  use GenServer
  use AMQP

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, [name: __MODULE__])
  end

  def init(:ok) do
    children = [
      ExchatWeb.Publish
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: ExchatWeb.PublishSupervisor)
    establish_new_connection()
  end

  defp establish_new_connection do
    case AMQP.Connection.open do
      {:ok, conn} ->
        Process.link conn.pid
        {:ok, {conn, %{}}}
      {:error, reason} ->
        IO.puts "failed for #{inspect reason}"
        :timer.sleep 5000
        establish_new_connection()
    end
  end

  def request_channel(consumer) do
    GenServer.cast(__MODULE__, {:chan_request, consumer})
  end

  def handle_cast({:chan_request, consumer}, {conn, channel_mappings}) do
    new_mapping = store_channel_mapping(conn, consumer, channel_mappings)
    channel = Map.get(new_mapping, consumer)
    consumer.channel_available(channel)
    {:noreply, {conn, new_mapping}}
  end

  defp store_channel_mapping(conn, consumer, channel_mappings) do
    # Return the updated map (IO.inspect/1 returns its argument unchanged).
    channel_mappings
    |> Map.put_new_lazy(consumer, fn -> create_channel(conn) end)
    |> IO.inspect()
  end

  defp create_channel(conn) do
    {:ok, chan} = Channel.open(conn)
    chan
  end

end

publishconsumer.ex:

defmodule ExchatWeb.PublishConsumer do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, [name: __MODULE__])
  end

  def init(_opts) do
    ExchatWeb.AMQPConnectionManager.request_channel(__MODULE__)
    {:ok, nil}
  end

  def channel_available(chan) do
    GenServer.cast(__MODULE__, {:channel_available, chan})
  end

  def handle_cast({:channel_available, chan}, _state) do
    IO.inspect(chan)
    #bind_to_queue chan
    {:noreply, chan}
  end

end

Someone must know how to do this. This is basically a conceptual question that could be applied to many other Elixir modules, not just RabbitMQ and Broadway. If the consumer process and connection are linked together, how do I actually access the live connection?

I’ve been testing for a while. I know the connection exists and reconnects: I can see it in my RabbitMQ admin panel, and when I manually close it there, my script reconnects automatically. But I can’t seem to find and use this connection from my Elixir module. How do I get hold of it to publish messages to RabbitMQ without opening a new connection for every message sent?

If I am reading this correctly, you want to check out a channel from your connection pool and publish to it. Based on the code you’ve posted, I assume the following:

  • Your connection pool abstraction is using a process (although I would use something like Poolboy)
  • You need a function akin to ConnectionManager.checkout(...) that returns a channel you can use.

In this instance, userland code may look like this:

def publish_msg(msg) do
  channel = ConnectionManager.checkout()
  AMQP.Basic.publish(channel, "", "test_queue", msg)
  # Check the connection back in.
end

This means that while ConnectionManager is a process, you need to change it to expose a synchronous API, using the call() variants (GenServer.call and handle_call) rather than cast. You would also need to implement your own check-in logic, unless you use a callback API that cleans up for you automatically.

def callback_example(msg) do
  ConnectionManager.with_channel(fn channel ->
    AMQP.Basic.publish(channel, "", "test_queue", msg)
    # Check-ins are implicit.
  end)
end
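
As a rough sketch of how such a callback API could handle the implicit check-in: the module name ChannelCallback and the checkout/checkin function arguments below are hypothetical, standing in for whatever your pool actually provides. The point is only that try/after runs the check-in even if the publish raises.

```elixir
defmodule ChannelCallback do
  # Hypothetical helper, not part of the AMQP library.
  # `checkout` is a zero-arity function that returns a channel.
  # `checkin` is a one-arity function that hands the channel back.
  # `fun` is the caller's work, given the channel as its argument.
  def with_channel(checkout, checkin, fun)
      when is_function(checkout, 0) and is_function(checkin, 1) and is_function(fun, 1) do
    channel = checkout.()

    try do
      # The result of `fun` becomes the result of with_channel/3.
      fun.(channel)
    after
      # Runs whether `fun` returned normally or raised.
      checkin.(channel)
    end
  end
end
```

A ConnectionManager.with_channel/1 could then be a thin wrapper that passes its own checkout and check-in functions to a helper like this.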

Your handle_cast will not work as it is now. Your userland code assumes that a channel is something you get back as a return value, but a cast returns :ok immediately and sends no reply to the caller. So you can’t use a handle_cast, unless you utilise GenServer.reply, which isn’t necessary in your use case.
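
To make the synchronous version concrete, here is a minimal sketch, assuming a single long-lived channel rather than a real pool. ChannelHolder and the open_channel argument are names made up for illustration, and reconnection is left out. The channel is opened through an injected function so the GenServer itself does not depend on a running broker.

```elixir
defmodule ChannelHolder do
  # Hypothetical module: holds one channel and hands it out on request.
  use GenServer

  # `open_channel` is a zero-arity function returning {:ok, channel}.
  def start_link(open_channel) when is_function(open_channel, 0) do
    GenServer.start_link(__MODULE__, open_channel, name: __MODULE__)
  end

  # Synchronous: the caller blocks until the server replies with the channel.
  def checkout do
    GenServer.call(__MODULE__, :checkout)
  end

  @impl true
  def init(open_channel) do
    {:ok, channel} = open_channel.()
    {:ok, channel}
  end

  @impl true
  def handle_call(:checkout, _from, channel) do
    # Reply with the channel and keep it as the server state.
    {:reply, channel, channel}
  end
end
```

With the amqp library, the opener could be something like fn -> {:ok, conn} = AMQP.Connection.open(); AMQP.Channel.open(conn) end, after which any module can call channel = ChannelHolder.checkout() followed by AMQP.Basic.publish(channel, "", "test_queue", msg).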
