I’m using a three-stage GenStage pipeline (producer, producer_consumer, and consumer). I want to set the demand pretty high, 1000+, so I am doing this to get them all started:
{:ok, producer} = Producer.start_link(0)
{:ok, producer_consumer} = ProducerConsumer.start_link(0)
{:ok, consumer} = Consumer.start_link()
GenStage.sync_subscribe(consumer, to: producer_consumer,
  max_demand: 1500, interval: 10, min_demand: 1400)
GenStage.sync_subscribe(producer_consumer, to: producer,
  max_demand: 1500, interval: 10, min_demand: 1400)
Process.sleep(:infinity)
However, when I dump out the number of events arriving in the consumer's handle_events, it is just 100. I was assuming that the max_demand and min_demand options passed to sync_subscribe would ensure that demand stays between those two values.
Am I correct in that assumption? I’m beginning to think not. If not, how do I ensure that the Consumer gets the correct number of items to process?
Here are the extremely skeletal implementations of the stages:
defmodule Producer do
  use GenStage
  def start_link(page), do: GenStage.start_link(Producer, page)
  def init(page), do: {:producer, page}
  def handle_demand(demand, offset) when demand > 0 do
    # get some events (placeholder: consecutive integers stand in for the real fetch)
    events = Enum.to_list(offset..(offset + demand - 1))
    {:noreply, events, offset + demand}
  end
end
defmodule ProducerConsumer do
  use GenStage
  def start_link(page), do: GenStage.start_link(ProducerConsumer, page)
  def init(number), do: {:producer_consumer, number}
  def handle_events(events, _from, number) do
    # get demand items (placeholder: pass the events through unchanged)
    items = events
    {:noreply, items, number}
  end
end
defmodule Consumer do
  use GenStage
  def start_link(), do: GenStage.start_link(Consumer, :ok)
  def init(:ok), do: {:consumer, :the_state_does_not_matter}
  def handle_events(events, _from, state) do
    Process.sleep(10)
    # here I only get 100 events
    {:noreply, [], state}
  end
end
I removed a lot of business logic there just to keep this code easy to follow.
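For what it's worth, 100 happens to be exactly max_demand - min_demand for the options above. The quick check below only confirms that arithmetic; `opts` and `gap` are illustrative names, not part of my real code, and I don't know whether this is how GenStage actually sizes the batches or just a coincidence.

```elixir
# Subscription options copied from the sync_subscribe calls above.
# `opts` and `gap` are illustrative names, not part of the original code.
opts = [max_demand: 1500, interval: 10, min_demand: 1400]

# Difference between the two demand thresholds.
gap = Keyword.fetch!(opts, :max_demand) - Keyword.fetch!(opts, :min_demand)

# Prints "max_demand - min_demand: 100"
IO.inspect(gap, label: "max_demand - min_demand")
```

If the batch size really does track that difference, widening the gap (say min_demand: 500) should change the number I see in handle_events, but I have not verified that.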