Processing multiple messages in parallel with AMQP

I have a GenServer that connects to RabbitMQ using the amqp library from https://github.com/pma/amqp, and I’m trying to figure out how to make it consume more than one message at a time.

Originally I had a TaskSupervisor running and was using:

defp consume(channel, tag, redelivered, payload) do
  # Run process_msg under the task supervisor, then block on the result
  # before acking. The MFA form takes the function name as an atom.
  Task.Supervisor.async(
    MessageConsumer.TaskSupervisor,
    MessageConsumer.Task,
    :process_msg,
    [channel, tag, payload]
  )
  |> Task.await()
  |> ack_processed_msg()
end

But then it occurred to me that this isn’t doing what I thought: since Task.await/1 blocks, I might as well just do the processing in the consumer function, as it still waits before processing the next message.

I then thought that maybe spawning a new process would do it:

defp consume(channel, tag, redelivered, payload) do
  # Spawn so the consumer returns immediately; the spawned process
  # runs the supervised task, waits for its result, and acks it.
  spawn(fn ->
    Task.Supervisor.async(
      MessageConsumer.TaskSupervisor,
      fn ->
        try do
          MessageConsumer.Task.process_msg(channel, tag, payload)
        catch
          _type, error ->
            {:error, error, %{channel: channel, tag: tag, redelivered: redelivered}}
        end
      end
    )
    |> Task.await()
    |> ack_processed_msg()
  end)
end

Which I think will do what I want, but then there isn’t any way that I can see to catch runaway processes that get stuck and never end.
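
Not from the thread, but one way to put a bound on a stuck task is Task.Supervisor.async_nolink/2 combined with Task.yield/2 and Task.shutdown/1; the 30-second timeout and the reject-on-failure handling below are my own assumptions:

defp consume(channel, tag, redelivered, payload) do
  # async_nolink: a crash in the task won't take down the caller.
  task =
    Task.Supervisor.async_nolink(MessageConsumer.TaskSupervisor, fn ->
      MessageConsumer.Task.process_msg(channel, tag, payload)
    end)

  # Wait up to 30s; if the task is still running, kill it and reject.
  case Task.yield(task, 30_000) || Task.shutdown(task, :brutal_kill) do
    {:ok, result} ->
      ack_processed_msg(result)

    _ ->
      # Requeue only on the first failure to avoid a redelivery loop.
      AMQP.Basic.reject(channel, tag, requeue: not redelivered)
  end
end

This still blocks its caller for up to the timeout, so it would run inside the spawned process rather than replace it.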

I’m now thinking that maybe the spawning should be done when calling consume:

def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
  consume(chan, tag, redelivered, payload)
  {:noreply, chan}
end

But I’m not sure.

What’s the best way to achieve this? Ideally I would like to limit it to, say, 5 messages being processed concurrently at any time.

Hi,

To make your solution of one consumer process spawning multiple tasks work, you’ll want to set the channel prefetch count using Basic.qos/2. This tells the broker how many unacknowledged messages it may dispatch to the consumer pid before stopping and waiting for acks/rejects.
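
For reference, a minimal sketch of setting it when the consumer subscribes (the queue name here is a placeholder):

# Allow at most 5 unacknowledged messages in flight on this channel.
:ok = AMQP.Basic.qos(channel, prefetch_count: 5)
{:ok, _consumer_tag} = AMQP.Basic.consume(channel, "my_queue")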

Alternatively, you can start a pool of N separate consumer processes, each with its own channel. All consumers can share the same connection, since you can open multiple channels per connection.

With the latter, provided your consumers are supervised, if a consumer pid dies for some transient reason before acknowledging a message, the broker will automatically redeliver it to another consumer.


Thanks for the great library, by the way! :slight_smile:

So just by setting Basic.qos(chan, prefetch_count: 5), it would automatically consume 5 messages at a time? I assume that would mean I wouldn’t even need to use a Task (other than to keep a crash from killing the consumer)?

Do you mean creating multiple supervised workers (Consumer), or, once a connection is established, using something like poolboy to set up a pool of channels?

Honestly, I think I was so absorbed with using spawn and tasks that I completely missed that the easiest way is to just fire up more Consumer processes.

So just by setting Basic.qos(chan, prefetch_count: 5), it would automatically consume 5 messages at a time? I assume that would mean I wouldn’t even need to use a Task (other than to keep a crash from killing the consumer)?

By setting the prefetch_count to 5 you’ll still need to spawn sub-processes if you want the messages processed in parallel. The difference is that the broker now dispatches up to 5 unacknowledged messages at a time before blocking and waiting for an acknowledge/reject.
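
For illustration, a sketch combining the two (module names come from the snippets above; error handling is omitted, and acking from inside the task is my choice, so a prefetch slot only frees up once processing is done):

def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, chan) do
  # With prefetch_count: 5 the broker delivers at most 5 of these
  # before an ack; each delivery is processed in its own task.
  Task.Supervisor.start_child(MessageConsumer.TaskSupervisor, fn ->
    MessageConsumer.Task.process_msg(chan, tag, payload)
    AMQP.Basic.ack(chan, tag)
  end)

  {:noreply, chan}
end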

Do you mean creating multiple supervised workers (Consumer), or, once a connection is established, using something like poolboy to set up a pool of channels?

Yes. I’d just use regular processes under a Supervisor. You won’t need the checkout/checkin functions that poolboy gives you.
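
A sketch of what that could look like (module names and the child count are hypothetical; each Consumer would open its own channel from the shared connection with AMQP.Channel.open/1 in its init):

defmodule MessageConsumer.Supervisor do
  use Supervisor

  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  def init(_opts) do
    # One shared connection; each consumer opens its own channel from it.
    {:ok, conn} = AMQP.Connection.open()

    children =
      for i <- 1..5 do
        Supervisor.child_spec({MessageConsumer.Consumer, conn}, id: {:consumer, i})
      end

    Supervisor.init(children, strategy: :one_for_one)
  end
end

A real version would also watch for the connection itself going down; this sketch skips that.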


Thanks very much! :slight_smile:

Something I keep forgetting about Elixir is that if something seems too simple, that’s because it actually is that simple!

Sorry for replying to an old thread, but I tried the multiple-consumers approach described above, and it doesn’t seem to work completely.

Suppose I have this situation: a queue with 20 messages and 3 workers.

If I send new messages to the queue, all 3 workers receive something. But if I kill my application and restart it with messages still in the queue, only one worker consumes messages. I’m not sure what causes this.

Solved my own problem: I hadn’t set a prefetch_count on my channel, so the broker pushed the whole backlog to the first consumer that subscribed (the default prefetch is unlimited). Setting it to 1 fixes it:
:ok = AMQP.Basic.qos(chan, prefetch_count: 1)