I set up a small process that maintains an AMQP connection/channel. This script automatically reconnects to RabbitMQ if the connection fails. I built this based on an old, deprecated, and incomplete guide. I'm still a newbie, so I don't fully understand how to use this. I know the script is connecting and reconnecting (as far as I know), but how do I actually use/fetch the channel?
For example, if I have a function in a different module, such as chatroom.ex, how do I fetch the existing channel in the pool so that I can publish messages to a queue?
# Publishes a chat room message to the "test_queue" queue in RabbitMQ via
# AMQP.Basic.publish/4, using the default ("") exchange.
# NOTE(review): this is illustrative pseudo-code. The `????` placeholder and the
# `*channel*` emphasis markers are not valid Elixir; both stand for an already-open
# AMQP channel that still has to be obtained from somewhere.
def publish_function(chatmessage) do
# Receive a chat room message here and publish it to "test_queue" in RabbitMQ. How do I use the existing connections?
# Something like:
channel = ???? # the channel held by the consumer module?
AMQP.Basic.publish(*channel*, "", "test_queue", chatmessage)
end
Here is what I have so far:
connectionmanager.ex:
defmodule ExchatWeb.AMQPConnectionManager do
  @moduledoc """
  Owns the AMQP connection and hands out one channel per consumer module.

  The server state is `{conn, channel_mappings}`, where `channel_mappings` maps
  a consumer module to the channel that was opened for it. A consumer asks for a
  channel with `request_channel/1` and receives it through its own
  `channel_available/1` function.
  """

  use GenServer
  use AMQP

  # Client API

  @doc """
  Starts the manager and registers it under its module name.
  """
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc """
  Asynchronously requests a channel for `consumer`.

  `consumer` must be a module exporting `channel_available/1`; the manager calls
  `consumer.channel_available(channel)` once the channel exists. Repeated
  requests from the same consumer reuse the channel that was opened first.
  """
  def request_channel(consumer) do
    GenServer.cast(__MODULE__, {:chan_request, consumer})
  end

  # Server callbacks

  @impl true
  def init(:ok) do
    children = [
      ExchatWeb.Publish
    ]

    # NOTE(review): the supervisor is started (and linked) from inside this
    # server's init/1 and its result is ignored. `ExchatWeb.Publish` is not
    # defined in the code shown here -- confirm it exists, or whether
    # `ExchatWeb.PublishConsumer` was intended.
    Supervisor.start_link(children, strategy: :one_for_one, name: ExchatWeb.PublishSupervisor)
    establish_new_connection()
  end

  @impl true
  def handle_cast({:chan_request, consumer}, {conn, channel_mappings}) do
    new_mapping = store_channel_mapping(conn, consumer, channel_mappings)
    # The key is guaranteed to exist after store_channel_mapping/3.
    channel = Map.fetch!(new_mapping, consumer)
    consumer.channel_available(channel)
    {:noreply, {conn, new_mapping}}
  end

  # Opens the AMQP connection, retrying every 5 seconds until it succeeds.
  # Returns the `{:ok, state}` tuple expected from init/1. Note that this blocks
  # the caller (init/1) for as long as the broker is unreachable.
  defp establish_new_connection do
    case AMQP.Connection.open() do
      {:ok, conn} ->
        # Link to the connection process so this server exits when the
        # connection dies; presumably a supervisor then restarts it, which
        # re-runs init/1 and reconnects -- confirm against the supervision tree.
        Process.link(conn.pid)
        {:ok, {conn, %{}}}

      {:error, reason} ->
        IO.puts("failed for #{inspect(reason)}")
        :timer.sleep(5000)
        establish_new_connection()
    end
  end

  # Returns `channel_mappings` with a channel stored for `consumer`, opening a
  # new channel only when the consumer has none yet.
  #
  # The updated map must be the return value: previously the result of
  # Map.put_new_lazy/3 was discarded and the old map returned, so no channel
  # was ever stored and consumers were handed `nil`.
  defp store_channel_mapping(conn, consumer, channel_mappings) do
    channel_mappings
    |> Map.put_new_lazy(consumer, fn -> create_channel(conn) end)
    # Debug output; IO.inspect/1 returns its argument unchanged.
    |> IO.inspect()
  end

  # Opens a channel on `conn`; crashes the server if the channel cannot be opened.
  defp create_channel(conn) do
    {:ok, chan} = Channel.open(conn)
    chan
  end
end
publishconsumer.ex:
defmodule ExchatWeb.PublishConsumer do
  @moduledoc """
  GenServer that requests an AMQP channel from
  `ExchatWeb.AMQPConnectionManager` on start-up and keeps the channel it is
  handed as its state (`nil` until the channel arrives).
  """

  use GenServer

  # Client API

  @doc """
  Starts the consumer, registered under its module name.
  """
  def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  @doc """
  Hands an opened channel to the consumer process (asynchronous).
  """
  def channel_available(chan), do: GenServer.cast(__MODULE__, {:channel_available, chan})

  # Server callbacks

  @impl true
  def init(_args) do
    # Ask for a channel; it is delivered later through channel_available/1.
    ExchatWeb.AMQPConnectionManager.request_channel(__MODULE__)
    {:ok, nil}
  end

  @impl true
  def handle_cast({:channel_available, channel}, _previous) do
    # IO.inspect/1 prints the channel and returns it unchanged, so the channel
    # becomes the new state. Queue binding is not wired up yet (bind_to_queue).
    {:noreply, IO.inspect(channel)}
  end
end