I have a use case that may not be uncommon.
I read messages from RabbitMQ with Broadway. In handle_message I take messages and process them.
My problem occurs when, due to a bug in the code or invalid data, a message cannot be processed and handle_message crashes. Broadway marks the message as failed, and RabbitMQ redelivers it again and again in an infinite loop, driving the CPU to 100% and flooding our bug tracker with reports.
How can I add at least a delay to the Broadway pipeline before it takes the next set of messages? I was thinking of some kind of exponential retry. I found resubscribe_interval, but that applies when the producer crashes, not the consumers.
My current workaround is to put a :timer.sleep() in a rescue clause, but I wonder if there is a better way:
defmodule OutcomesConsumer do
  use Broadway
  # Logger.error/1 is a macro, so Logger must be required before use.
  require Logger
  alias Broadway.Message

  def start_link do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           queue: RabbitMQUtils.outcomes_queue(),
           qos: [
             prefetch_count: 6
           ],
           connection: rabbitmq_url()},
        concurrency: 1
      ],
      processors: [
        default: [
          min_demand: 1,
          max_demand: 6,
          concurrency: 2
        ]
      ]
    )
  end

  def handle_message(_, %Message{data: string_data} = message, _) do
    data = Jason.decode!(string_data)
    # process data in another module
    message
  rescue
    e ->
      Logger.error("Error reading outcome from RabbitMQ: #{inspect(e)}")
      Bugsnag.report(...)
      # Workaround: block this processor for 4 seconds before failing the message.
      :timer.sleep(4_000)
      reraise e, __STACKTRACE__
  end
end
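
To make the exponential retry idea above concrete, this is roughly the delay calculation I have in mind. It is only a sketch: RetryDelay, the base of 1 second, and the cap of 60 seconds are made-up names and values, and it assumes an attempt counter is available from somewhere, which is exactly the part I do not know how to get in Broadway.

```elixir
defmodule RetryDelay do
  # Hypothetical helper, not part of Broadway or BroadwayRabbitMQ.
  @base_ms 1_000
  @max_ms 60_000

  # Delay before retry number `attempt` (1-based).
  # Doubles on each attempt and is capped at @max_ms.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@base_ms * Integer.pow(2, attempt - 1), @max_ms)
  end
end
```

With something like this, the fixed :timer.sleep(4_000) in the rescue clause would become :timer.sleep(RetryDelay.delay_ms(attempt)), if the attempt number were known.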