GenStage includes an example of building a rate limiter in a consumer: gen_stage/rate_limiter.exs in the elixir-lang/gen_stage repository on GitHub.
Although Broadway’s docs mention rate-limiting as a feature, I can’t find concrete examples of actually using it. The “rate-limiting” that Broadway is referring to seems to be rate-limiting of the demand (i.e. within the producer), not rate-limiting of the consumer.
I was able to adapt the GenStage example to build a solution that throttles outgoing requests made to an external API (to obey their threshold rules), and this has been working well. However, I’d like to compare my solution to a Broadway implementation because Broadway has a lot of nice features (including, importantly, being designed and peer-reviewed by people way smarter than me).
Does Broadway support rate-limiting of the consumer like this? Thanks in advance!
Ah, I found it. You have to set the `rate_limiting` option under the `producer` key, e.g. in your processor’s `start_link`:
```elixir
def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producer: [
      module: {LtdProducer, 0},
      transformer: {LtdProducer, :transform, []},
      # Let at most 10 messages through per 60-second interval
      rate_limiting: [allowed_messages: 10, interval: 60_000]
    ],
    processors: [
      default: [concurrency: 1]
    ],
    batchers: [
      default: [concurrency: 1, batch_size: 100, batch_timeout: 2000]
    ]
  )
end
```
And then the `LtdProducer` POC looks like this (adapted from the `Counter` example):
```elixir
defmodule LtdProducer do
  @moduledoc """
  Pretend this is connecting to a web site that has a rate-limiting threshold that
  we shall not exceed.
  Bogarting the `Counter` example from https://hexdocs.pm/broadway/custom-producers.html#example
  """
  use GenStage
  alias Broadway.Message

  def start_link(number) do
    GenStage.start_link(LtdProducer, number)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, counter) when demand > 0 do
    IO.puts("HANDLING DEMAND for #{demand} events at #{DateTime.utc_now()}")
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end

  # Not part of the GenStage behaviour, but Broadway requires that we translate
  # the GenStage events into Broadway messages
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  def ack(:ack_id, _successful, _failed) do
    # Write ack code here
    :ok
  end
end
```
Start the processor and you can watch the simulated “requests” trickle out slowly, obeying the rate limit. Easy peasy!
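To make the numbers in `rate_limiting: [allowed_messages: 10, interval: 60_000]` concrete, here is a rough sketch of the arithmetic in plain Elixir. It assumes the limiter behaves like a fixed window, i.e. a budget of `allowed_messages` that is refilled once every `interval` milliseconds; that is my understanding of the option, not something verified against Broadway's internals. The module `FixedWindowSketch` and its function are hypothetical names made up for illustration and are not part of Broadway.

```elixir
defmodule FixedWindowSketch do
  @moduledoc """
  Hypothetical helper (not part of Broadway) that models a fixed-window rate limit.
  """

  # Upper bound on the number of messages released by `elapsed_ms`,
  # assuming the budget starts full at time 0 and is refilled to
  # `allowed_messages` at every multiple of `interval` milliseconds.
  def max_released(elapsed_ms, allowed_messages, interval)
      when elapsed_ms >= 0 and allowed_messages > 0 and interval > 0 do
    windows_started = div(elapsed_ms, interval) + 1
    windows_started * allowed_messages
  end
end

# With the settings from the post (10 messages per 60_000 ms):
IO.inspect(FixedWindowSketch.max_released(0, 10, 60_000))      # 10
IO.inspect(FixedWindowSketch.max_released(59_999, 10, 60_000)) # 10
IO.inspect(FixedWindowSketch.max_released(60_000, 10, 60_000)) # 20
```

Under that assumption, the first ten events can go out right away, and the next ten only once the first minute has elapsed, which matches the slow trickle seen in the `HANDLING DEMAND` output.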