GenStage - dynamic management of consumers

Hi, I am currently experimenting with my first GenStage setup. I have one producer and many consumers. Each consumer performs multiple database INSERT queries once new events arrive. This works fine given constant load however once there more and more incoming events consumers don’t catch up and eventually producer’s buffer gets overflown. There are a lot of following messages/warnings:

GenStage producer EventsCollector has discarded 100 events from buffer

The first thought I came up with was simply increasing producer’s max_buffer_size. However this is not really a scalable solution. The next thing I thought of was usage of DynamicSupervisor to manage instantiation and termination of GenStage consumers. My question is would it be possible to spawn new consumers once buffer is filled up to a certain size and then terminate consumers once there are no incoming events for a longer period of time?

Or maybe there are other approaches, besides DynamicSupervisor which could help to achieve this goal?

Current code (DynamicSupervisor not used yet):

defmodule EventsCollector do
  use GenStage

  def start_link(_state) do
    GenStage.start_link(__MODULE__, :ok, name: EventsCollector)
  end

  def add(events) do
    GenStage.cast(EventsCollector, {:add, events})
  end

  def init(:ok) do
    storage = Application.fetch_env!(:app_name, :storage)
    gen_stage = Keyword.fetch!(storage, :gen_stage)
    max_buffer_size = Keyword.fetch!(gen_stage, :max_buffer_size)

    # Run as producer and specify the max amount of events to buffer.
    {:producer, :ok, buffer_size: max_buffer_size}
  end

  def handle_cast({:add, events}, state) do
    # These will be buffered if there are no consumers ready.
    {:noreply, events, state}
  end

  def handle_demand(incoming_demand, state) do
    # Do nothing. Events will be dispatched as-is.

    {:noreply, [], state}
  end
end
defmodule EventsSaver do
  use GenStage

  def start_link(_state) do
    consumers_count = 10

    consumers =
      Enum.reduce(1..consumers_count, [], fn _i, consumers ->
        [GenStage.start_link(__MODULE__, :ok) | consumers]
      end)

    List.last(consumers) # PID of last consumer
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{EventsCollector, min_demand: 800, max_demand: 1_000}]}
  end

  def handle_info(_, state), do: {:noreply, [], state}

  def handle_events(events, _from, state) when is_list(events) and length(events) > 0 do
    # We got some events from the EventsCollector. Let’s save them in DB.

    # Code which save events to Database, multiple insert queries involved as events are complex data structures	

    {:noreply, [], state}
  end

  def handle_events(_, _, state), do: {:noreply, [], state}
end