Broadway: consume multiple queues in the same pipeline

I have built a Broadway pipeline with a custom producer that reads from a single queue, but I want to start Broadway with three producers, each reading from a separate queue. Is there any way to accomplish this?


You can start three different topologies but use the same callback module for each of them. Something like:

defmodule MyBroadway do
  use Broadway

  def start_link(queue) do
    Broadway.start_link(__MODULE__, ...., queue_name: queue)
  end
end

and then in your supervision tree:

children = [
  {MyBroadway, "queue-1"},
  {MyBroadway, "queue-2"},
  {MyBroadway, "queue-3"}
]

If you want them to be consumed precisely from the same topology (let’s say because you need to share batchers), then that’s a feature that you need to add to your producer.


Thanks, that is a good idea! In this case I want to limit the number of processors though.

My producer is a bit involved: it fetches “batched items”, each of which is processed as a Broadway message, and I can’t fetch a new batch until all items in the current batch have been processed. I got this working by letting the producer keep track of which items it has emitted.
Fetching a batch is a costly operation in this case (~30s), and I don’t want the pipeline to sit idle while it happens, so while I’m processing a batch from queue 1 I now fetch the next batch from queue 2 concurrently.

I might have just forced a square peg into a round hole with my current solution, but at least I have something that works now. 🙂
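
The overlap described above (processing one batch while the next one is being fetched) can be sketched outside Broadway with a plain Task. This is only an illustration of the idea, not the producer from this post: the module name PrefetchSketch, the fetch_batch/1 stand-in, and the item format are all hypothetical, and the short sleep stands in for the ~30s fetch.

```elixir
defmodule PrefetchSketch do
  # Hypothetical stand-in for the slow fetch; returns a batch of items for a queue.
  def fetch_batch(queue) do
    Process.sleep(50)
    for i <- 1..3, do: "#{queue}-item-#{i}"
  end

  # Walks the queues in order. The fetch for the next queue is started
  # before the current batch is processed, so the two overlap.
  def run([first | rest], process_fun) do
    first_task = Task.async(fn -> fetch_batch(first) end)
    loop(first_task, rest, process_fun, [])
  end

  defp loop(task, queues, process_fun, acc) do
    # Wait for the batch whose fetch was started earlier.
    batch = Task.await(task, :infinity)

    # Kick off the next fetch (if any) before processing the current batch.
    {next_task, remaining} =
      case queues do
        [next | remaining] -> {Task.async(fn -> fetch_batch(next) end), remaining}
        [] -> {nil, []}
      end

    # Process every item of the current batch while the next fetch runs.
    acc = acc ++ Enum.map(batch, process_fun)

    if next_task, do: loop(next_task, remaining, process_fun, acc), else: acc
  end
end
```

Calling PrefetchSketch.run(["queue-1", "queue-2"], &String.upcase/1) processes the items of queue-1 while the batch for queue-2 is still being fetched. Inside a real Broadway producer the same bookkeeping would live in the producer's state instead of a recursive loop.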

I have something similar but a bit more contrived 😀.

I have an IoT service (SaaS) where I would like to create a queue for every IoT device that registers, so that each of the millions of devices has its own queue, with Broadway consuming and processing the messages sent. Is this even possible? That is, can Broadway pipelines consume messages from queues created at runtime?

This is just one possible MQ architecture I’m thinking about. The other would be to have a single queue to which every IoT device sends its messages. Is this better than the previous solution? Won’t I possibly run into a queue length limit?

Has something changed?

I am trying to start it the same way, three times, but it’s throwing this error:

If using maps as child specifications, make sure the :id keys are unique.
If using a module or {module, arg} as child, use Supervisor.child_spec/2 to change the :id, for example:

    children = [
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_1),
      Supervisor.child_spec({MyWorker, arg}, id: :my_worker_2)
    ]

José’s example is a simplification. You have to give a different id to each supervised process (as described in the error message) and a different name when calling Broadway’s start_link/2. You can do both at the same time by defining a child_spec function in the module where you use Broadway.
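
A minimal sketch of what that could look like, assuming :broadway is already a dependency of the project. The naming scheme in pipeline_name/1, the processor concurrency, and the use of Broadway.DummyProducer as a placeholder are illustrative assumptions; in the real pipeline you would put your own producer there and pass the queue name to it.

```elixir
defmodule MyBroadway do
  use Broadway

  # Overrides the child spec generated by `use Broadway` so that every
  # queue gets its own supervisor id.
  def child_spec(queue) do
    %{
      id: {__MODULE__, queue},
      start: {__MODULE__, :start_link, [queue]},
      type: :supervisor
    }
  end

  def start_link(queue) do
    Broadway.start_link(__MODULE__,
      # Each pipeline needs its own registered name.
      name: pipeline_name(queue),
      # Placeholder producer that emits nothing; swap in your own producer
      # and hand it the queue name.
      producer: [module: {Broadway.DummyProducer, []}],
      processors: [default: [concurrency: 2]],
      # Makes the queue name available to handle_message/3.
      context: %{queue: queue}
    )
  end

  # Hypothetical naming scheme: MyBroadway plus the queue name.
  def pipeline_name(queue), do: Module.concat(__MODULE__, queue)

  @impl true
  def handle_message(_processor, message, _context) do
    # Real processing goes here; the message is passed through unchanged.
    message
  end
end
```

With this in place the original children list ({MyBroadway, "queue-1"}, {MyBroadway, "queue-2"}, {MyBroadway, "queue-3"}) no longer collides, because both the :id and the Broadway name are derived from the queue. Module.concat/2 creates a new atom per queue, which is fine for a handful of fixed queue names but not for unbounded, user-supplied ones.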
