Broadway: consume multiple queues in the same pipeline

I have built a Broadway pipeline with a custom producer that reads from a single queue, but I want to start Broadway with 3 producers, each reading from a separate queue. Is there any way to accomplish this?


You can start three different topologies but use the same callback module for each of them. Something like

defmodule MyBroadway do
  use Broadway

  def start_link(queue) do
    Broadway.start_link(__MODULE__, ...., queue_name: queue)
  end
end

and then in your supervision tree:

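children = [
  # Give each child a unique id, since all three use the same module
  Supervisor.child_spec({MyBroadway, "queue-1"}, id: :broadway_queue_1),
  Supervisor.child_spec({MyBroadway, "queue-2"}, id: :broadway_queue_2),
  Supervisor.child_spec({MyBroadway, "queue-3"}, id: :broadway_queue_3)
]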

If you want them to be consumed from precisely the same topology (let’s say because you need to share batchers), then that’s a feature you need to add to your producer.
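
As a rough sketch of what that producer feature could look like, the helper below visits several queues in round-robin order until the demand is met. `QueueRotation` and the `fetch` function are hypothetical names, not part of Broadway; `fetch.(queue, limit)` is assumed to return a list of at most `limit` items.

```elixir
defmodule QueueRotation do
  @moduledoc "Hypothetical helper: round-robin fetching across several queues."

  # Pull up to `demand` items, trying each queue at most once per call.
  # Returns {items, rotated_queues}; keep `rotated_queues` in the producer
  # state so the next call starts from the following queue.
  def take(queues, demand, fetch) do
    do_take(queues, demand, fetch, length(queues), [])
  end

  # Demand satisfied: stop.
  defp do_take(queues, 0, _fetch, _tries, acc), do: {acc, queues}

  # Every queue has been tried once: stop, even if demand remains.
  defp do_take(queues, _demand, _fetch, 0, acc), do: {acc, queues}

  defp do_take([queue | rest], demand, fetch, tries, acc) do
    items = fetch.(queue, demand)
    # Clamp at zero in case `fetch` returns more than requested.
    remaining = max(demand - length(items), 0)
    do_take(rest ++ [queue], remaining, fetch, tries - 1, acc ++ items)
  end
end
```

A custom producer could call something like this from its `handle_demand/2` callback, keep the rotated queue list in its state, and return the fetched items as events.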


Thanks, that is a good idea! In this case I want to limit the number of processors though.

My producer is a bit involved: it fetches “batched items”, each of which is processed as a Broadway message, and I can’t fetch a new batch of items until all items in the current batch are processed. I got this working by letting the producer keep track of which items it has emitted.
Fetching a batch of items is a costly operation in this case (~30s), and I don’t want the pipeline to idle while this is happening, so while I’m processing a batch from queue 1 I’m now fetching the next batch from queue 2 concurrently.
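
The bookkeeping described above could be sketched roughly like this, assuming each item has a unique id within its queue. `BatchTracker` and its functions are hypothetical names for illustration, not the actual producer or any Broadway API.

```elixir
defmodule BatchTracker do
  @moduledoc "Hypothetical helper: tracks emitted-but-unprocessed items per queue."

  # State is a map of queue name => MapSet of item ids still in flight.
  def new(queues), do: Map.new(queues, &{&1, MapSet.new()})

  # Record a freshly fetched batch as in flight for `queue`.
  def emit(state, queue, item_ids) do
    Map.put(state, queue, MapSet.new(item_ids))
  end

  # Mark one item of `queue` as processed.
  def ack(state, queue, item_id) do
    Map.update!(state, queue, &MapSet.delete(&1, item_id))
  end

  # A queue may be fetched again only once its previous batch is fully processed.
  def can_fetch?(state, queue) do
    MapSet.size(Map.fetch!(state, queue)) == 0
  end

  # All queues that are currently allowed to start a new fetch, sorted by name.
  def fetchable(state) do
    state
    |> Enum.filter(fn {_queue, in_flight} -> MapSet.size(in_flight) == 0 end)
    |> Enum.map(fn {queue, _in_flight} -> queue end)
    |> Enum.sort()
  end
end
```

With this, the producer would start the slow fetch only for queues where `can_fetch?/2` returns true, for example in a `Task`, so that a batch from one queue can be processed while the next one is being fetched.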

I might have just forced a square peg into a round hole with my current solution, but at least I have something that works now. :)