Testing a GenStage rate limiter

Hey all! I’m currently working to do some data processing which requires some rate limiting due to third party api restrictions and am struggling to properly test the rate limit. The producer and consumer are heavily influenced by the example in the elixir GenStage docs, https://github.com/elixir-lang/gen_stage/blob/master/examples/rate_limiter.exs

Here is my implementation below. Any insight as to the best approach for testing GenStages would be greatly appreciated, I’m pretty new to the abstraction.

defmodule RateLimiterProducer do
  import Ecto.Query

  def init(args) do
    {:producer, args}
  end

  def start_link(args \\ []) do
    GenStage.start_link(__MODULE__, args, name: __MODULE__)
  end

  def handle_demand(demand, args) when demand > 0 do
    query = Keyword.fetch!(args, :query)
    offset = Keyword.fetch!(args, :offset)

    events = query |> limit(^demand) |> offset(^offset) |> order_by(:id) |> Repo.all()

    if events == [] do
      {:stop, :normal, args}
    else
      {:noreply, events, [query: query, offset: offset + demand]}
    end
  end
end

defmodule RateLimiterConsumer do
  use GenStage

  def init(args \\ %{}) do
    {:consumer, args}
  end

  def start_link(args \\ %{}) do
    GenStage.start_link(__MODULE__, args)
  end

  def handle_subscribe(:producer, opts, from, state) do
    pending = opts[:max_demand] || 1000
    interval = opts[:interval] || 5000

    state = Map.put(state, from, {pending, interval})
    state = ask_and_schedule(state, from)

    {:manual, state}
  end

  def handle_cancel(_, from, state) do
    {:noreply, [], Map.delete(state, from)}
  end

  def handle_events(events, from, state) do
    state =
      Map.update!(state, from, fn {pending, interval} ->
        {pending + length(events), interval}
      end)

    func = Map.get(state, :func)
    func.(events)

    {:noreply, [], state}
  end

  def handle_info({:ask, from}, state) do
    {:noreply, [], ask_and_schedule(state, from)}
  end

  defp ask_and_schedule(state, from) do
    case state do
      %{^from => {pending, interval}} ->
        GenStage.ask(from, pending)
        Process.send_after(self(), {:ask, from}, interval)
        Map.put(state, from, {0, interval})

      %{} ->
        state
    end
  end
end
1 Like