I am trying to understand in detail how GenStage works by following the documentation. I tried changing the values of max_demand and min_demand to see how they work under the hood. Below is my producer, producer_consumer and consumer code:
producer.ex
defmodule GenstageExample.Producer do
  use GenStage

  def start_link(initial \\ 0) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter) do
    {:producer, counter}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand))
    IO.inspect({"events", Enum.count(events)})
    {:noreply, events, state + demand}
  end
end
producer_consumer.ex
defmodule GenstageExample.ProducerConsumer do
  use GenStage

  def start_link(factor) do
    GenStage.start_link(__MODULE__, factor, name: __MODULE__)
  end

  def init(factor) do
    {:producer_consumer, factor,
     subscribe_to: [{GenstageExample.Producer, max_demand: 4, min_demand: 2}]}
  end

  def handle_events(events, _from, factor) do
    converted_events = Enum.map(events, &(&1 * factor))
    {:noreply, converted_events, factor}
  end
end
consumer.ex
defmodule GenstageExample.Consumer do
  use GenStage

  def start_link(useless) do
    GenStage.start_link(__MODULE__, useless)
  end

  def init(state) do
    {:consumer, state,
     subscribe_to: [{GenstageExample.ProducerConsumer, max_demand: 2, min_demand: 1}]}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      IO.inspect({self(), event})
    end

    IO.puts("\n")
    :timer.sleep(1000)
    {:noreply, [], state}
  end
end
For the consumer, I set the demand to max_demand: 2, min_demand: 1. For the producer_consumer, I set it to max_demand: 4, min_demand: 2.
I see strange behaviour in the values printed by the consumer, using the above demand configuration:
Output on running mix run --no-halt
{"events", 5}
{"events", 2}
{"events", 2}
{#PID<0.175.0>, 0}
{#PID<0.176.0>, 2}
{#PID<0.175.0>, 1}
{#PID<0.176.0>, 3}
{"events", 2}
{#PID<0.175.0>, 4}
{#PID<0.176.0>, 4}
{"events", 2}
{#PID<0.175.0>, 5}
{#PID<0.176.0>, 5}
{"events", 2}
{#PID<0.175.0>, 6}
{#PID<0.176.0>, 6}
{"events", 2}
My doubts:
- If the max_demand for producer_consumer is 4, then why does the first request to the producer have a demand of 5?
- Then, immediately after the producer produces 5 events, why aren’t they printed by the consumer? Why do I see two demands to the producer for 2 events each?
- Strangely, each consumer is printing the same event. This shouldn’t happen if the default dispatcher is DemandDispatcher. Why?
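A detail that may bear on the first question: the number printed by the producer is the count of generated events, not the demand it received, and Elixir ranges include both endpoints. The snippet below is only a minimal sketch of that arithmetic, run outside GenStage; the values of state and demand are picked for illustration and the name exact is hypothetical. It does not claim to explain the rest of the output.

```elixir
# Ranges in Elixir include both endpoints.
state = 0
demand = 4

# Same expression as in handle_demand/2 above.
events = Enum.to_list(state..(state + demand))
IO.inspect(events)
IO.inspect(Enum.count(events))

# With an end bound of state + demand - 1, the list has exactly `demand` items.
exact = Enum.to_list(state..(state + demand - 1))
IO.inspect(Enum.count(exact))
```

Here the first list has demand + 1 elements, and its last element equals the next value of state, so the following batch would start with that same number again.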
If anybody can help me understand how max_demand and min_demand work in a simple, intuitive way, it would be really helpful.
Thanks.