I have a GenServer setup that connects to RabbitMQ using amqp from https://github.com/pma/amqp, and I’m trying to figure out how to make it consume more than 1 message at a time.
Originally I had a Task.Supervisor running and was using:
defp consume(channel, tag, _redelivered, payload) do
  # The function name must be passed as an atom in the module/function/args form.
  Task.Supervisor.async(
    MessageConsumer.TaskSupervisor,
    MessageConsumer.Task,
    :process_msg,
    [channel, tag, payload]
  )
  |> Task.await()
  |> ack_processed_msg()
end
But then it occurred to me that it isn’t doing what I thought it was: Task.await blocks, so the consumer still waits for each message to finish before handling the next one, and I might as well do the processing directly in the consume function.
I then thought that maybe spawning a new process would do it:
defp consume(channel, tag, redelivered, payload) do
  spawn(fn ->
    Task.Supervisor.async(
      MessageConsumer.TaskSupervisor,
      fn ->
        try do
          MessageConsumer.Task.process_msg(channel, tag, payload)
        catch
          _type, error ->
            {:error, error, %{channel: channel, tag: tag, redelivered: redelivered}}
        end
      end
    )
    |> Task.await()
    |> ack_processed_msg()
  end)
end
I think this will do what I want, but I can’t see any way to catch runaway processes that get stuck and never finish.
I’m now thinking that maybe the spawning should be done where consume is called:
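For the stuck-process concern, one possible approach is to bound each job with Task.yield/2 and Task.shutdown/1 from the standard library. This is only a sketch: the module name TimeoutSketch, the helper run_with_timeout/3, and the 5_000 ms default are hypothetical and not part of the code above.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper: runs `fun` under the Task.Supervisor `sup`
  # and gives up after `timeout` milliseconds.
  def run_with_timeout(sup, fun, timeout \\ 5_000) do
    # async_nolink keeps a crash in the task from taking the caller down.
    task = Task.Supervisor.async_nolink(sup, fun)

    # Task.yield/2 returns nil on timeout; Task.shutdown/1 then stops the
    # task, or returns its result if one arrived in the meantime.
    case Task.yield(task, timeout) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end
end
```

Because Task.yield/2 has to be called by the process that started the task, the helper still blocks its caller for up to the timeout, so it would need to run inside the spawned process rather than in the GenServer itself.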
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
  consume(chan, tag, redelivered, payload)
  {:noreply, chan}
end
But I’m not sure.
What’s the best way to achieve this? Ideally I would like to limit it to, say, 5 messages being processed concurrently at any time.
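One possible shape for such a limit, sketched with the standard library only: the GenServer starts each job with Task.Supervisor.async_nolink/2, never blocks on it, and keeps at most `max` jobs running while the rest wait in a queue. Everything here (BoundedWorker, submit/2, stats/1, the `handler` option) is a hypothetical stand-in; in the real consumer the cast would be the :basic_deliver message and the ack would happen when a task finishes. A broker-side alternative is a prefetch limit set with AMQP.Basic.qos/2 and its prefetch_count option.

```elixir
defmodule BoundedWorker do
  use GenServer

  # Hypothetical API; `handler` stands in for the real message processing.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)
  def submit(pid, payload), do: GenServer.cast(pid, {:deliver, payload})
  def stats(pid), do: GenServer.call(pid, :stats)

  @impl true
  def init(opts) do
    {:ok, sup} = Task.Supervisor.start_link()

    {:ok,
     %{
       sup: sup,
       handler: Keyword.fetch!(opts, :handler),
       max: Keyword.get(opts, :max, 5),
       running: %{},           # task ref => payload
       pending: :queue.new(),  # payloads waiting for a free slot
       done: 0
     }}
  end

  @impl true
  def handle_cast({:deliver, payload}, state) do
    {:noreply, maybe_start(%{state | pending: :queue.in(payload, state.pending)})}
  end

  @impl true
  def handle_call(:stats, _from, state) do
    stats = %{
      running: map_size(state.running),
      pending: :queue.len(state.pending),
      done: state.done
    }

    {:reply, stats, state}
  end

  # A task that finishes normally replies with {ref, result}.
  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, finish(state, ref)}
  end

  # A task that crashes only produces the monitor's :DOWN message.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, finish(state, ref)}
  end

  defp finish(state, ref) do
    {_payload, running} = Map.pop(state.running, ref)
    maybe_start(%{state | running: running, done: state.done + 1})
  end

  # All slots are busy: leave the queue untouched.
  defp maybe_start(%{running: running, max: max} = state)
       when map_size(running) >= max,
       do: state

  # A slot is free: start the next pending payload, if there is one.
  defp maybe_start(state) do
    case :queue.out(state.pending) do
      {:empty, _} ->
        state

      {{:value, payload}, rest} ->
        handler = state.handler
        task = Task.Supervisor.async_nolink(state.sup, fn -> handler.(payload) end)
        running = Map.put(state.running, task.ref, payload)
        maybe_start(%{state | pending: rest, running: running})
    end
  end
end
```

A per-job timeout could be layered on by scheduling a Process.send_after/3 message for each task ref and shutting the task down if it is still in `running` when that message arrives.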