Retrying GenStage producer

We’re using GenStage in our current code base to push various events to third-party tools via HTTP APIs. Like most APIs on the web, the third-party tool we currently talk to enforces rate limits on how many requests we can send. We’re using a rate-limiting :producer_consumer stage, which is basically a copy of the RateLimiter module included in the GenStage examples. This rate limiter works fine; however, when the third-party API returns an error (which honestly is totally fine), everything goes south pretty fast. The problem is our :consumer stage: it’s pretty dumb and just retries previously failed requests after a certain (random) timeout. After a while the failed requests pile up and we hit the enforced rate limits, because besides re-sending the failed requests we also ask the :producer stage for more events and send those new events as well.

I tried to create a separate :producer_consumer stage that sits between the RateLimiter and our Consumer; however, the problem is that this Retry module has no clue about the rate limiting happening in the RateLimiter module. I can ask for 999 instead of the default 1000 events when an error occurs, but how do I tell the previous producer-consumer that I want whatever demand it has minus 1?
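To make the "whatever demand it has minus 1" arithmetic concrete, here is a minimal sketch of the budget calculation as a pure function. RetryBudget and demand/2 are hypothetical names used only for illustration; they are not part of GenStage, and the sketch assumes every pending retry uses up exactly one slot of the per-interval limit.

```elixir
defmodule RetryBudget do
  @moduledoc """
  Hypothetical helper, not part of GenStage: computes how many *new* events
  to request for one interval when some slots are reserved for retries.
  """

  # New demand is the per-interval limit minus the pending retries,
  # clamped at zero so we never ask for a negative amount.
  def demand(max_demand, pending_retries)
      when is_integer(max_demand) and max_demand >= 0 and
             is_integer(pending_retries) and pending_retries >= 0 do
    max(max_demand - pending_retries, 0)
  end
end

IO.inspect(RetryBudget.demand(1000, 1))    # prints 999
IO.inspect(RetryBudget.demand(1000, 1200)) # prints 0
```

The hard part is not this calculation but getting the number of pending retries to the stage that actually issues the demand.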

Btw, when my Retry module sends the failed events directly to the producer, I can handle them in its handle_info/2 callback and it just works™:

def handle_info({:fail, event}, state) do
  {:noreply, [event], state}
end

However, this does not feel right: I don’t want my producer-consumers to know where the data is coming from, and it stops working as soon as I modify the data somewhere in the pipeline via another stage.

I got it kinda working by sending the RateLimiter’s demand down to the subscribed consumer in the handle_subscribe/4 callback. However, the problem I’m now facing is that I don’t know how to propagate the demand from my Retry stage all the way up to the producer. The :manual subscription of the RateLimiter breaks the automatic demand propagation.

I’m now trying to combine RateLimiter and Retry into a single stage, but to me this feels like breaking the single responsibility principle (SRP).

This is the solution I went for:

defmodule RetryingRateLimiter do
  use GenStage

  defstruct producer: nil, max_demand: 0, interval: 0, failed: []

  def init(_) do
    {:producer_consumer, %__MODULE__{}}
  end

  def handle_subscribe(:producer, opts, from, state) do
    # We will only allow max_demand events every 5000 milliseconds
    max_demand = opts[:max_demand] || 1000
    interval = opts[:interval] || 5000

    # Register the producer in the state
    state = %__MODULE__{state | producer: from, max_demand: max_demand, interval: interval}
    # Ask for the pending events and schedule the next time around
    state = ask_and_schedule(state, from)

    # Returns manual as we want control over the demand
    {:manual, state}
  end

  def handle_subscribe(_, _, _, state), do: {:automatic, state}

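  def handle_cancel(_reason, _from, state) do
    # Forget the producer once the subscription is cancelled
    {:noreply, [], %__MODULE__{state | producer: nil}}
  end

  def handle_events(events, _from, %__MODULE__{failed: failed} = state) do
    # Emit the pending retries ahead of the freshly received events
    state = %__MODULE__{state | failed: []}
    {:noreply, failed ++ events, state}
  end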

  def handle_info({:fail, event}, state) do
    # Reschedule failed events
    state = %__MODULE__{state | failed: state.failed ++ [event]}
    {:noreply, [], state}
  end

  def handle_info({:ask, from}, state) do
    # This callback is invoked by the Process.send_after/3 message below.
    {:noreply, [], ask_and_schedule(state, from)}
  end

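  defp ask_and_schedule(state, from) do
    # Reserve room for the pending retries. Clamp at zero, because
    # GenStage.ask expects a non-negative demand and more events may
    # have failed than fit into one interval.
    demand = max(state.max_demand - length(state.failed), 0)
    GenStage.ask(from, demand)
    Process.send_after(self(), {:ask, from}, state.interval)

    state
  end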
end

If a request fails, the consumer sends the event back to the rate limiter with:

Process.send(pid, {:fail, event}, [])
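As a rough sketch of how the consumer side could wrap this, the helper below performs the request and hands the event back on failure. FailureReporter, deliver/3 and send_fun are hypothetical names, not part of GenStage or of the code above; send_fun stands in for the real HTTP call and is assumed to return :ok or {:error, reason}.

```elixir
defmodule FailureReporter do
  # Hypothetical helper, not part of GenStage. `send_fun` stands in for the
  # real HTTP call and is assumed to return :ok or {:error, reason}.
  def deliver(event, limiter_pid, send_fun) when is_function(send_fun, 1) do
    case send_fun.(event) do
      :ok ->
        :ok

      {:error, _reason} = error ->
        # Hand the event back so the rate limiter can reschedule it.
        send(limiter_pid, {:fail, event})
        error
    end
  end
end

# Usage, with the current process standing in for the rate limiter:
FailureReporter.deliver(:evt, self(), fn _event -> {:error, :timeout} end)

receive do
  {:fail, event} -> IO.inspect(event, label: "rescheduled")
after
  0 -> IO.puts("nothing to retry")
end
```

In the real pipeline limiter_pid would be the pid of the RetryingRateLimiter process, so the {:fail, event} message ends up in its handle_info callback.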