OffBroadwayRedisStream - A Redis Stream consumer for Broadway

I’m tinkering around the idea of processing redis-stream messages in Elixir and this is one solution.

off_broadway_redis_stream is a redis-stream consumer for Broadway.

It acts as a consumer within the redis-stream-consumer-group. Supports failover by automatically claiming pending messages of the dead consumer.

Rationale

  • a way to process redis-stream messages, see more about redis-streams here
  • for background job processing (among other use-cases). Note that there is a subtle difference in approach when compared to traditional sidekiq like job-processing setup. With Broadway our setup is akin to event-processing rather than “async remote procedure call”, this might not be important most of the time, but separating event and processing helps in use-cases such as “single event - multiple different processor”. Also note that this is not a drop-in replacement to existing solutions yet since it is missing important features such as retry-with-backoff (more on this later).
  • piggyback on Broadway tooling and approach.

Example:

defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module:
          {OffBroadwayRedisStream.Producer,
           [
             redis_client_opts: [host: "localhost"],
             stream: "orders",
             group: "processor-group",
             consumer_name: hostname()
           ]}
      ],
      processors: [
        default: [min_demand: 5, max_demand: 1000]
      ]
    )
  end

  def handle_message(_, message, _) do
    [_id, key_value_list] = message.data
    IO.inspect(key_value_list, label: "Got message")
    message
  end

  @max_attempts 5

  def handle_failed(messages, _) do
    for message <- messages do
      if message.metadata.attempt < @max_attempts do
        Broadway.Message.configure_ack(message, retry: true)
      else
        [id, _] = message.data
        IO.inspect(id, label: "Dropping")
      end
    end
  end

  defp hostname do
    {:ok, host} = :inet.gethostname()
    to_string(host)
  end
end

Acknowledgments & Retries

Both successful and failed messages are acknowledged by default. Use handle_failure callback to handle failures by moving messages to other stream, or to schedule retry using sorted-set or to persist failed jobs etc. Currently, library does not provide any retry strategy precisely because there are multiple solutions which differ depending on use case and it’s left to consumer to implement any such logic (but this might change in future)

But it does provide simple retry strategy using Broadway.Message.configure_ack/2 to handle simple external failures (network error etc). For this user has to explicitly configure to retry the particular failed message. Configured message will be attempted again in next batch.

Message.configure_ack(message, retry: true)

For more information and configuration see module documentation.

Github: off_broadway_redis_stream

2 Likes