Could you explain min_demand and max_demand in GenStage?

I’m trying to wrap my head around GenStage’s min_demand and max_demand but after going through existing topics, blog posts and official documentation, it is still not very clear to me how to think about them and how they work in practice.

I’m experimenting with the simplest possible case: a producer and a single consumer:

defmodule Producer do
  use GenStage
  require Logger

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, nil}
  end

  def handle_demand(demand, state) do
    Logger.info("Demand: #{demand}")

    events = Enum.to_list(1..850)

    Logger.info("Producer: #{Enum.count(events)}")
    {:noreply, events, state}
  end
end

defmodule Consumer do
  use GenStage
  require Logger

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:consumer, nil, subscribe_to: [Producer]}
  end

  def handle_events(events, _from, state) do
    Logger.info("Processing: #{Enum.count(events)}")
    Process.sleep(2000)
    {:noreply, [], state}
  end
end

The default values for min_demand and max_demand are 500 and 1000 respectively, and the Producer in this case will constantly satisfy demand by producing 850 events. The initial demand seems to be always equivalent to max_demand.

So far so good, but after the first demand, things get a bit confusing. Running the producer and consumer in an application produces the following:

15:40:45.131 [info]  Demand: 1000
15:40:45.133 [info]  Producer: 850
15:40:45.134 [info]  Processing: 500

15:40:47.135 [info]  Processing: 350
15:40:47.135 [info]  Demand: 500
15:40:47.135 [info]  Producer: 850

15:40:49.136 [info]  Processing: 500

15:40:51.137 [info]  Processing: 150
15:40:51.137 [info]  Demand: 650
15:40:51.137 [info]  Producer: 850

15:40:53.138 [info]  Processing: 200

15:40:55.139 [info]  Processing: 500

15:40:57.140 [info]  Processing: 150
15:40:57.140 [info]  Demand: 650
15:40:57.140 [info]  Producer: 850

15:40:59.141 [info]  Processing: 200

15:41:01.142 [info]  Processing: 500

15:41:03.143 [info]  Processing: 150
15:41:03.143 [info]  Demand: 650
15:41:03.143 [info]  Producer: 850

(I grouped the lines by timestamp manually to make them a bit easier to follow)

I can’t really follow the logic of the above output. In particular, why does the demand go from 1000 to 500 and then settle at 650? And why are the batches processed in these sizes and in this order: first 500 and 350, then 500 and 150, and finally 200, 500, 150?

For the record, I have looked at the following resources, besides the official documentation:


Welcome to the forum!

You are not storing remaining demand - so that you can release it later when it can be satisfied.

Have a look at:


Thank you @peerreynders!

It is true that the producer returns more events than the demand sent by the consumer, but I still do not understand the reasoning behind the output I see. I had a look at the topic you included but I do not see the connection between my use case and the replies there.

Could you please elaborate further? Thanks!

  • The initial demand is 1000 (max_demand).
  • The producer releases 850. The producer owes 150 events.
  • GenStage releases 500 (min_demand) of the 850 events to the consumer.
  • GenStage releases the remaining 350 events to the consumer.
  • Since the consumer has already consumed min_demand (500) of the max_demand, GenStage demands another 500 events (min_demand) to be buffered.
  • The producer receives 500 demand while still owing 150. So demand is actually 650. Producer erroneously releases 850 events, exceeding demand by 200. Total demand is 1500 but producer released 1700.
  • GenStage releases 500 (min_demand) to the consumer (1350 running total).
  • GenStage releases 150 events to the consumer, reaching the total demand level of 1500.

I suspect that at this point things start to go off the rails because the producer is not adhering to the protocol (it’s releasing more events than are demanded).
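
To make the tally above concrete, here is a small sketch in plain Elixir (no GenStage involved; `LedgerSketch` is a made-up name). It folds over the demand received and the events emitted in each `handle_demand` call. A positive balance means the producer still owes events, a negative one means it has emitted more than was asked for.

```elixir
defmodule LedgerSketch do
  # Each step is {demand_received, events_emitted}.
  # Returns the running balance after every step:
  #   balance > 0 -> events still owed to the consumer
  #   balance < 0 -> events emitted beyond what was demanded
  def balances(steps) do
    Enum.scan(steps, 0, fn {demand, emitted}, balance ->
      balance + demand - emitted
    end)
  end
end

# The first two handle_demand calls from the log: 1000 and 500 demanded,
# 850 emitted each time.
IO.inspect(LedgerSketch.balances([{1000, 850}, {500, 850}]))
# [150, -200]
```

So after the first call 150 events are owed, and after the second the producer is 200 events over (1700 emitted against 1500 demanded).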



Let’s do the math!

First I put the events, then I explain them. I may slightly reorder them, because they are running concurrently.

15:40:45.131 [info]  Demand: 1000
15:40:45.133 [info]  Producer: 850
  1. The consumer asks for 1000 elements, we send 850, which means the producer owes 150.
15:40:45.134 [info]  Processing: 500
15:40:47.135 [info]  Demand: 500
15:40:47.135 [info]  Processing: 350
  2. The consumer processes 500 elements (so it can ask for 500 more straight away) and then the remaining 350. When it finishes processing the 350, its demand (the number of events it already asked for) is 650, so it doesn’t ask for more right now as it is above min demand.
15:40:47.135 [info]  Producer: 850 (200 buffered)
  3. Meanwhile the producer received the next 500 and sends 850. Of those 850, 650 are effectively sent (150 that we owed plus 500 asked) and the producer buffers the remaining 200. Those 200 are not sent.
15:40:49.136 [info]  Processing: 500
15:40:51.137 [info]  Demand: 650 (it is actually 850 but the producer serves 200 from buffer)
15:40:51.137 [info]  Processing: 150
  4. Of those 650 received, the consumer processes 500 and asks for 850 more (its demand was 650, it processed 500, lowering it to 150, which means it needs 850 to replenish it back to 1000) and then processes 150, which puts its demand back to 850.
15:40:51.137 [info]  Producer: 850 (200 buffered)
  5. The producer receives a demand for 850, but 200 was buffered, so it sends the 200 buffered immediately and treats the 650 as its new demand. Of this new demand, it sends 850, which means 200 will be buffered again.
15:40:53.138 [info]  Processing: 200
15:40:55.139 [info]  Processing: 500
15:40:57.140 [info]  Demand: 650 (it is actually 850 but the producer serves 200 from buffer)
15:40:57.140 [info]  Processing: 150
  6. The consumer has a current demand of 850, it receives 200 from the buffer which it processes straight away, which means its demand is now 650. It then processes 500, which puts its demand to 150, so it asks for 850 more to put it back to 1000. Then it processes 150, putting its current demand back to 850.

  7. Go back to step 5.

It is actually fine for the producer to supply more events than were asked for. The extra events go to a buffer (which you can set limits on). But you are 100% correct that a producer implementation must not “discard” demand.
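
For anyone who wants to replay these steps, below is a rough simulation in plain Elixir. It is a simplified, sequential model of the behaviour described above, not GenStage's actual code: `DemandSim` and its functions are hypothetical names, the numbers 500, 1000 and 850 come from the example, and it assumes the consumer asks again as soon as its outstanding demand drops to min_demand or below.

```elixir
defmodule DemandSim do
  @min 500
  @max 1000
  @produced 850

  # Consumer side: split one incoming message of `count` events into batches
  # of at most max - min. After each batch, ask for more if the outstanding
  # demand has dropped to min or below.
  # Returns {outstanding_demand, [{batch_size, ask}]}.
  def split_batches(count, outstanding, acc \\ [])
  def split_batches(0, outstanding, acc), do: {outstanding, Enum.reverse(acc)}

  def split_batches(count, outstanding, acc) do
    batch = min(count, @max - @min)
    left = outstanding - batch
    {outstanding, ask} = if left <= @min, do: {@max, @max - left}, else: {left, 0}
    split_batches(count - batch, outstanding, [{batch, ask} | acc])
  end

  # Producer side: state is {owed, buffered}. The buffer is served first and
  # only the remainder reaches handle_demand, which always emits 850 events.
  # Returns {demand_seen_by_handle_demand, message_sizes_sent, new_state}.
  def producer_ask(ask, {owed, buffered}) do
    owed = owed + ask
    from_buffer = min(buffered, ask)
    demand = ask - from_buffer
    owed = owed - from_buffer
    buffered = buffered - from_buffer
    emitted = if demand > 0, do: @produced, else: 0
    sent = min(emitted, owed)
    messages = Enum.filter([from_buffer, sent], &(&1 > 0))
    {demand, messages, {owed - sent, buffered + emitted - sent}}
  end

  # Runs `rounds` ask/produce/consume cycles, starting with an ask of max.
  # Returns one {demand_logged_by_producer, batch_sizes_processed} per round.
  def run(rounds) do
    initial = {@max, @max, {0, 0}}

    {_state, log} =
      Enum.reduce(1..rounds, {initial, []}, fn _, {{ask, outstanding, producer}, log} ->
        {demand, messages, producer} = producer_ask(ask, producer)

        {outstanding, batches} =
          Enum.reduce(messages, {outstanding, []}, fn count, {out, acc} ->
            {out, batches} = split_batches(count, out)
            {out, acc ++ batches}
          end)

        ask = batches |> Enum.map(&elem(&1, 1)) |> Enum.sum()
        sizes = Enum.map(batches, &elem(&1, 0))
        {{ask, outstanding, producer}, log ++ [{demand, sizes}]}
      end)

    log
  end
end

IO.inspect(DemandSim.run(4))
# [{1000, [500, 350]}, {500, [500, 150]}, {650, [200, 500, 150]}, {650, [200, 500, 150]}]
```

Each tuple is one "Demand:" line from the producer followed by the "Processing:" batch sizes the consumer works through, which lines up with the log in the original post.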


Brilliant!! Thank you so much for the detailed response José, now I got it! It really clicked for me when you said that the producer buffers the remaining events and releases them later, somehow I was unaware of the built-in buffer options, that was the missing link for me!

I just want to step back from the example I picked and ask specifically about min_demand and max_demand to clear them up in my mind. The documentation says:

The :max_demand specifies the maximum amount of events that must be in flow while the :min_demand specifies the minimum threshold to trigger for more demand.

I think I understand max_demand, but min_demand is a bit trickier :smile: Where does the threshold come from, and what exactly does it represent?

I have read elsewhere that the actual number of events a consumer will process could be at most max_demand - min_demand, so if they are set to 1000 and 750, after the initial 1000 demand, 250 will be processed and 250 will be requested. It seems counter-intuitive to me so I must be missing something. What is the right way to think about min_demand?


The consumer keeps track of its “pending demand”, the number of demanded events that were still not processed. This “pending demand” (I put it in quotes because I made up the word in the attempt to explain the concept better) grows whenever more events are demanded, and shrinks whenever events are processed.

As long as this “pending demand” is higher than min_demand, the consumer won’t ask for more. When, after processing enough events, this “pending demand” falls under min_demand, the consumer will ask for more, making sure not to exceed a “pending demand” of max_demand (so it will ask for max_demand - “pending demand” events).

So it’s not true that the consumer processes at most max_demand - min_demand events. There is one number that the consumer keeps track of, what I called “pending demand”, and this number is constantly compared with the min/max thresholds: if it goes lower than min_demand, that triggers a new demand, while max_demand constrains how many events are requested given the current “pending demand”.
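
As a minimal sketch of that rule (plain Elixir, `AskRule.next_ask/3` is a name I made up, not a GenStage function): given the current “pending demand”, return how many events the consumer would request. One assumption here is that the threshold is inclusive, which is what the log earlier in the thread suggests, since new demand was sent when the pending demand was exactly 500.

```elixir
defmodule AskRule do
  # At or below the threshold: top the pending demand back up to max_demand.
  def next_ask(pending, min_demand, max_demand) when pending <= min_demand do
    max_demand - pending
  end

  # Above the threshold: do not ask for anything yet.
  def next_ask(_pending, _min_demand, _max_demand), do: 0
end

IO.inspect(AskRule.next_ask(650, 500, 1000)) # 0
IO.inspect(AskRule.next_ask(500, 500, 1000)) # 500
IO.inspect(AskRule.next_ask(150, 500, 1000)) # 850
```

With min_demand: 750 and max_demand: 1000, `AskRule.next_ask(750, 750, 1000)` gives 250, which is where the "250 requested" figure comes from when the threshold is hit exactly.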

Please correct me if I wrote something wrong :slightly_smiling_face:


Thank you @lucaong, this is very interesting! :clap: The concept of pending demand really clears up things for me. So this pending demand is basically granted by the producer, but not yet processed by the consumer, if I understand it correctly? And as soon as the producer can process the next amount of events, this pending demand is used straight away?

There is just one thing left that will complete the puzzle for me: how does GenStage decide how many events to process and how many will remain as pending? If it demands 1000 events initially and receives 1000 events, why does it split them in 500 for processing and 500 - pending? Is this where max_demand - min_demand comes into play? (sorry, I’ve seen this max_demand - min_demand formula in quite a few places and I’m trying to make sense of it!)


Pending demand has already been requested by the consumer - but has not yet been released by the producer. It is up to the producer to “remember” how much demand is still pending and release it to the consumer as soon as (some of) the pending demand can be delivered.

That is what I was referring to when I wrote “You are not storing remaining demand” in the producer’s state.

If you look at the GenStage documentation you should notice that the return values from most of the GenStage callback functions can release events to the consumer - not just handle_demand.
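
A minimal sketch of what “storing remaining demand” could look like. It is written as a plain module so it runs without the gen_stage dependency; `PendingDemandSketch` and `handle_new_events/2` are hypothetical names. In a real producer the first function would be the `handle_demand/2` callback and the second would be whichever callback receives new work (for example `handle_cast/2`), since both can return `{:noreply, events, state}`.

```elixir
defmodule PendingDemandSketch do
  # State is {queued_events, pending_demand}.
  def init, do: {[], 0}

  # New demand arrives: add it to what is already pending, then release
  # whatever the queue can satisfy right now.
  def handle_demand(demand, {queue, pending}) when demand > 0 do
    dispatch(queue, pending + demand)
  end

  # New events arrive: queue them, then release up to the pending demand.
  def handle_new_events(events, {queue, pending}) do
    dispatch(queue ++ events, pending)
  end

  # Never release more than was demanded; remember the rest either way.
  defp dispatch(queue, pending) do
    {to_send, rest} = Enum.split(queue, pending)
    {:noreply, to_send, {rest, pending - length(to_send)}}
  end
end

state = PendingDemandSketch.init()
{:noreply, sent_now, state} = PendingDemandSketch.handle_demand(1000, state)

{:noreply, sent_later, {_queue, pending}} =
  PendingDemandSketch.handle_new_events(Enum.to_list(1..850), state)

IO.inspect({length(sent_now), length(sent_later), pending})
# {0, 850, 150}
```

Here the 150 events that could not be delivered stay in the state as pending demand and are released as soon as more events show up.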


I don’t think max_demand - min_demand necessarily comes into play, but I might be wrong on that. One way to think about it that leads to that calculation is that, if events were consumed one by one, when the consumer processes enough events to reach the min_demand threshold, it would ask for more without exceeding max_demand. If the events were consumed one at a time (which in reality is not necessarily the case, as you can see in your test), the threshold would be crossed when the pending demand is exactly min_demand, so then max_demand - min_demand events would be requested. But that’s just because the “pending demand” is exactly equal to min_demand.
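
To illustrate the one-by-one case, here is a small sketch in plain Elixir (`OneByOne` is a hypothetical name, and it assumes the consumer asks as soon as the pending demand reaches min_demand). It consumes events one at a time and records every request it would send.

```elixir
defmodule OneByOne do
  # Consumes `event_count` events one at a time, starting from a pending
  # demand of max_demand, and returns the list of requests sent.
  def asks(event_count, min_demand, max_demand) do
    {_pending, asks} =
      Enum.reduce(1..event_count, {max_demand, []}, fn _, {pending, asks} ->
        pending = pending - 1

        if pending <= min_demand do
          # Threshold reached: top back up to max_demand.
          {max_demand, [max_demand - pending | asks]}
        else
          {pending, asks}
        end
      end)

    Enum.reverse(asks)
  end
end

IO.inspect(OneByOne.asks(2000, 750, 1000))
# [250, 250, 250, 250, 250, 250, 250, 250]
```

Every request is exactly max_demand - min_demand, but only because the threshold is always hit exactly when events arrive one at a time.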

I don’t know why in your example a maximum of 500 events are processed at each time. I would guess that the implementation batches events in groups of 500 (not sure if that’s a constant or calculated), but maybe @josevalim can clarify that.

One additional note: I see that the term “pending demand” is also used in the official docs and in @peerreynders’s comment to mean a different concept, which can be confusing in light of what I wrote. In the docs, the producer-side pending demand is the amount of demand that could not yet be fulfilled by the producer because of a lack of events. The way to think about it is that producers and consumers both have to keep track of their tallies, but the internal consumer state and the producer state are independent and unrelated:

  • The producer keeps track of the available events that could be sent if demanded, as well as of the demand that could not yet be satisfied because there weren’t enough events at the time of the request (the “pending demand” on the producer side, completely different from the consumer side)

  • The consumer internally keeps track of its own “pending demand”, meaning how many events it already requested, but were not yet processed (maybe I should have called it “unprocessed demand”). It uses this to decide on when to request more events (based on min_demand), and how many to request (based on max_demand).


That’s pretty much it. We process in batches of 500 (max_demand - min_demand) because we know that after we process said amount, we can already ask for more, so this guarantees a consumer would be busy all the time because while it processes the rest the producer is already working on sending more.

One thing to note is that, when we have a pending demand of 650, a consumer could process only 150 and then ask for more. We don’t do that because producers are often more efficient when working with large demands. For example, with Amazon SQS you are almost always better off asking for batches of 10 rather than piecemeal.
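
A quick way to see the batch sizes this produces, as a plain-Elixir sketch (`BatchSketch` is a made-up name and it only models the max_demand - min_demand split of a single incoming message, nothing else):

```elixir
defmodule BatchSketch do
  # Splits `event_count` events into batches of at most
  # max_demand - min_demand and returns the batch sizes.
  def sizes(event_count, min_demand, max_demand) do
    1..event_count
    |> Enum.chunk_every(max_demand - min_demand)
    |> Enum.map(&length/1)
  end
end

IO.inspect(BatchSketch.sizes(850, 500, 1000))  # [500, 350]
IO.inspect(BatchSketch.sizes(1000, 750, 1000)) # [250, 250, 250, 250]
```

The two values are set per subscription, for example `subscribe_to: [{Producer, min_demand: 750, max_demand: 1000}]` in the consumer's `init/1`, so the gap between them is the knob for how large each `handle_events/3` batch can get.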
