Hi, I am currently experimenting with my first GenStage setup. I have one producer and many consumers. Each consumer performs multiple database INSERT queries once new events arrive. This works fine given constant load however once there more and more incoming events consumers don’t catch up and eventually producer’s buffer gets overflown. There are a lot of following messages/warnings:
GenStage producer EventsCollector has discarded 100 events from buffer
The first thought I came up with was simply increasing producer’s max_buffer_size
. However this is not really a scalable solution. The next thing I thought of was usage of DynamicSupervisor
to manage instantiation and termination of GenStage consumers. My question is would it be possible to spawn new consumers once buffer is filled up to a certain size and then terminate consumers once there are no incoming events for a longer period of time?
Or maybe there are other approaches, besides DynamicSupervisor
which could help to achieve this goal?
Current code (DynamicSupervisor
not used yet):
defmodule EventsCollector do
use GenStage
def start_link(_state) do
GenStage.start_link(__MODULE__, :ok, name: EventsCollector)
end
def add(events) do
GenStage.cast(EventsCollector, {:add, events})
end
def init(:ok) do
storage = Application.fetch_env!(:app_name, :storage)
gen_stage = Keyword.fetch!(storage, :gen_stage)
max_buffer_size = Keyword.fetch!(gen_stage, :max_buffer_size)
# Run as producer and specify the max amount of events to buffer.
{:producer, :ok, buffer_size: max_buffer_size}
end
def handle_cast({:add, events}, state) do
# These will be buffered if there are no consumers ready.
{:noreply, events, state}
end
def handle_demand(incoming_demand, state) do
# Do nothing. Events will be dispatched as-is.
{:noreply, [], state}
end
end
defmodule EventsSaver do
use GenStage
def start_link(_state) do
consumers_count = 10
consumers =
Enum.reduce(1..consumers_count, [], fn _i, consumers ->
[GenStage.start_link(__MODULE__, :ok) | consumers]
end)
List.last(consumers) # PID of last consumer
end
def init(:ok) do
{:consumer, :ok, subscribe_to: [{EventsCollector, min_demand: 800, max_demand: 1_000}]}
end
def handle_info(_, state), do: {:noreply, [], state}
def handle_events(events, _from, state) when is_list(events) and length(events) > 0 do
# We got some events from the EventsCollector. Let’s save them in DB.
# Code which save events to Database, multiple insert queries involved as events are complex data structures
{:noreply, [], state}
end
def handle_events(_, _, state), do: {:noreply, [], state}
end