Rate-limiting consumer to throttle external connections in Broadway?

GenStage includes an example of building a rate limiter in a consumer: gen_stage/rate_limiter.exs (main branch of the elixir-lang/gen_stage repository on GitHub).

Although Broadway’s docs mention rate-limiting as a feature, I can’t find concrete examples of actually using it. The “rate-limiting” that Broadway is referring to seems to be rate-limiting of the demand (i.e. within the producer), not rate-limiting of the consumer.

I was able to adapt the GenStage example to build a solution that throttles outgoing requests to an external API (to obey their threshold rules), and this has been working well. However, I’d like to compare my solution to a Broadway implementation, because Broadway has a lot of nice features (including, importantly, being designed and peer-reviewed by people way smarter than me).

Does Broadway support rate-limiting of the consumer like this? Thanks in advance!

Ah, I found it. You have to set the rate_limiting option under the producer key, e.g. in your pipeline module’s start_link:

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {LtdProducer, 0},
        transformer: {LtdProducer, :transform, []},
        rate_limiting: [allowed_messages: 10, interval: 60_000]
      ],
      processors: [
        default: [concurrency: 1]
      ],
      batchers: [
        default: [concurrency: 1, batch_size: 100, batch_timeout: 2000]
      ]
    )
  end
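
For completeness, the module that calls Broadway.start_link also needs the Broadway callbacks. Here is a minimal sketch, assuming a hypothetical module name LtdPipeline and placeholder callback bodies (neither is from my real code); the Mix.install line is only there so the snippet compiles as a standalone script:

```elixir
# Assumption: fetch Broadway on the fly so this compiles as a standalone script.
Mix.install([{:broadway, "~> 1.0"}])

defmodule LtdPipeline do
  # LtdPipeline is a hypothetical name for the module that owns start_link/1.
  use Broadway

  alias Broadway.Message

  # The start_link/1 from the snippet above goes here.

  @impl true
  def handle_message(_processor, %Message{} = message, _context) do
    # Placeholder: this is where the call to the external API would go.
    IO.puts("Processing #{inspect(message.data)} at #{DateTime.utc_now()}")
    message
  end

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Placeholder: handle_batch must return the list of messages.
    messages
  end
end
```

Since the rate limit is applied on the producer side, the callbacks themselves need no throttling logic.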

And then the LtdProducer POC looks like this (adapted from the Counter example):

defmodule LtdProducer do
  @moduledoc """
  Pretend this is connecting to a web site that has a rate-limiting threshold that
  we shall not exceed.
  Bogarting the `Counter` example from https://hexdocs.pm/broadway/custom-producers.html#example
  """
  use GenStage

  alias Broadway.Message

  def start_link(number) do
    GenStage.start_link(LtdProducer, number)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    IO.puts("HANDLING DEMAND for #{demand} events at #{DateTime.utc_now()}")
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end

  # Not part of the GenStage behaviour, but Broadway requires that we translate
  # the GenStage events into Broadway messages
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, _successful, _failed) do
    # Write ack code here
    :ok
  end
end

Once the pipeline is started, you can watch the simulated “requests” trickle through slowly, obeying the rate limit (here, 10 messages every 60 seconds). Easy peasy!
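
To get a feel for the pacing to expect, here is a rough back-of-the-envelope model. It assumes the limit behaves like fixed windows (allowed_messages released per interval, first window opening immediately, messages always available) and it is not Broadway’s actual implementation. WindowModel and release_offset_ms are made-up names for illustration:

```elixir
defmodule WindowModel do
  @moduledoc """
  Simplified fixed-window model of a rate limit. This is an assumption used
  for estimating pacing, not how Broadway implements rate limiting.
  """

  @doc """
  Earliest offset in milliseconds (from pipeline start) at which the message
  with zero-based `index` could be released, given `allowed` messages per
  window of `interval_ms` milliseconds.
  """
  def release_offset_ms(index, allowed, interval_ms)
      when index >= 0 and allowed > 0 and interval_ms > 0 do
    # Messages 0..allowed-1 fall in window 0, the next `allowed` in window 1, etc.
    div(index, allowed) * interval_ms
  end
end

# Using allowed_messages: 10 and interval: 60_000 from the config above.
for index <- [0, 9, 10, 25] do
  offset = WindowModel.release_offset_ms(index, 10, 60_000)
  IO.puts("message #{index}: released no earlier than #{offset} ms")
end
```

Under this model the first 10 messages go out right away, the next 10 after about a minute, and so on.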
