How to use GenStage to limit the number of remote requests per time interval?

Currently, I’m implementing a producer, Client, for receiving events that will each call a remote API.
I also wrote a consumer, Limiter. It’s supposed to take at most a limited batch of events within a certain period of time.

But sometimes Limiter.handle_events receives all of the producer’s events at once, without any limit.
That’s because DemandDispatcher puts no restriction on the maximum number of events.

I wrote my own dispatcher for that case, but I have the feeling that I am using GenStage the wrong way.

Could somebody help me to do it better?

defmodule Client do
  @moduledoc """
  Producer stage that turns every `write/1` call into a `{:write, message}` event.

  Events are emitted from `handle_cast/2` as they arrive, not in response to
  demand.
  """

  # NOTE(review): this snippet shows neither `use GenStage` nor a `start_link`;
  # presumably they were elided from the post - confirm before running.

  @doc """
  Asynchronously submits `message` to the process registered under this
  module's name.
  """
  def write(message) do
    GenStage.cast(__MODULE__, {:write, message})
  end

  @doc false
  def init(:ok) do
    {:producer, :ok}
  end

  @doc false
  def handle_cast({:write, message}, state) do
    {:noreply, [{:write, message}], state}
  end

  # GenStage requires every `:producer` stage to implement `handle_demand/2`.
  # There is nothing to emit here because events are only produced by
  # `handle_cast/2`, so the demand is acknowledged without emitting anything.
  @doc false
  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end
end
defmodule Limiter do
  @moduledoc """
  Consumer stage that takes at most `max_demand` events from its producer
  between two consecutive ticks, which are `max_interval` milliseconds apart.

  The subscription is switched to `:manual` mode. The whole budget is asked for
  once when the subscription is established; afterwards, once per
  `max_interval`, the stage asks only for as many events as it received since
  the previous tick. Outstanding demand therefore never exceeds `max_demand`.

  ## Options

    * `:max_demand` - event budget per interval (default `5`)
    * `:max_interval` - interval length in milliseconds (default `5_000`)
  """

  # NOTE(review): this snippet shows no `use GenStage`; presumably it was elided
  # from the post - confirm before running.

  # `get_events` counts events received since the last ask;
  # `scheduled_at` holds the `:erlang.timestamp/0` of the last ask.
  defstruct [:max_demand, :max_interval, :producer_from, :scheduled_at, :get_events]

  @doc false
  def init(options) do
    state = %__MODULE__{
      max_demand: options[:max_demand] || 5,
      max_interval: options[:max_interval] || 5_000,
      get_events: 0,
      scheduled_at: nil
    }

    {:consumer, state, subscribe_to: [{GSS.Client, subscribe_options(state)}]}
  end

  # Remembers the subscription and takes over demand handling: without
  # `:manual` mode GenStage asks on its own and `GenStage.ask/2` must not be used.
  @doc false
  def handle_subscribe(:producer, _options, from, state) do
    state = %{state | producer_from: from}
    {:manual, ask_and_schedule(state, state.max_demand)}
  end

  @doc false
  def handle_events(events, _from, state) do
    # process events

    # Only count here; the consumed budget is handed back on the next tick.
    {:noreply, [], %{state | get_events: state.get_events + length(events)}}
  end

  @doc false
  def handle_info({:ask, _from}, state) do
    {:noreply, [], ask_and_schedule(state, state.get_events)}
  end

  defp subscribe_options(state) do
    [max_demand: state.max_demand, min_demand: 1]
  end

  # Asks the producer for `demand` events (asking for 0 is a no-op in GenStage),
  # arms the next tick and starts a fresh counting window.
  defp ask_and_schedule(state, demand) do
    GenStage.ask(state.producer_from, demand)
    Process.send_after(self(), {:ask, state.producer_from}, state.max_interval)
    %{state | get_events: 0, scheduled_at: :erlang.timestamp()}
  end
end

I simplified it. It looks better now.

But now a function is called periodically, once per interval:

Process.send_after(self(), :ask, state.interval)

Maybe someone knows how to build it without asking periodically.

defmodule Client.Limiter do
  @moduledoc """
  Consumer stage that polls its producer every `interval` milliseconds and
  stops asking for `max_interval` milliseconds once `max_demand` events have
  been taken in the current window.

  ## Options

    * `:max_demand` - event budget per window (default `5`)
    * `:max_interval` - window length and back-off time in milliseconds (default `5_000`)
    * `:interval` - polling period in milliseconds (default `100`)
    * `:sync_offset` - delay in milliseconds before the first poll (default `0`)
    * `:client` - value passed as `:subscribe_to` (default `[GSS.Client]`)
  """

  use GenStage

  defstruct [:max_demand, :max_interval, :producer,
             :scheduled_at, :taked_events, :interval]

  def start_link(args \\ []) do
    GenStage.start_link(__MODULE__, args)
  end

  ## Callbacks

  @impl true
  def init(args) do
    state = %__MODULE__{
      max_demand: args[:max_demand] || 5,
      max_interval: args[:max_interval] || 5_000,
      interval: args[:interval] || 100,
      taked_events: 0,
      scheduled_at: nil
    }

    sync_offset = args[:sync_offset] || 0
    subscribe_to = args[:client] || [GSS.Client]

    # Kick off the polling loop.
    Process.send_after(self(), :ask, sync_offset)

    {:consumer, state, subscribe_to: subscribe_to}
  end

  # Set the subscription to manual to control when to ask for events
  @impl true
  def handle_subscribe(:producer, _options, from, state) do
    {:manual, %{state | producer: from}}
  end

  @impl true
  def handle_events(events, _from, state) do
    events
    |> Enum.map_join(" ", fn {_kind, _from, message} -> message end)
    |> IO.inspect(label: "#{Time.utc_now()}..............handle_events")

    state =
      state
      |> Map.update!(:taked_events, &(&1 + length(events)))
      |> schedule_counts()

    {:noreply, [], state}
  end

  @impl true
  def handle_info(:ask, state) do
    {:noreply, [], ask_and_schedule(state)}
  end

  @doc """
  Performs one polling step and returns the updated state.

    * budget used up - back off for `max_interval` and reset the counters
    * window expired - ask for a full `max_demand` and reset the counters
    * otherwise - ask only for what is left of the budget in this window

  Every branch schedules the next `:ask` message.
  """
  def ask_and_schedule(state) do
    cond do
      limited_events?(state) ->
        Process.send_after(self(), :ask, state.max_interval)
        clear_counts(state)

      interval_expired?(state) ->
        GenStage.ask(state.producer, state.max_demand)
        Process.send_after(self(), :ask, state.interval)
        clear_counts(state)

      true ->
        # Asking for the full max_demand here would let up to
        # 2 * max_demand - 1 events through in one window.
        # NOTE(review): the cap relies on the producer emitting no more than
        # each individual demand - confirm for the producer in use.
        GenStage.ask(state.producer, state.max_demand - state.taked_events)
        Process.send_after(self(), :ask, state.interval)
        schedule_counts(state)
    end
  end

  defp limited_events?(state) do
    state.taked_events >= state.max_demand
  end

  # A window that has not started yet cannot be expired.
  defp interval_expired?(%__MODULE__{scheduled_at: nil}), do: false

  defp interval_expired?(%__MODULE__{scheduled_at: scheduled_at, max_interval: max_interval}) do
    now = :erlang.timestamp()
    # now_diff/2 returns microseconds, max_interval is in milliseconds.
    :timer.now_diff(now, scheduled_at) >= max_interval * 1000
  end

  defp clear_counts(state) do
    %{state | taked_events: 0, scheduled_at: nil}
  end

  # Starts the window clock unless it is already running.
  defp schedule_counts(%__MODULE__{scheduled_at: nil} = state) do
    %{state | scheduled_at: :erlang.timestamp()}
  end

  defp schedule_counts(state), do: state
end
defmodule Client do
  @moduledoc """
  Producer stage backed by an Erlang `:queue`.

  `write/2` enqueues a message together with its caller. Events leave the
  queue only inside `handle_demand/2`, and each caller gets its `:ok` reply at
  the moment its message is dequeued.
  """

  use GenStage

  defstruct [:queue]

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @doc """
  Enqueues `message` and waits up to `timeout` milliseconds for it to be
  dequeued by consumer demand.
  """
  def write(message, timeout \\ 50_000) do
    GenStage.call(__MODULE__, {:write, message}, timeout)
  end

  ## Callbacks

  def init(:ok) do
    {:producer, %__MODULE__{queue: :queue.new()}}
  end

  def handle_call({:write, message}, from, state) do
    # The reply is deferred until the message is dequeued in handle_demand/2.
    enqueued = :queue.in({:write, from, message}, state.queue)
    {:noreply, [], %{state | queue: enqueued}}
  end

  def handle_demand(demand, state) when demand > 0 do
    {events, remaining} = dequeue(state.queue, demand)
    {:noreply, events, %{state | queue: remaining}}
  end

  # Removes up to `demand` entries from the front of `queue`, replies `:ok` to
  # each entry's caller in queue order, and returns `{events, rest_of_queue}`.
  defp dequeue(queue, demand) do
    count = min(demand, :queue.len(queue))
    {taken, remaining} = :queue.split(count, queue)
    events = :queue.to_list(taken)

    Enum.each(events, fn {_kind, from, _event} -> GenStage.reply(from, :ok) end)

    {events, remaining}
  end
end

You might find this recent blog post by @chrismccord helpful.

Optimizing Your Elixir and Phoenix projects with ETS