I have a stream which doesnt produce data as fast as its consumed.
So I have a producer defined like this:
def start_link() do
create_stream
|> GenStage.from_enumerable(name: Producer)
end
Then my producer consumer subscribes to it
def init(:ok) do
{:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
end
And my consumer subscribes to mu producer consumer
def init(:ok) do
{:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
end
The issue I am having is the consumer hangs, I think because at some point the producer didnt manage to get new data and as stated in the docs:
When the enumerable finishes or halts, the stage will exit with :normal reason. This means that, if a consumer subscribes to the enumerable stage and the :cancel option is set to :permanent, which is the default, the consumer will also exit with :normal reason
So I read more and it suggests to add option cancel: :transient to dont finish the stage. I added it like this but its not working, am I missing something?
|> GenStage.from_enumerable(name: Producer, cancel: :transient)
Originally I was using a Flow.into_stages(flow, [ProducerConsumer]) but I cant do that because I cant reference (or I dont know how) the ProducerConsumer from my supervisor tree
children = [
{Worker, []},
{ProducerConsumer, []},
{Consumer, []}
]