GenStage - applying back-pressure to external processes that send events to the Producer

I have a simple scenario where my Phoenix controllers send some events to a GenStage pipeline.

The stages are super simple: I just have a Producer stage and a Consumer stage.

My producer code is similar to:

defmodule Producer do
  use GenStage

  def start_link(_) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end 

  def record_event(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:event, event}, timeout)
  end 

  ## Callbacks

  def init(:ok) do
    # Notice buffer_size is set to :infinity
    {:producer, :state_does_not_matter, dispatcher: GenStage.BroadcastDispatcher, buffer_size: :infinity}
  end 

  def handle_call({:event, event}, _from, state) do
    {:reply, :ok, [event], state}
  end 

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end 
end

The controller does something like:

def index(conn, _) do
  Producer.record_event(%{some: :event})
  # ...
end

As you can see, the code above has a potentially significant bug: because buffer_size is set to :infinity, the producer's buffer can grow without bound if events arrive faster than the consumer processes them.

If I set buffer_size to a number (the default is 10_000), events are dropped once the buffer fills up, which happens whenever events arrive faster than the consumer processes them. That is not what I want: I would rather slow down, or even crash, the clients / web requests than drop an event.

I can work around this by not using GenStage's built-in buffer and keeping my own queue of events in the producer's state instead, similar to the QueuedBroadcaster described in the docs: https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand
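A minimal sketch of that approach, assuming a hypothetical `BoundedProducer` module and a hypothetical `@max_queue` limit (neither comes from the code above). It follows the pattern of the buffering-demand example in the docs: `handle_call/3` stores the caller's `from` in an Erlang `:queue` instead of replying, and `GenStage.reply/2` is only called once the event is actually emitted against consumer demand. Callers are therefore slowed down to the consumer's pace, and a caller that waits longer than its `timeout` exits, which fits the "slow down or crash the request" preference. Replying `{:error, :buffer_full}` once the queue is full is an added assumption, and the length is tracked in the state because `:queue.len/1` is O(n). The `Mix.install/1` line is only there so the snippet runs as a standalone script.

```elixir
# Standalone-script setup only; in a Phoenix/Mix project add
# {:gen_stage, "~> 1.2"} to your deps instead.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule BoundedProducer do
  @moduledoc """
  Hypothetical producer that keeps its own bounded queue instead of relying on
  GenStage's internal buffer. Callers block until their event is dispatched.
  """
  use GenStage

  # Hypothetical limit; tune for your workload.
  @max_queue 10_000

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Returns :ok once the event has been emitted to consumers, or
  # {:error, :buffer_full}. GenStage.call/3 exits if the timeout elapses.
  def record_event(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:event, event}, timeout)
  end

  @impl true
  def init(:ok) do
    # State: {queue of {from, event}, queue length, pending demand}
    {:producer, {:queue.new(), 0, 0}, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_call({:event, _event}, _from, {_queue, len, _demand} = state)
      when len >= @max_queue do
    # Queue is full: tell the caller right away instead of dropping silently.
    {:reply, {:error, :buffer_full}, [], state}
  end

  def handle_call({:event, event}, from, {queue, len, demand}) do
    # No reply yet: the caller stays blocked until its event is dispatched.
    dispatch_events(:queue.in({from, event}, queue), len + 1, demand, [])
  end

  @impl true
  def handle_demand(incoming, {queue, len, demand}) do
    dispatch_events(queue, len, demand + incoming, [])
  end

  # No demand left: emit what we collected and keep the rest queued.
  defp dispatch_events(queue, len, 0, events) do
    {:noreply, Enum.reverse(events), {queue, len, 0}}
  end

  # Pop queued events while there is demand, replying :ok to each caller.
  defp dispatch_events(queue, len, demand, events) do
    case :queue.out(queue) do
      {{:value, {from, event}}, queue} ->
        GenStage.reply(from, :ok)
        dispatch_events(queue, len - 1, demand - 1, [event | events])

      {:empty, queue} ->
        {:noreply, Enum.reverse(events), {queue, len, demand}}
    end
  end
end
```

One consequence of this design: if a caller's `GenStage.call/3` times out, its event stays in the queue and is still dispatched later, so a timeout does not mean the event was lost.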

However, there might be a simpler solution that I am struggling to find. If there were a way to peek inside the default buffer and get its current length, I could do something like:

  def handle_call({:event, event}, _from, state) do
    if :queue.len(somehow.get.to.internal.buffer.here) < 10_000 do
      {:reply, :ok, [event], state}
    else
      {:reply, :buffer_full, [], state}
    end
  end

The problematic part is reaching the internal buffer from the handle_call callback; it seems to be intentionally hidden from the programmer.

Is there a way I can reach the internal buffer from here?

I would love to keep the default buffer implementation and the simplicity of my producer, which is pretty much ruined once I have to implement the buffer queue on my own.


I guess the solution is to implement my own buffering :). That's good enough for me.
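For comparison, the smallest form of "own buffering" is arguably no buffer at all. In this sketch (the `DemandGatedProducer` module and the `{:error, :no_demand}` reply are hypothetical), the producer only counts the demand consumers have asked for and emits an event only while some of that demand is outstanding. As long as the set of subscribed consumers does not change, events should therefore not pile up in GenStage's internal buffer. Instead of blocking the caller, it rejects the event immediately and leaves the retry, back-off or error decision to the controller. It is simpler than a queue, but it only fits if rejecting a request is acceptable, which is an assumption about the requirements. As before, `Mix.install/1` is only for running it as a standalone script.

```elixir
# Standalone-script setup only; in a Phoenix/Mix project add
# {:gen_stage, "~> 1.2"} to your deps instead.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule DemandGatedProducer do
  use GenStage

  def start_link(_opts \\ []) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Returns :ok if consumers had outstanding demand, otherwise
  # {:error, :no_demand} so the caller can retry, slow down or fail the request.
  def record_event(event, timeout \\ 5000) do
    GenStage.call(__MODULE__, {:event, event}, timeout)
  end

  @impl true
  def init(:ok) do
    # State: demand consumers have asked for that we have not filled yet.
    {:producer, 0, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_demand(incoming, pending) do
    # Never emit here; just remember how much we are allowed to send.
    {:noreply, [], pending + incoming}
  end

  @impl true
  def handle_call({:event, _event}, _from, 0) do
    # No outstanding demand: reject instead of letting GenStage buffer
    # (and eventually drop) the event.
    {:reply, {:error, :no_demand}, [], 0}
  end

  def handle_call({:event, event}, _from, pending) do
    # Emit exactly one event against one unit of demand.
    {:reply, :ok, [event], pending - 1}
  end
end
```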