How would you implement a rate limiter for API requests?

I’d like to maximize my available tokens/minute for my Azure deployments of OpenAI LLMs. My Elixir app submits HTTP requests to a single LLM Proxy Server for all my deployments. I’d like to implement a module in my Elixir app which ensures I am maximizing the available throughput. e.g. I have 10 deployments of gpt-4o-mini, each at 2M tokens/minute. Ideally I’d be utilizing all 2M * 10 = 20M tokens/minute.

My Elixir app uses the LLM Proxy Server to perform knowledge extraction on a huge number of documents in parallel. I split documents into pages, and each page is submitted in a request to the LLM Proxy Server. e.g. I want to parse 100 documents in parallel, and each might have 100 pages. So without any throttling, there would be 100 x 100 = 10,000 requests in parallel, which will definitely cause rate limiting errors.

I’d like to avoid firing off all 10,000 requests at the same time, both to avoid excessive rate limiting errors and to limit the memory needed to track pending requests. e.g. I might only allow 100 in-flight requests at any time. If there are already 100 requests in-flight, I would block the caller, similar to how HTTP/DB clients implement connection pooling.

In the past, I’ve implemented this with Oban and two queues.

  1. Queue for handling incoming documents (limit: infinite)
  2. Queue for handling requests to LLM Proxy Server (limit: 100)
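
That setup as an Oban config sketch (queue names and limits here are illustrative; Oban needs a finite concurrency limit, so a large number stands in for “infinite”):

config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [
    # effectively unbounded intake of documents
    documents: 1_000,
    # at most 100 concurrent jobs hitting the LLM Proxy Server
    llm_proxy: 100
  ]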

How would you solve this?

Unless I’m misunderstanding the issue, this book is probably the best resource for this sort of question specific to Elixir:

I would suggest you look at Flow or Broadway.
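
For a rough idea of the Flow shape (function names like pages_of/1 and call_llm_proxy/1 are hypothetical), the number of stages bounds how many pages are in flight at once:

documents
|> Flow.from_enumerable(max_demand: 1)
|> Flow.flat_map(&pages_of/1)
# 25 stages, each taking one event at a time: ~25 pages in flight
|> Flow.partition(stages: 25, max_demand: 1)
|> Flow.map(&call_llm_proxy/1)
|> Enum.to_list()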

Those might be good options. Thanks for the suggestions.

I’d like something as simple as Task.async_stream() for concurrently processing the pages, but which takes into account how many documents are being processed concurrently.

e.g.

10 documents. 1 Oban job per document
Each Oban job concurrently processes 10 pages, so 10 x 10 = 100 pages total,
but only 25 pages are in-flight at any time.

I could use Task.async_stream for that, but then 100 requests would be sent all at once, when I’d rather have at most 25, and block the rest. Does that make sense?
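
Sketching what I mean (all names here are made up for illustration): a small counting semaphore that blocks callers once 25 permits are out, used from inside each job’s Task.async_stream:

defmodule MyApp.Semaphore do
  use GenServer

  def start_link(max), do: GenServer.start_link(__MODULE__, max, name: __MODULE__)

  # blocks until a permit is available
  def acquire, do: GenServer.call(__MODULE__, :acquire, :infinity)
  def release, do: GenServer.cast(__MODULE__, :release)

  @impl true
  def init(max), do: {:ok, %{max: max, in_use: 0, waiting: :queue.new()}}

  @impl true
  def handle_call(:acquire, _from, %{in_use: in_use, max: max} = state) when in_use < max do
    {:reply, :ok, %{state | in_use: in_use + 1}}
  end

  def handle_call(:acquire, from, state) do
    # at capacity: park the caller until someone releases
    {:noreply, %{state | waiting: :queue.in(from, state.waiting)}}
  end

  @impl true
  def handle_cast(:release, state) do
    case :queue.out(state.waiting) do
      {{:value, from}, rest} ->
        # hand the freed permit straight to a parked caller
        GenServer.reply(from, :ok)
        {:noreply, %{state | waiting: rest}}

      {:empty, _} ->
        {:noreply, %{state | in_use: state.in_use - 1}}
    end
  end
end

Each Oban job would then wrap its page requests (the semaphore is started once, e.g. in the supervision tree, with MyApp.Semaphore.start_link(25)):

pages
|> Task.async_stream(
  fn page ->
    MyApp.Semaphore.acquire()

    try do
      call_llm_proxy(page)
    after
      MyApp.Semaphore.release()
    end
  end,
  max_concurrency: 10,
  timeout: :infinity
)
|> Enum.to_list()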

Hi @bgoosman,

This is precisely the kind of use case that I created the external_service package for. You can use the ExternalService.call_async_stream/5 function to call external APIs concurrently while still respecting rate limits for the API.

Have a look and see if it fits your use case, and let me know if you have any questions.

https://hexdocs.pm/external_service/
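
A rough sketch of how it might look for your page workload (service name, limits, and the call_llm_proxy/1 helper are illustrative; see the hexdocs for the full set of options):

# allow up to 100 calls per second across the whole app
ExternalService.start(:llm_proxy, rate_limit: {100, 1_000})

pages
|> ExternalService.call_async_stream(:llm_proxy, fn page ->
  call_llm_proxy(page)
end)
|> Enum.to_list()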

The ExternalService.start(:some_external_service, rate_limit: {10, 1000}) pattern looks useful. I could estimate how many requests per minute are allowed, based on the workload, and set the limit appropriately. Thanks!
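
For example (numbers are rough): if a page plus its completion averages ~2,000 tokens, then 20M tokens/minute allows about 20_000_000 / 2_000 = 10,000 requests/minute, i.e. something like rate_limit: {10_000, 60_000}.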

The answer is most likely GenStage.

Depending on whether your strategy is a side effect (aka “fire-and-forget”) or you are interested in the results, the actual solution would be different.

I have implemented a throttler on top of GenStage as part of finitomata; it basically mimics GenServer.call/2 but with throttling.

You might either try it as is or borrow the code/ideas from there.

Hammer might be another option.

Interesting.

For this use case, I’d like to do something with the results. So a blocking solution would be best.

Thank you for offering finitomata. This pattern might fit my need.

{:ok, pid} = Finitomata.Throttler.start_link(name: Throttler)

Finitomata.Throttler.call(
  Throttler,
  {MyApiClient, :do_expensive_work, [my_data]}
)

with child spec

{Finitomata.Throttler, name: name, initial: [], max_demand: 10, interval: 1_000}

Seems very similar to the external_service lib offered by @jvoegele.

ExternalService.start(:some_external_service, rate_limit: {10, 1000})

I like the flexibility offered by Hammer. I think this pattern might work for me.

defmodule MyApp.KnowledgeExtraction do
  # at most 5 extractions per 60-second window
  def do_extraction(document) do
    case Hammer.check_rate("knowledge_extraction", 60_000, 5) do
      {:allow, _count} ->
        extract(document)

      {:deny, _limit} ->
        # blocked: sleep, then retry (simpler than reduce_while here)
        Process.sleep(1_000)
        do_extraction(document)
    end
  end

  defp extract(_document) do
    # do the knowledge extraction here
    :ok
  end
end

Last August I implemented a rate limiter to prevent users of Breek.gr’s sister app, Breek.gr Managers, from abusing my API access to the Greek business registry’s Tax ID lookup by performing too many lookups through the website (the public API is limited to a certain number of requests per month).

I didn’t want to use Hammer yet, so I did it from scratch by using an Agent. Here’s my code, unchanged, in case anyone wants to see an alternative approach. I am running a cache with Cachex.

defmodule Managers.Directory.RateLimiter do
  use Agent
  require Logger
  alias Managers.Directory.TinChecker

  @cache :tincheck

  # the format of each rate window tuple is:
  # {window length in minutes, max number of requests allowed in this window}
  @windows [
    {1, 2},
    {2, 3},
    {4, 3},
    {7, 3},
    {15, 4},
    {30, 4},
    {60, 5}
  ]

  def start_link(_opts \\ nil) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  def get_all do
    Agent.get(__MODULE__, fn state -> state end)
  end

  def get_for_user(username) do
    Agent.get(__MODULE__, fn state -> Map.get(state, username) end)
  end

  def reset_for_user(username) do
    # Map.put also handles users that were never initialized,
    # where %{state | username => []} would raise a KeyError
    Agent.update(__MODULE__, fn state -> Map.put(state, username, []) end)
  end

  def gsis(tin, username) do
    maybe_initialize_for_user(username)

    if permitted?(username) do
      do_gsis(tin, username)
    else
      {:error, :rate_limited}
    end
  end

  def do_gsis(tin, username) do
    with {:cache, {:ok, nil}} <- {:cache, Cachex.get(@cache, tin)},
         {:tin_format, true} <- {:tin_format, Regex.match?(~r/^\d{9}$/, String.trim(tin || ""))},
         {:vies, {:ok, m}} when is_map(m) <- {:vies, TinChecker.vies(tin)},
         {:gsis, {:ok, _} = fresh} <- {:gsis, TinChecker.gsis(tin)} do
      log_request_for(username)
      fresh
    else
      {:cache, {:ok, cached}} ->
        Logger.info("Cache hit for Tax ID #{tin}")
        cached

      {:tin_format, false} ->
        {:error, :invalid_tin_format}

      {:vies, {:error, _} = vies_with_error} ->
        Cachex.put(@cache, tin, vies_with_error)
        vies_with_error

      {_, {:error, reason}} ->
        {:error, reason}
    end
  end

  # record a request timestamp, dropping entries older than the widest window
  def log_request_for(username) do
    so_far = get_for_user(username)

    horizon = @windows |> Map.new() |> Map.keys() |> Enum.max()

    expired_removed =
      [DateTime.utc_now() | so_far]
      |> filter_in_last(horizon)

    Agent.update(__MODULE__, fn state -> %{state | username => expired_removed} end)
  end

  def maybe_initialize_for_user(username) do
    if is_nil(get_for_user(username)) do
      Agent.update(__MODULE__, fn state -> Map.put(state, username, []) end)
    end
  end

  def get_in_last(username, minutes) do
    username
    |> get_for_user()
    |> filter_in_last(minutes)
  end

  def filter_in_last(hits, minutes, t_ref \\ DateTime.utc_now())
      when is_list(hits) and is_integer(minutes) do
    hits
    |> Enum.filter(fn t ->
      DateTime.diff(t_ref, t, :minute) < minutes
    end)
  end

  # check each window, widest first; each pass narrows the timestamp list further
  def permitted?(username) do
    maybe_initialize_for_user(username)

    limits = Map.new(@windows)
    keys = Map.keys(limits) |> Enum.sort() |> Enum.reverse()

    now = DateTime.utc_now()
    initial = get_in_last(username, Enum.max(keys))

    Enum.reduce(
      keys,
      {true, initial},
      fn window, acc ->
        {ok?, filtered} = acc

        filtered = filter_in_last(filtered, window, now)
        ok? = ok? and length(filtered) <= limits[window]

        {ok?, filtered}
      end
    )
    |> elem(0)
  end

  def windows, do: @windows
end

What do you think about using Poolboy on your proxy server? A process needs to be checked out from the pool in order to make a request to the API. If request sizes and response times are reasonably uniform, you could tweak the pool size until you hit really high utilization. Also, if you Process.sleep after getting a rate limit error, it would organically slow down your request rate by holding on to a process from the pool.
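
A sketch of that shape (pool name, worker module, and sizes are hypothetical):

pool_config = [
  name: {:local, :llm_pool},
  worker_module: MyApp.LLMWorker,
  size: 100,
  max_overflow: 0
]

# add to your supervision tree
children = [:poolboy.child_spec(:llm_pool, pool_config)]

# checkout blocks while all 100 workers are busy
:poolboy.transaction(
  :llm_pool,
  fn worker -> GenServer.call(worker, {:extract, page}, :infinity) end,
  :infinity
)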

Another unrelated idea:
If the API returns headers about your current rate limit utilization, you could write those to ETS and have other processes read them to decide whether they need to wait.
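
For instance (the header name is an assumption; check what your proxy actually returns):

# a public named table, created once at startup
:ets.new(:rate_info, [:named_table, :public, read_concurrency: true])

# after each response, stash the remaining-token count for other processes to read
def record_rate_headers(resp_headers) do
  case List.keyfind(resp_headers, "x-ratelimit-remaining-tokens", 0) do
    {_, remaining} ->
      :ets.insert(:rate_info, {:remaining_tokens, String.to_integer(remaining)})

    nil ->
      :ok
  end
end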

One more unrelated idea:
If the API doesn’t count rate limit error responses against your rate limit utilization, you can make requests until you get a rate limit error, retry them, and use those errors as the signal to slow down your request rate.
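
Roughly (function names are hypothetical):

def request_with_backoff(page, attempt \\ 0) do
  case LLMProxy.extract(page) do
    # back off exponentially on rate limit errors, up to 5 retries
    {:error, :rate_limited} when attempt < 5 ->
      Process.sleep(Integer.pow(2, attempt) * 500)
      request_with_backoff(page, attempt + 1)

    other ->
      other
  end
end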

Poolboy fits the bill too, thanks!