I implemented the A → B → C example from the docs (GenStage – gen_stage v0.14.0) and saw that the consumer keeps sending demand upstream and the numbers keep getting printed. But when I wrote a plain producer and consumer of my own:
defmodule Asyncgenstage.Producer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(_args) do
    with {:ok, pid} <- :python.start([{:python_path, to_charlist("path")}, {:python, 'python3'}]) do
      {:producer, pid}
    end
  end

  def handle_demand(demand, state) do
    result = call_python(state)
    {:noreply, result, state}
  end

  def call_python(pid) do
    call_MFA(pid)
  end

  defp call_MFA(pid) do
    module = :url_generator
    function = :main
    arguments = [10]
    :python.call(pid, module, function, arguments)
  end
end
defmodule Asyncgenstage.Consumer do
  use GenStage
  alias Asyncgenstage.Producer, as: Producer

  def start_link, do: start_link([])

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :ok, subscribe_to: [{Producer, max_demand: 10, min_demand: 1}]}
  end

  def handle_events(events, _from, state) do
    IO.inspect(events)
    {:noreply, [], state}
  end
end
I observed that demand is not being sent continuously, so I thought maybe the function was not being called that many times. But when I set max_demand and min_demand in the consumer, it started sending demand upstream constantly. Why is that? max_demand and min_demand are just options, right? They have default values anyway, so why does declaring them explicitly trigger constant demand upstream, while leaving them out does not?
You want to read the following part in the docs of a recent version of GenStage: GenStage — gen_stage v1.2.1
It explains how demand is handled. In your case the problem is likely the producer, which produces a fixed number of events whenever handle_demand is triggered instead of producing as many events as were demanded.
Your producer is expected to eventually produce 5* events if it received a demand of 5 in handle_demand. Consumers won’t demand more before their previous demand has been fulfilled to some degree (depending on the min_demand and max_demand settings).
[*] At least 5. GenStage can buffer some events exceeding demand to make e.g. producers working with batches of data simpler.
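To make the "fulfilled to some degree" part concrete, here is a rough sketch of the bookkeeping in plain Elixir. It is a simplified model, not GenStage's actual implementation: the module `DemandSketch` and its function are made up for illustration, and the real consumer does its accounting per batch. The one fact it relies on is from the docs: max_demand defaults to 1000 and min_demand defaults to half of max_demand. The producer in the model emits a fixed number of events per handle_demand call, like the one in the question.

```elixir
defmodule DemandSketch do
  @moduledoc """
  Simplified model of consumer demand. Hypothetical helper for
  illustration only; this is not how GenStage is implemented.
  """

  # Returns how many times handle_demand would run, capped at max_calls.
  def handle_demand_calls(max_demand, min_demand, events_per_call, max_calls) do
    # The consumer starts by asking for max_demand events.
    loop(max_demand, max_demand, min_demand, events_per_call, max_calls, 0)
  end

  defp loop(_pending, _max_demand, _min_demand, _per_call, max_calls, calls)
       when calls >= max_calls do
    calls
  end

  defp loop(pending, max_demand, min_demand, per_call, max_calls, calls) do
    # One handle_demand call: the producer emits a fixed number of events,
    # no matter how many were asked for.
    pending = max(pending - per_call, 0)
    calls = calls + 1

    if pending <= min_demand do
      # Outstanding demand dropped to min_demand: the consumer asks again.
      loop(max_demand, max_demand, min_demand, per_call, max_calls, calls)
    else
      # Outstanding demand is still above min_demand: nothing new is asked,
      # so handle_demand never runs again and the pipeline stalls.
      calls
    end
  end
end

# Defaults (1000 / 500) with a producer that emits 10 events per call.
IO.inspect(DemandSketch.handle_demand_calls(1000, 500, 10, 100))
# prints 1

# max_demand: 10 with the same producer.
IO.inspect(DemandSketch.handle_demand_calls(10, 5, 10, 100))
# prints 100 (the cap), i.e. demand keeps flowing
```

Under this model, a small max_demand only hides the problem: the fixed batch of 10 happens to cover the whole demand, so the consumer asks again every time.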
Okay, so in this context, when can we say that demand has been served? IMO what is happening here is that, since the demand variable is not being used, the demand is not considered served. Is that the case? I ask because I changed the code a little, like this:
defmodule Asyncgenstage.Producer do
  use GenStage

  def start_link(_args) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(_args) do
    with {:ok, pid} <- :python.start([{:python_path, to_charlist("/Users/ayoushchourasia/Documents/Projects/genstage_example/asyncgenstage/lib/asyncgenstage")}, {:python, 'python3'}]) do
      {:producer, pid}
    end
  end

  def handle_demand(demand, state) do
    result = call_python(state, demand)
    {:noreply, result, state}
  end

  def call_python(pid, demand) do
    call_MFA(pid, demand)
  end

  defp call_MFA(pid, demand) do
    module = :url_generator
    function = :main
    arguments = [demand]
    :python.call(pid, module, function, arguments)
  end
end
and I started getting events constantly. So is that the case?
The demand interface for a producer works like this: additional demand is communicated to the producer through the handle_demand callback. The producer is meant to keep track of that demand (temporarily or permanently) and to eventually emit, through any of the producer callbacks, the number of events it received demand for. If your Python code, when called, ensures that it returns as many events as were demanded, then your code should be fine.
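If the source can only hand back fixed-size batches, one option is to buffer inside the producer and emit exactly the demanded number of events. Below is a minimal sketch of that idea, not your actual setup: `DemandTrackingProducer` and the `fetch` function are hypothetical names, `fetch` stands in for the `:python.call/4` in your code, and it is assumed to always return a non-empty list (otherwise `fill/3` would loop forever). The `Mix.install/2` line is only needed when running this as a standalone script on Elixir 1.12 or later; in a Mix project gen_stage would already be in mix.exs.

```elixir
# Only for running as a standalone script; assumes network access.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule DemandTrackingProducer do
  use GenStage

  # fetch is a hypothetical zero-arity function returning a non-empty
  # list of events, e.g. a wrapper around the Python call.
  def start_link(fetch) when is_function(fetch, 0) do
    GenStage.start_link(__MODULE__, fetch)
  end

  @impl true
  def init(fetch) do
    {:producer, %{fetch: fetch, buffer: []}}
  end

  @impl true
  def handle_demand(demand, %{fetch: fetch, buffer: buffer} = state) when demand > 0 do
    # Keep fetching batches until the buffer can cover the demand.
    buffer = fill(buffer, demand, fetch)
    # Emit exactly `demand` events and keep the leftovers for the next call.
    {events, rest} = Enum.split(buffer, demand)
    {:noreply, events, %{state | buffer: rest}}
  end

  defp fill(buffer, demand, fetch) do
    if length(buffer) >= demand do
      buffer
    else
      fill(buffer ++ fetch.(), demand, fetch)
    end
  end
end
```

With this shape the consumer's settings no longer matter for correctness: whatever max_demand and min_demand are, each handle_demand call is answered with as many events as were asked for.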