How to specify GenStage demand

I’ve got a 3-stage GenStage pipeline (producer, producer_consumer and consumer). I want the demand to be pretty high, 1000+, so I am doing this to get them all started:

{:ok, producer} = Producer.start_link(0)
{:ok, producer_consumer} = ProducerConsumer.start_link(0)
{:ok, consumer} = Consumer.start_link()

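# Note: :interval is not a built-in GenStage subscription option; it is only
# passed through to handle_subscribe/4, so it has no effect unless that callback reads it.
GenStage.sync_subscribe(consumer, to: producer_consumer,
    max_demand: 1500, interval: 10, min_demand: 1400)
GenStage.sync_subscribe(producer_consumer, to: producer,
    max_demand: 1500, interval: 10, min_demand: 1400)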

Process.sleep(:infinity)

However, when I dump out the demand inside the consumer stage, it is only 100 (handle_events/3 receives 100 events at a time). I was assuming that the max_demand and min_demand options passed to sync_subscribe would keep the demand between those two values.

Am I correct in that assumption? I’m beginning to think not. If not, how do I ensure that the Consumer gets the correct number of items to process?
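A quick sanity check, assuming the rule described in the reply further down (the consumer’s handle_events/3 receives batches of at most max_demand - min_demand events): the options above give exactly the 100 observed. The `batch_size` helper is hypothetical, not part of GenStage. A wider gap between the two options, for example max_demand: 2000 and min_demand: 1000, would allow batches of up to 1000.

```elixir
# Hypothetical helper, not a GenStage API: the largest batch handle_events/3
# should see, assuming GenStage chunks deliveries by max_demand - min_demand.
batch_size = fn opts ->
  Keyword.fetch!(opts, :max_demand) - Keyword.fetch!(opts, :min_demand)
end

batch_size.([max_demand: 1500, min_demand: 1400])
# → 100
batch_size.([max_demand: 2000, min_demand: 1000])
# → 1000
```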

Here are the extremely skeletal implementations of the stages:

defmodule Producer do
  use GenStage

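  def start_link(page), do: GenStage.start_link(Producer, page)

  def init(page), do: {:producer, page}

  def handle_demand(demand, offset) when demand > 0 do
    # get some events (placeholder: `demand` consecutive integers from offset)
    events = Enum.to_list(offset..(offset + demand - 1))
    {:noreply, events, offset + demand}
  end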
end

defmodule ProducerConsumer do
  use GenStage

  def start_link(page), do: GenStage.start_link(ProducerConsumer, page)

  def init(number), do: {:producer_consumer, number}

  def handle_events(events, _from, number) do
    # get demand items (placeholder: pass the events through unchanged)
    items = events
    {:noreply, items, number}
  end
end

defmodule Consumer do
  use GenStage

  def start_link(), do: GenStage.start_link(Consumer, :ok)

  def init(:ok), do: {:consumer, :the_state_does_not_matter}

  def handle_events(events, _from, state) do
    Process.sleep(10)

    # here I only get 100 events
    {:noreply, [], state}
  end
end

I removed a lot of business logic there just to keep this code easy to follow.

In case anyone stumbles onto this question as I did, here is how min_demand and max_demand relate to the number of events the consumer will actually get. Let’s say we have a hypothetical consumer with min_demand = 1 and max_demand = 5, and a hypothetical producer that always returns exactly the number of items demanded (so there is no waiting on the consumer’s part).
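The hypothetical setup can be reproduced with a sketch like the one below. Assumptions: Elixir 1.12+ (for Mix.install), network access to fetch the gen_stage Hex package, and the module names EagerProducer and BatchReporter, which are made up for illustration. The consumer sends the size of every batch it handles back to the calling process.

```elixir
# Assumes Elixir 1.12+ (for Mix.install) and network access to fetch gen_stage.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule EagerProducer do
  # Hypothetical producer: always emits exactly `demand` events
  # (consecutive integers), so the consumer never waits.
  use GenStage

  def start_link(start), do: GenStage.start_link(__MODULE__, start)

  @impl true
  def init(counter), do: {:producer, counter}

  @impl true
  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..(counter + demand - 1))
    {:noreply, events, counter + demand}
  end
end

defmodule BatchReporter do
  # Hypothetical consumer: subscribes with min_demand 1 / max_demand 5 and
  # sends the size of every batch handle_events/3 receives to `parent`.
  use GenStage

  def start_link(producer, parent),
    do: GenStage.start_link(__MODULE__, {producer, parent})

  @impl true
  def init({producer, parent}) do
    {:consumer, parent, subscribe_to: [{producer, min_demand: 1, max_demand: 5}]}
  end

  @impl true
  def handle_events(events, _from, parent) do
    send(parent, {:batch, length(events)})
    {:noreply, [], parent}
  end
end

{:ok, producer} = EagerProducer.start_link(0)
{:ok, _consumer} = BatchReporter.start_link(producer, self())

# Collect the sizes of the first six batches the consumer handled.
sizes =
  for _ <- 1..6 do
    receive do
      {:batch, n} -> n
    after
      5_000 -> raise "no batch received within 5 seconds"
    end
  end

IO.inspect(sizes, label: "batch sizes")
```

With these options every batch should hold at most 4 events (max_demand - min_demand), starting with a batch of 4; the steps below walk through why the sizes alternate the way they do.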

  1. The consumer will start by asking for 5 items (max_demand). So, Producer.handle_demand will be called with a demand of 5. If it can, it will put 5 events into the consumer’s queue.
  2. The maximum number of events that the consumer will ever handle at the same time is max_demand - min_demand (in this case 4). I think this is because when the consumer asks for more events, its demand will be at min_demand, so in this case it will still have 1 item in its queue. If the consumer always processed the full max_demand, it would be overflowed, with min_demand + max_demand items in its queue. So, the maximum number the consumer will ever process at one time is max_demand - min_demand.
  3. Now that the consumer has processed 4 events, it still has one in its queue. It will ask for 4 more to get back up to the max_demand of 5.
  4. Next, the consumer will process only 1 event. I guess this means that the consumer won’t mix batches of events: the first time the consumer asked for events it got 5 and processed only 4, so this is the remaining one from that first batch.
  5. Next, the consumer will process 4 events, the whole batch from its 2nd request.
  6. The consumer’s queue is now empty, so it will ask for 5 events (demand = 5). The pattern has restarted, so you can go back to step 1.
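The steps above can be checked without GenStage using a small model. This is a simplified sketch written for illustration (the DemandSim module is hypothetical, not GenStage’s real code), built on the behaviour described in the steps: each delivery is split into chunks of at most max_demand - min_demand, and after any chunk that brings the outstanding demand down to min_demand or below, the consumer asks for enough events to get back to max_demand. It also assumes the producer answers every ask immediately with one message of exactly that many events.

```elixir
defmodule DemandSim do
  @moduledoc """
  Simplified model (not GenStage's real code) of how a consumer batches
  events and re-asks for demand. Assumes the producer answers every ask
  at once with a single message containing exactly that many events.
  """

  # Processes `messages` producer messages and returns a list of
  # {batch_size, ask_sent_after_batch} tuples, in order.
  def run(max_demand, min_demand, messages) do
    # The consumer's first ask is always max_demand.
    loop(:queue.from_list([max_demand]), max_demand, max_demand, min_demand, messages, [])
  end

  defp loop(_asks, _demand, _max, _min, 0, acc), do: Enum.reverse(acc)

  defp loop(asks, demand, max_demand, min_demand, messages, acc) do
    # The producer fulfils the oldest outstanding ask in one message.
    {{:value, n}, asks} = :queue.out(asks)

    {asks, demand, acc} =
      n
      |> chunk_sizes(max_demand - min_demand)
      |> Enum.reduce({asks, demand, acc}, fn batch, {asks, demand, acc} ->
        case demand - batch do
          # Outstanding demand fell to min_demand or below: ask for enough
          # events to get back up to max_demand.
          left when left <= min_demand ->
            ask = max_demand - left
            {:queue.in(ask, asks), max_demand, [{batch, ask} | acc]}

          left ->
            {asks, left, [{batch, 0} | acc]}
        end
      end)

    loop(asks, demand, max_demand, min_demand, messages - 1, acc)
  end

  # Sizes of consecutive chunks of at most `size` events taken from `n` events.
  defp chunk_sizes(0, _size), do: []
  defp chunk_sizes(n, size) when n <= size, do: [n]
  defp chunk_sizes(n, size), do: [size | chunk_sizes(n - size, size)]
end

DemandSim.run(5, 1, 4)
# → [{4, 4}, {1, 0}, {4, 5}, {4, 4}, {1, 0}, {4, 5}]
```

The batch sizes 4, 1, 4, 4, 1, 4 and the alternating asks of 4 and 5 follow steps 1 to 6. Running the model with the question’s options, DemandSim.run(1500, 1400, 2), gives batches of 100 every time, which matches the 100 the consumer was seeing.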