GenStage - dynamic management of consumers

Hi, I am currently experimenting with my first GenStage setup. I have one producer and many consumers. Each consumer performs multiple database INSERT queries when new events arrive. This works fine under constant load, but once more and more events come in, the consumers can't keep up and the producer's buffer eventually overflows. The log fills with warnings like:

GenStage producer EventsCollector has discarded 100 events from buffer

My first thought was simply to increase the producer's max_buffer_size, but that isn't really a scalable solution. The next thing I considered was using a DynamicSupervisor to manage starting and terminating GenStage consumers. My question is: would it be possible to spawn new consumers once the buffer fills up to a certain size, and then terminate consumers once no events have arrived for a longer period of time?

Or maybe there are other approaches, besides DynamicSupervisor, that could help achieve this goal?

Current code (DynamicSupervisor not used yet):

defmodule EventsCollector do
  use GenStage

  def start_link(_state) do
    GenStage.start_link(__MODULE__, :ok, name: EventsCollector)
  end

  def add(events) do
    GenStage.cast(EventsCollector, {:add, events})
  end

  def init(:ok) do
    storage = Application.fetch_env!(:app_name, :storage)
    gen_stage = Keyword.fetch!(storage, :gen_stage)
    max_buffer_size = Keyword.fetch!(gen_stage, :max_buffer_size)

    # Run as producer and specify the max amount of events to buffer.
    {:producer, :ok, buffer_size: max_buffer_size}
  end

  def handle_cast({:add, events}, state) do
    # These will be buffered if there are no consumers ready.
    {:noreply, events, state}
  end

  def handle_demand(_incoming_demand, state) do
    # Do nothing. Events will be dispatched as-is.

    {:noreply, [], state}
  end
end

defmodule EventsSaver do
  use GenStage

  def start_link(_state) do
    consumers_count = 10

    consumers =
      Enum.reduce(1..consumers_count, [], fn _i, consumers ->
        [GenStage.start_link(__MODULE__, :ok) | consumers]
      end)

    List.last(consumers) # {:ok, pid} of the last consumer started
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{EventsCollector, min_demand: 800, max_demand: 1_000}]}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(events, _from, state) when is_list(events) and length(events) > 0 do
    # We got some events from the EventsCollector. Let’s save them in DB.

    # Code that saves events to the database; multiple INSERT queries are
    # involved, as events are complex data structures.

    {:noreply, [], state}
  end

  def handle_events(_, _, state), do: {:noreply, [], state}
end

There are a couple of things you can do.
One is to lower the minimum demand; otherwise your consumers will wait until they receive 800 events before they start processing. Another is to start your 10 consumers from your application.ex file, each with a unique id:

def start(_type, _args) do
  children = [
    EventsCollector,
    Supervisor.child_spec(EventsSaver, id: :consumer_a),
    Supervisor.child_spec(EventsSaver, id: :consumer_b),
    ... # 10 times
  ]

  opts = [strategy: :one_for_one, name: Events.Supervisor] # <- Or whatever your app is called
  Supervisor.start_link(children, opts)
end
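Listing ten child specs by hand gets repetitive, so the same children list can be generated with a comprehension. This is just a sketch — the `{:consumer, i}` id format is one possible choice, not required by Supervisor:

```elixir
def start(_type, _args) do
  # Generate one uniquely-identified child spec per consumer.
  consumer_specs =
    for i <- 1..10 do
      Supervisor.child_spec(EventsSaver, id: {:consumer, i})
    end

  # The producer must be started before the consumers that subscribe to it.
  children = [EventsCollector | consumer_specs]

  opts = [strategy: :one_for_one, name: Events.Supervisor]
  Supervisor.start_link(children, opts)
end
```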

Then your consumer start_link will look like this:

def start_link(_args) do
  GenStage.start_link(__MODULE__, :ok)
end

Lowering min_demand and max_demand for each of those consumers will increase the overall concurrency of the app. As I said above, a min_demand of 0 lets a consumer start processing events as soon as they are available instead of waiting for the queue to reach 800, and a lower max_demand ensures that fewer than 1,000 events are processed synchronously by a single consumer. Try something like 10 at first, or even lower.
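Concretely, the consumer's init would then subscribe with the lower demand values suggested above. A sketch — the exact numbers are something to tune against your actual workload:

```elixir
def init(:ok) do
  # min_demand: 0  — start processing as soon as any events arrive.
  # max_demand: 10 — cap how many events one consumer takes in a batch.
  {:consumer, :ok, subscribe_to: [{EventsCollector, min_demand: 0, max_demand: 10}]}
end
```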
