Understanding `max_demand` and `min_demand` in `GenStage`

I am trying to understand in detail how GenStage works by following the documentation. I tried fiddling with max_demand and min_demand by changing their values and understand how they work under the hood. Below is my producer, producer_consumer and consumer code:

producer.ex

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand))
    IO.inspect({"events", Enum.count(events)})
    {:noreply, events, state + demand}
  end
end

producer_consumer.ex

defmodule GenstageExample.ProducerConsumer do
  use GenStage

  def start_link(factor) do
    GenStage.start_link(__MODULE__, factor, name: __MODULE__)
  end

  def init(factor) do
    {:producer_consumer, factor, subscribe_to: [{GenstageExample.Producer, max_demand: 4, min_demand: 2}]}
  end

  def handle_events(events, _from, factor) do
    converted_events = Enum.map(events, & &1 * factor)
    {:noreply, converted_events, factor}
  end
end

consumer.py

defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(useless) do
    GenStage.start_link(__MODULE__, useless)
  end

  def init(state) do
    {:consumer, state, subscribe_to: [{GenstageExample.ProducerConsumer , max_demand: 2, min_demand: 1}]}
  end

  def handle_events(events, _from, state) do

    for event <- events do
      IO.inspect({self(), event})
    end

    IO.puts("\n")
    :timer.sleep(1000)

    {:noreply, [], state}
  end
end

For consumer, I set the demand as
max_demand: 2, min_demand: 1

For producer_consmer, I set the demand as:
max_demand: 4, min_demand: 2

I see strange behaviour in the values printed by the consumer, using the above demand configuration:

Output on running mix run --no-halt

{"events", 5}
{"events", 2}
{"events", 2}
{#PID<0.175.0>, 0}
{#PID<0.176.0>, 2}




{#PID<0.175.0>, 1}
{#PID<0.176.0>, 3}
{"events", 2}




{#PID<0.175.0>, 4}
{#PID<0.176.0>, 4}
{"events", 2}




{#PID<0.175.0>, 5}
{#PID<0.176.0>, 5}
{"events", 2}




{#PID<0.175.0>, 6}
{#PID<0.176.0>, 6}
{"events", 2}

My doubts:

  1. If the max_demand for producer_consumer is 4, then why is the first request to the producer has the demand 5?
  2. Then, immediately after the producer produces 5 events, why isn’t it printed by the consumer? Why do I see two demands to the producer for 2 events each?
  3. Strangely, each consumer is printing the same event. This shouldn’t happen if the default dispatcher is DemandDispatcher. Why?

If anybody can help me understand how max_demand and min_demand in a simple, intuitive way, it would be really helpful.

Thanks.

3 Likes

For 1.

iex> 0..4 |> Enum.count
5

It seems to be off by one error, You start at 0 and end at 4, try to correct your list.

BTW we can see You are a python programmer from the name of your consumer :slight_smile:

iex(2)> length(Enum.to_list(0..4))
5
  1. If max_demand is 4 and min_demand is 2, the consumer will batch each run of handle_events by 2, because max_demand - min_demand is 2.

I agree that max_demand and min_demand are confusing/non-intuitive names. Unfortunately I don’t have a better suggestion but I would recommend that you read over this GitHub issue that discusses alternative names: https://github.com/elixir-lang/gen_stage/issues/201

1 Like
  1. But, ideally, since the first batch of events (first 5 events) are already generated, shouldn’t the consumers print those first before more events are produced? It seems over here, that no. of events in the buffer initially are 9 (5+2+2), instead of being just 5.

Okay, so I realise now that your scenario is more difficult than I thought at first. You have a producer_consumer with separate demands from the consumer, which makes it more difficult to reason about. In a lot of cases I expect you’d be fine just specifying demand in the final consumer, since it will propagate normally. But basically, the consumer_producer has 4 and 2, so it initially asks for 4 (producer always makes 1 too many, so it gets 5). It handles events in batches of 2, bringing the number of events down to 3, which is less than max demand of 4, so it asks for 1 more, and gets 2 (again, always 1 too many).

In turn, the final consumer has 2 and 1, so it asks for 2 initially, then batches handle events by 1 and asks for 1 more each time.

Could you add the code where you start the stages? How many consumers? 2? How many consumerproducers?

2 Likes

Oh, yes. I have two consumers. Here is my code

defmodule GenstageExample.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      # Starts a worker by calling: GenstageExample.Worker.start_link(arg)
      # {GenstageExample.Worker, arg}
      {GenstageExample.Producer, 0},
      {GenstageExample.ProducerConsumer, 1},
      Supervisor.child_spec({GenstageExample.Consumer, []}, id: :consumer_1),
      Supervisor.child_spec({GenstageExample.Consumer, []}, id: :consumer_2)
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: GenstageExample.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Ok, I got it now. I changed my producer code to this:

defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    IO.inspect({"events", Enum.count(events)})
    {:noreply, events, state + demand}
  end
end

i.e started the state from 1 and producing the events that equals the demand. Also, my 3rd doubt in which the same event was consumed by both the consumers has been resolved.

I noticed when I was producing events “one too many”, I got the following sequence of events produced:

[0, 1, 2, 3, 4]
[4, 5]
[5, 6]

...so on

Since I was producing the same no. twice, it was being consumed twice.

Also, I tried specifying demand for just the consumer and removed it from producer_consumer. What I observed was the following.

{"events", 100}
{"events", 500}
{"events", 500}
..
{"events": 2}
{"events", 2}

The first batch of events produced was 500 and 1000 respectively. It uses the defaults in producer_consumer if nothing is specified.

Ah, that’s interesting! There’s so much to learn when it comes to GenStage :slight_smile: