How to retry messages with exponential backoff when a Broadway consumer fails

I have a use case that may not be uncommon.
I read messages from RabbitMQ with Broadway. In handle_message I take messages and process them.
My problem occurs when, due to a bug in the code or invalid data, a message cannot be processed and handle_message crashes. Broadway marks the message as failed, and RabbitMQ redelivers it again and again in an infinite loop, driving the CPU to 100% and flooding our bug tracker with reports.
How can I add at least a delay to the Broadway pipeline before it takes another set of messages? I was thinking of some kind of exponential retry. I found resubscribe_interval, but that applies when the producer crashes, not the consumers.

My solution for now is to put a :timer.sleep() in a rescue clause, but I wonder if there is a better way:

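defmodule OutcomesConsumer do
  use Broadway

  require Logger

  alias Broadway.Message

  # Placeholder delay before re-raising; the actual value is an assumption.
  @retry_delay_ms 1_000

  # Takes one argument so the module can be started from a supervision tree.
  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           queue: RabbitMQUtils.outcomes_queue(),
           qos: [
             prefetch_count: 6
           ],
           connection: rabbitmq_url()},
        concurrency: 1
      ],
      processors: [
        default: [
          min_demand: 1,
          max_demand: 6,
          concurrency: 2
        ]
      ]
    )
  end

  def handle_message(_, %Message{data: string_data} = message, _) do
    data = Jason.decode!(string_data)
    # process data in another module
    message
  rescue
    e ->
      Logger.error("Error reading outcome from RabbitMQ: #{inspect(e)} ")
      # Slow down the redelivery loop before failing the message.
      :timer.sleep(@retry_delay_ms)
      reraise e, __STACKTRACE__
  end
end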

I will possibly be tackling this problem soon. We use SQS as our source, but the ideas are similar. I also solved this in Java in an earlier lifetime. My plan is to set up an extra dead-letter queue (or I might use an S3 bucket, but the concept is similar). The handle_failed callback of Broadway would need to write the failed message to that dead-letter queue and also ACK the message at the original source so that it is not redelivered.

You could then have some secondary producer that reads from the dead-letter queue at a more reasonable interval. Or it could be a manually initiated action to read from there and reprocess those failed messages.
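
For RabbitMQ, the idea above could look roughly like the sketch below. It is only a sketch under some assumptions: the queue names "outcomes" and "outcomes_failed" are made up for illustration, the failed queue is assumed to already exist, and it relies on BroadwayRabbitMQ putting the AMQP channel into the message metadata under :amqp_channel. Broadway.Message.configure_ack/2 is used to ACK the original message once the copy has been published.

```elixir
defmodule DeadLetteringConsumer do
  use Broadway

  alias Broadway.Message

  # Hypothetical queue names, chosen only for this example.
  @source_queue "outcomes"
  @failed_queue "outcomes_failed"

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {BroadwayRabbitMQ.Producer, queue: @source_queue},
        concurrency: 1
      ],
      processors: [default: [concurrency: 2]]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: payload} = message, _context) do
    case Jason.decode(payload) do
      # Real processing would happen here.
      {:ok, _data} -> message
      # Mark the message as failed instead of crashing the processor.
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  @impl true
  def handle_failed(messages, _context) do
    Enum.map(messages, fn message ->
      channel = message.metadata.amqp_channel

      # Publish through the default exchange ("") so the routing key is the
      # queue name. The queue must already exist, otherwise RabbitMQ drops
      # the message silently.
      case AMQP.Basic.publish(channel, "", @failed_queue, message.data, persistent: true) do
        # The copy is safe in the failed queue, so ACK the original.
        :ok -> Message.configure_ack(message, on_failure: :ack)
        # Publishing failed: keep the default behaviour (reject and requeue).
        _error -> message
      end
    end)
  end
end
```

If the failed queue is only ever a parking lot, configuring a dead-letter exchange on the source queue and setting on_failure: :reject on the producer may achieve the same thing without publishing by hand.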

This is indeed an improvement. But moving all messages from one queue to another without triggering the proper warnings is risky, since nobody will know there is an issue.
I think if this becomes a recurring issue I will use this failed queue and consume the messages manually after the fix.

I am trying to tackle this and find an optimum solution. Feedback appreciated.

Presuming the queue is called messages my thoughts are:

Read from the messages queue and process messages as fast as possible.
If processing fails, use the handle_failed callback and move the message to another queue, failed_messages. Do not notify the bug tracker here yet, since the throughput is too high.

In another Broadway pipeline, read messages from the failed_messages queue and process them at a slower rate. If processing fails again, notify the bug tracker and fail the message in RabbitMQ. This will still cause an endless loop, but at least we can use a slow processing rate (1 per second, for example).
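
A minimal sketch of that slow second pipeline, assuming Broadway's producer-level rate_limiting option is available in the version in use. The module name is made up, and Logger.error only stands in for the real bug tracker call.

```elixir
defmodule FailedMessagesConsumer do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {BroadwayRabbitMQ.Producer,
           queue: "failed_messages",
           qos: [prefetch_count: 1]},
        concurrency: 1,
        # Let at most 1 message per 1000 ms through to the processors.
        rate_limiting: [allowed_messages: 1, interval: 1_000]
      ],
      processors: [default: [concurrency: 1]]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: payload} = message, _context) do
    case Jason.decode(payload) do
      # Real processing would happen here.
      {:ok, _data} -> message
      {:error, reason} -> Message.failed(message, reason)
    end
  end

  @impl true
  def handle_failed(messages, _context) do
    # Placeholder for the bug tracker notification (an assumption).
    Enum.each(messages, fn message ->
      Logger.error("Reprocessing failed: #{inspect(message.status)}")
    end)

    # Returned unchanged, so the producer's default on_failure applies and
    # RabbitMQ requeues the message; the loop continues at the limited rate.
    messages
  end
end
```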

I would also like to implement a mechanism to rate-limit bug tracker notifications per queue, something with an exponential delay.
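
One possible shape for such a limiter, as a rough sketch: keep, per queue, the time of the next allowed notification and the current delay, and double the delay every time a notification goes out. The module name, the 1 second starting delay and the 1 hour cap are all assumptions. The state is a plain map here; in a running system it would have to live in an Agent, a GenServer or ETS.

```elixir
defmodule NotifyThrottle do
  # Assumed starting delay and upper bound between two notifications.
  @initial_delay_ms 1_000
  @max_delay_ms 3_600_000

  # State: %{queue_name => {next_allowed_at_ms, current_delay_ms}}
  def new, do: %{}

  # Returns {:notify, state} when a notification may be sent at `now_ms`,
  # otherwise {:skip, state}.
  def check(state, queue, now_ms) do
    case Map.get(state, queue) do
      # Still inside the quiet period for this queue.
      {next_allowed_at, _delay} when now_ms < next_allowed_at ->
        {:skip, state}

      # Quiet period is over: notify and double the delay, up to the cap.
      {_next_allowed_at, delay} ->
        next_delay = min(delay * 2, @max_delay_ms)
        {:notify, Map.put(state, queue, {now_ms + next_delay, next_delay})}

      # First failure seen for this queue.
      nil ->
        {:notify, Map.put(state, queue, {now_ms + @initial_delay_ms, @initial_delay_ms})}
    end
  end

  # Forget a queue once it is healthy again, so the next failure notifies at once.
  def reset(state, queue), do: Map.delete(state, queue)
end
```

In the pipeline, now_ms could come from System.monotonic_time(:millisecond), and the bug tracker would be called only when check/3 returns :notify.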

As a utility, I plan to write a helper module that can be called from the console to move the messages back to the main queue after the fix is deployed.
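
Such a helper might look like the sketch below, using the amqp library directly. The module name is made up, AMQP.Connection.open/0 connects to a local broker with default settings (an assumption), and the basic argument exists only so the loop can be exercised with a fake module instead of AMQP.Basic.

```elixir
defmodule QueueMover do
  # Moves every message from `from_queue` to `to_queue` and returns the count.
  def move_all(from_queue, to_queue) do
    {:ok, conn} = AMQP.Connection.open()
    {:ok, chan} = AMQP.Channel.open(conn)

    try do
      drain(chan, from_queue, to_queue)
    after
      AMQP.Connection.close(conn)
    end
  end

  # Fetches one message at a time until the source queue is empty.
  def drain(chan, from_queue, to_queue, basic \\ AMQP.Basic, moved \\ 0) do
    case basic.get(chan, from_queue, no_ack: false) do
      {:ok, payload, %{delivery_tag: tag}} ->
        # Publish via the default exchange; the routing key is the queue name.
        :ok = basic.publish(chan, "", to_queue, payload, persistent: true)
        # ACK the original only after the copy has been published.
        :ok = basic.ack(chan, tag)
        drain(chan, from_queue, to_queue, basic, moved + 1)

      {:empty, _meta} ->
        moved
    end
  end
end
```

From the console this would be called as QueueMover.move_all("failed_messages", "messages"). Any error from the broker crashes the call, which is probably acceptable for a manual tool.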