Conduit Framework - Can I parameterize Conduit.Broker

Is it possible to parameterize a Conduit.Broker?

I have a scenario in which I’d like to be able to launch multiple instances of an app in a single BEAM, and I want each one to have its own broker. The broker (AMQP is the adapter) creates queue bindings based on an identifier of some sort. I’d like to have a single source file for the broker, but parameterize it when it starts up so each instance of it can have its own queue bindings.


I’ll tentatively say no, but maybe depending on exactly what you need.

Can you give a code example of what the queue bindings would look like and what you’re looking to parameterize?
Also, could you explain more about what you’re trying to achieve and why this is how you want to achieve it?

Thanks for the reply! I’ll try to explain why I want to do this.

Work queues are queues that can be serviced by any service instance. They have well-known names, so they don’t need to be parameterized. Any worker instance can subscribe to them to receive messages to perform work bound to those queues.

Example Work Queue Name: “TraceLogger”, which binds to “Framework.TraceLogger.#”, so it receives any message with a routing key that begins with “Framework.TraceLogger”. Any instance of the TraceLogger service can receive a message from this queue and service that message.

Instance queues are specific to a particular instance of a service. They receive responses to requests made by this service instance. Typically this is for handling responses to RPCs, but may also be used for services that perform some stateful function, like receiving and processing blocks of a file. Once a specific instance of a file transfer service has agreed to process a file transfer, all the blocks of that particular file need to come to this particular service instance.

Example Instance Queue Name: “FileTransfer_52f1b81a”. It binds to “Framework.FileTransfer_52f1b81a.#”. The name is used in the ReplyTo field of a request message, so the queue becomes the destination of RPC responses and the destination for individual blocks of a file.

There’s another type of queue, a control queue, which enables our service manager to interact with a specific service instance to perform service management functions. It’s like an instance queue, but it doesn’t bind to application-specific messages, but to service control messages.
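These naming and binding conventions can be sketched as pure functions. This is just an illustration of the conventions described above; the `QueueNaming` module and its function names are hypothetical, not part of Conduit or the poster’s codebase:

```elixir
defmodule QueueNaming do
  # Work queues have well-known names shared by every instance of a service.
  def work_queue(service), do: service
  def work_binding(service), do: "Framework." <> service <> ".#"

  # Instance queues append a per-instance identifier to the service name.
  def instance_queue(service, instance_id), do: service <> "_" <> instance_id

  def instance_binding(service, instance_id),
    do: "Framework." <> instance_queue(service, instance_id) <> ".#"
end

# QueueNaming.work_binding("TraceLogger")
#=> "Framework.TraceLogger.#"
# QueueNaming.instance_queue("FileTransfer", "52f1b81a")
#=> "FileTransfer_52f1b81a"
```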

  def work_queue_name, do: BrokerConfig.service_name()

  def instance_queue_name,
    do: BrokerConfig.service_name() <> "_" <> BrokerConfig.instance_name()

  def work_queue_bindings,
    do: BrokerConfig.work_queue_bindings() ++ ["Framework." <> work_queue_name() <> ".#"]

  def instance_queue_bindings,
    do: BrokerConfig.instance_queue_bindings() ++ ["Framework." <> instance_queue_name() <> ".#"]

  configure do
    queue(&work_queue_name/0, &work_queue_options/0)
    queue(&instance_queue_name/0, &instance_queue_options/0)

    # ... other broker configuration, pipelines, subscribers, etc.
  end

So an instance of the service is parameterized with a BrokerConfig struct containing service name, instance name, queue options, etc…

Technically speaking, the queue names aren’t important except for the well-known work queue name, but the control and instance bindings are unique per service instance. That’s what we’re trying to parameterize.

If we want to be able to run multiple instances of a service within a single BEAM instance, we need to be able to instantiate a broker for each one, parameterized with a BrokerConfig.

Each is an independent service, made up of a supervision tree of various Elixir processes. Each creates and binds to an instance queue and a control queue, and binds to a pre-existing work queue. Our service management framework starts them and shuts them down dynamically within an Erlang cluster in response to various scaling or application management criteria.

For example, a file transfer service might have several instances, some of which write files to a high speed store, others of which might write files to a low-speed archive store.

All of the services in our AMQP distributed service fabric implement this protocol of queues with dynamic bindings. Our existing services are written in C#, Python, and C++. We are adding Elixir to the mix.

Possible (somewhat hacky) solution: if we made BrokerConfig a per-BEAM singleton Agent, we could limit ourselves to launching a single service instance at a time, and each broker would parameterize itself from that singleton BrokerConfig. Once an instance has launched, we could reset the contents of the BrokerConfig and then launch another instance of the service, which would receive the new parameters.
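The singleton-Agent workaround described above could be sketched roughly like this. The module name, the map keys, and the launch sequence are all illustrative assumptions, not actual code from the service framework:

```elixir
defmodule BrokerConfigAgent do
  use Agent

  def start_link(_opts) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Set the config for the *next* service instance to be launched.
  def put(config), do: Agent.update(__MODULE__, fn _ -> config end)

  # The broker reads this while it is starting up.
  def get, do: Agent.get(__MODULE__, & &1)
end

# Launch sequence (one instance at a time):
#   BrokerConfigAgent.put(%{service_name: "FileTransfer", instance_name: "52f1b81a"})
#   ...start the first service instance, which reads the Agent during init...
#   BrokerConfigAgent.put(%{service_name: "FileTransfer", instance_name: "9c04aa01"})
#   ...then start the next instance, which picks up the new parameters.
```

The obvious downside is the race this creates: launches must be strictly serialized, because the Agent holds only one pending config at a time.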

We could also just abandon our naming conventions and use completely random queue names which don’t require parameterization, but since all the other languages can implement the conventions, and our ops people are used to seeing these names in logfiles and rabbitmq management tools, it would be … unfortunate if our Elixir services couldn’t do it.

I know this is long and presents a particular way of interacting with Rabbit that may not be completely standard. It’s a set of conventions that work well for us. Thanks for reading and considering it all.



Thanks, I’ll come back to this later and read it more carefully. But, here’s what I think we could do that also supports some other cases I’ve had in mind.

Right now, the config for the broker is all specified in the application config. I’d like someone to be able to override that by passing config to start_link. That way, you’d be able to do something like this in a supervisor.

children = [
  {MyApp.Broker, [[config: :override]]}
]

Additionally, I’d be willing to support passing this config merged with the application config to the name and options functions. This would allow you to specify arbitrary config options and use them for parameterization.
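Putting both parts of that proposal together, usage might look something like the following. This is a sketch of an API under discussion, not Conduit’s actual API: the arity-1 name function receiving the merged config, and the `:instance_name` key, are assumptions about how the feature could work:

```elixir
# In the supervisor: per-instance overrides merged over the application config.
children = [
  {MyApp.Broker, [instance_name: "52f1b81a"]}
]

defmodule MyApp.Broker do
  use Conduit.Broker, otp_app: :my_app

  # Hypothetical: the merged config is passed into the name function,
  # so each broker instance computes its own queue name and bindings.
  def instance_queue_name(config) do
    "FileTransfer_" <> Keyword.fetch!(config, :instance_name)
  end

  configure do
    queue(&instance_queue_name/1, [])
  end
end
```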

Does that sound like it would work for your case? Would you be willing to work on that?


I think that’ll work. I’ll DM you to work out the details and I’ll be happy to work on it.