How to avoid code duplication (newbie question)

Hi,

I currently have a RoR application that uses the Sneakers gem to monitor RabbitMQ queues and process messages. I’m looking into migrating this to Elixir and have started experimenting. We have about 20 queues, all with different logic that needs to be processed.

I was able to get Broadway up and running and want to create a worker per queue (or per 2 - 3 queues that share business logic). So I started creating workers like this:

defmodule MessageWorker do
  require Logger
  use Broadway

  @queue_names ~w(messages)

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer,
          queue: "",
          connection: [
            username: "guest",
            password: "guest",
            host: "localhost"
          ],
          merge_options: fn (index) ->
            queue_name = Enum.fetch!(@queue_names, index)
            Logger.info(" connecting to queue #{queue_name}")
            [queue: queue_name]
          end
        },
        concurrency: length(@queue_names)
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    )
  end

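  @spec handle_message(atom, Broadway.Message.t(), any) :: Broadway.Message.t()
  def handle_message(_processor, message, _context) do
    IO.inspect(message.data, label: "MESSAGE Got message")
    # handle_message/3 must return the message; ack_immediately/1 acks it and returns it
    Broadway.Message.ack_immediately(message)
  end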

end

This all seems to work, which is great. However, since start_link will be the same for all workers (except for the queue names and maybe the number of processors), I would like to put it ‘somewhere’ so the code isn’t duplicated in each worker. I just want each worker to specify the queue(s) to monitor and the concurrency, without repeating the complete start_link function.

How can I do this?

I would recommend creating a helper module for this. Here’s how one might look:

defmodule MessageWorkerUtils do
  require Logger

  def broadway_options(module_name, queue_names) do
    [
      name: module_name,
      producer: [
        module: {BroadwayRabbitMQ.Producer,
          queue: "",
          connection: [
            username: "guest",
            password: "guest",
            host: "localhost"
          ],
          merge_options: fn (index) ->
            queue_name = Enum.fetch!(queue_names, index)
            Logger.info(" connecting to queue #{queue_name}")
            [queue: queue_name]
          end
        },
        # one producer per queue, so every index passed to merge_options maps to a queue
        concurrency: length(queue_names)
      ],
      processors: [
        default: [
          concurrency: 10
        ]
      ]
    ]
  end
end

Then you could call it in your start_link like:

def start_link(_opts) do
  options = MessageWorkerUtils.broadway_options(__MODULE__, @queue_names)
  Broadway.start_link(__MODULE__, options)
end
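The helper can also take the processor concurrency, which the question asked for. Below is a minimal sketch, assuming an optional keyword list with a hypothetical :processors key (default 10). It also sets the producer concurrency to the number of queues: BroadwayRabbitMQ calls merge_options once per producer index, so with a producer concurrency of 1 only the first queue in the list would be consumed.

```elixir
defmodule MessageWorkerUtils do
  require Logger

  # Returns the keyword list for Broadway.start_link/2.
  # `opts` is an assumed keyword list; only :processors is read (default 10).
  def broadway_options(module_name, queue_names, opts \\ []) do
    [
      name: module_name,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           [
             queue: "",
             connection: [username: "guest", password: "guest", host: "localhost"],
             # Called once per producer; producer `index` consumes queue `index`.
             merge_options: fn index ->
               queue_name = Enum.fetch!(queue_names, index)
               Logger.info("connecting to queue #{queue_name}")
               [queue: queue_name]
             end
           ]},
        # One producer per queue so every index maps to a queue name.
        concurrency: length(queue_names)
      ],
      processors: [default: [concurrency: Keyword.get(opts, :processors, 10)]]
    ]
  end
end
```

In a worker, start_link/1 then becomes a one-liner such as Broadway.start_link(__MODULE__, MessageWorkerUtils.broadway_options(__MODULE__, @queue_names, processors: 5)).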

And welcome to the forum! :wave:


Hi @axelson,

Thanks a lot! Tried it out and this works great.


I don’t know if this is a good idea, but it sounds like we could use a little bit of metaprogramming.

defmodule MQWorker do
  defmacro __using__(queue_names) do
    quote do
      # use Broadway brings in Broadway's behaviour, which already requires handle_message/3
      use Broadway
      require Logger

      def start_link(_opts) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [
            module: {BroadwayRabbitMQ.Producer,
              queue: "",
              connection: [
                username: "guest",
                password: "guest",
                host: "localhost"
              ],
              merge_options: fn (index) ->
                queue_name = Enum.fetch!(unquote(queue_names), index)  #<---- Replace `@queue_names` with `unquote(queue_names)`
                Logger.info(" connecting to queue #{queue_name}")
                [queue: queue_name]
              end
            },
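            # one producer per queue, so every index passed to merge_options maps to a queue
            concurrency: length(unquote(queue_names))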
          ],
          processors: [
            default: [
              concurrency: 10
            ]
          ]
        )
      end
    end
  end
end

Then in each of your actual worker modules, just use MQWorker, ["queue1", "queue2", ...] and implement handle_message/3.


I may be missing some complexity, but can’t you just pass the variable bits into the opts param of start_link that you’re not using?

The problem is that the message handling logic differs from queue to queue. Maybe we can pack each queue name and the corresponding message handling into a module, and pass that module to a “factory function” that creates the process.
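Combining the last two ideas, the variable parts can travel through start_link's otherwise unused argument in the form of a handler module. Below is a minimal sketch, assuming a hypothetical QueueHandler behaviour (queue/0 and handle/1) and a hypothetical generic QueuePipeline module. It relies on Broadway's :context option, which Broadway passes as the third argument of handle_message/3. use Broadway and start_link/1 are left out (see the comment) so the sketch runs without the Broadway dependency.

```elixir
defmodule QueueHandler do
  # Hypothetical behaviour: each implementation bundles one queue name
  # with the business logic for that queue.
  @callback queue() :: String.t()
  @callback handle(message :: term()) :: term()
end

defmodule MessagesHandler do
  @behaviour QueueHandler

  @impl true
  def queue, do: "messages"

  @impl true
  def handle(message) do
    IO.inspect(message.data, label: "MESSAGE Got message")
    # Return the message, as Broadway expects from handle_message/3.
    message
  end
end

defmodule QueuePipeline do
  # Generic "factory": one module, started once per handler. A real version
  # would `use Broadway` and define
  #   def start_link(handler), do: Broadway.start_link(__MODULE__, broadway_options(handler))
  # It is omitted here so the sketch runs without the Broadway dependency.
  def broadway_options(handler, opts \\ []) do
    [
      # Every running pipeline needs its own registered name.
      name: Module.concat(handler, Pipeline),
      # Broadway passes :context as the third argument of handle_message/3.
      context: %{handler: handler},
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           [
             queue: handler.queue(),
             connection: [username: "guest", password: "guest", host: "localhost"]
           ]},
        concurrency: 1
      ],
      processors: [default: [concurrency: Keyword.get(opts, :processors, 10)]]
    ]
  end

  # Dispatch to the handler module stored in the context.
  def handle_message(_processor, message, %{handler: handler}) do
    handler.handle(message)
  end
end
```

Under a supervisor, each instance would likely need its own child id, e.g. Supervisor.child_spec({QueuePipeline, MessagesHandler}, id: MessagesHandler), since several instances of the same module would otherwise share the default id.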

I tried the metaprogramming suggestion; it works too and looks even cleaner!

Thanks.

I’ve added the number of workers (processor concurrency) like this:

defmacro __using__({queue_names, workers}) do

....

      processors: [
        default: [
          concurrency: unquote(workers)
        ]
      ]

And now call it like this:

use MQWorker, {["messages2"], 10}

Seems to work.
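A keyword list is a common alternative to the positional tuple, since it reads like named options and allows defaults. Below is a minimal sketch, assuming literal values at the use site as in the question; :queues and :processors are hypothetical option names. It generates a broadway_options/0 function next to start_link/1 so the result can be inspected, and leaves use Broadway and handle_message/3 to the worker module.

```elixir
defmodule MQWorker do
  # __using__/1 receives the options as quoted code; a literal keyword list
  # of strings and integers quotes to itself, so Keyword functions work on it.
  defmacro __using__(opts) do
    queue_names = Keyword.fetch!(opts, :queues)
    processors = Keyword.get(opts, :processors, 10)

    quote do
      require Logger

      # Generated per worker; kept separate from start_link so it can be inspected.
      def broadway_options do
        queue_names = unquote(queue_names)

        [
          name: __MODULE__,
          producer: [
            module:
              {BroadwayRabbitMQ.Producer,
               [
                 queue: "",
                 connection: [username: "guest", password: "guest", host: "localhost"],
                 merge_options: fn index ->
                   queue_name = Enum.fetch!(queue_names, index)
                   Logger.info("connecting to queue #{queue_name}")
                   [queue: queue_name]
                 end
               ]},
            # One producer per queue.
            concurrency: length(queue_names)
          ],
          processors: [default: [concurrency: unquote(processors)]]
        ]
      end

      def start_link(_opts), do: Broadway.start_link(__MODULE__, broadway_options())
    end
  end
end

# Hypothetical worker; a real one would also `use Broadway` and define handle_message/3.
defmodule Messages2Worker do
  use MQWorker, queues: ["messages2"], processors: 10
end
```

Leaving out :processors falls back to the default of 10, so the common case shrinks to use MQWorker, queues: ["messages2"].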