I have a simple scenario where my Phoenix controllers are sending some events to GenStage pipeline.
The stages are super simple, I just have a Producer and Consumer stages.
My producer code is similar to:
defmodule Producer do
use GenStage
def start_link(_) do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end
def record_event(event, timeout \\ 5000) do
GenStage.call(__MODULE__, {:event, event}, timeout)
end
## Callbacks
def init(:ok) do
# Notice buffer_size is set to :infinity
{:producer, :state_does_not_matter, dispatcher: GenStage.BroadcastDispatcher, buffer_size: :infinity}
end
def handle_call({:event, event}, _from, state) do
{:reply, :ok, [event], state}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
end
The controller does something like:
def index(conn, _) do
Producer.record_event(%{some: :event})
...
end
As you can clearly see, the code above has a possible significant bug, since I am setting buffer_size
to :infinity
.
If I set the buffer_size
to a number (default is 10_000), the events keep dropping once the buffer is exceeded in case I am receiving events at higher rate than consumer processes them. This is not my preference, as I would rather slow down or even crash the clients / web requests, rather than dropping an event.
I can work this around by stopping using the default GenStage’s buffer, and keep the queue of events in memory, which will be similar to QueuedBroadcaster
described in the docs: https://hexdocs.pm/gen_stage/GenStage.html#module-buffering-demand
However, there might be simpler solution that I am struggling to find. If there was a way to peek inside the default buffer and get it’s current length, I could do something like:
def handle_call({:event, event}, _from, state) do
if :queue.len(somehow.get.to.internal.buffer.here) < 10_000 do
{:reply, :ok, [event], state}
else
{:reply, :buffer_full, [], state}
end
end
The problematic part is reaching out to internal buffer from handle_call callback, it seems to be intentionally hidden from the programmer.
Is there a way I can reach the internal buffer from here?
I would love to retain the default implementation of buffer queue and simplicity of my producer, which gets pretty much ruined once I have to implement the buffer queue on my own.