I currently have a RoR application that uses the Sneakers gem to monitor RabbitMQ queues and process messages. I’m looking into migrating this to Elixir and have started experimenting. We have about 20 queues, all with different logic that needs to be processed.
I was able to get Broadway up and running and want to create a worker per queue (or per 2 - 3 queues that share business logic). So I started creating workers like this:
defmodule MessageWorker do
require Logger
use Broadway
@queue_names ~w(messages)
def start_link(_opts) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producer: [
module: {BroadwayRabbitMQ.Producer,
queue: "",
connection: [
username: "guest",
password: "guest",
host: "localhost"
],
merge_options: fn (index) ->
queue_name = Enum.fetch!(@queue_names, index)
Logger.info(" connecting to queue #{queue_name}")
[queue: queue_name]
end
},
concurrency: 1
],
processors: [
default: [
concurrency: 10
]
]
)
end
@spec handle_message(any, any, any) :: none
def handle_message(_, message, _context) do
Broadway.Message.ack_immediately(message)
IO.inspect(message.data, label: "MESSAGE Got message")
end
end
This all seems to work, which is great. However since the start_link will be the same for all workers (except for the queue name and maybe the number of processors I would like to put this ‘somewhere’ so I could avoid the code being duplicated in each worker. I just want each worker to specify the queue(s) to monitor and the concurrency. So the complete start_link method shouldn’t be duplicated.
The problem is, for different queues, there are different message handling logic. Maybe we can pack each queue name and the corresponding message handling into a module, and pass that module to a “factory function” that creates the process.