Question about how to use GenStage.from_enumerable with an intermittent stream

I have a stream which doesn't produce data as fast as it's consumed.

So I have a producer defined like this:

def start_link() do
  create_stream()
  |> GenStage.from_enumerable(name: Producer)
end

Then my producer-consumer subscribes to it:

  def init(:ok) do
    {:producer_consumer, :the_state_does_not_matter, subscribe_to: [Producer]}
  end

And my consumer subscribes to my producer-consumer:

  def init(:ok) do
    {:consumer, :the_state_does_not_matter, subscribe_to: [ProducerConsumer]}
  end

The issue I am having is that the consumer hangs. I think it's because at some point the producer didn't manage to get new data and, as stated in the docs:

When the enumerable finishes or halts, the stage will exit with :normal reason. This means that, if a consumer subscribes to the enumerable stage and the :cancel option is set to :permanent, which is the default, the consumer will also exit with :normal reason

So I read more, and the docs suggest adding the option cancel: :transient so that the stage doesn't finish. I added it like this but it's not working. Am I missing something?

|> GenStage.from_enumerable(name: Producer, cancel: :transient)

Originally I was using Flow.into_stages(flow, [ProducerConsumer]), but I can't do that because I can't reference (or I don't know how to reference) the ProducerConsumer from my supervision tree:

children = [
  {Worker, []},
  {ProducerConsumer, []},
  {Consumer, []}
]

Based on the typespecs, I think you are supposed to set the :cancel option to :transient when you subscribe, rather than pass anything to GenStage.from_enumerable/2.

Thanks. Reading the docs, it should be something like this, right?

{:producer_consumer, :the_state_does_not_matter, subscribe_to: [{Worker, :transient}]}

:subscribe_to - a list of producers to subscribe to. Each element represents either the producer module or a tuple with the producer module and the subscription options (as defined in sync_subscribe/2).

But it fails with an argument error:

** (ArgumentError) argument error
:erlang.monitor(:process, {Worker, :transient})

The subscription options need to be a keyword list. This is the syntax, for anyone wondering:
subscribe_to: [{Worker, [cancel: :transient]}]
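For context, here is a minimal sketch of how that option might sit inside the full producer-consumer module. It assumes the gen_stage dependency is installed and that the enumerable stage is registered under the name Worker (the question registers it as Producer, so adjust the name to match). The start_link/1 and handle_events/3 bodies are placeholders for illustration, not code from the thread.

```elixir
defmodule ProducerConsumer do
  use GenStage

  # Assumed entry point so the module can be listed as {ProducerConsumer, []}
  # in a supervisor's children.
  def start_link(_opts) do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    # Each :subscribe_to entry is either a producer or a {producer, options}
    # tuple, and the options must be a keyword list.
    # With cancel: :transient, this stage exits only if the producer goes down
    # with a reason other than :normal, :shutdown, or {:shutdown, reason}.
    {:producer_consumer, :the_state_does_not_matter,
     subscribe_to: [{Worker, [cancel: :transient]}]}
  end

  @impl true
  def handle_events(events, _from, state) do
    # Placeholder: forward events unchanged. A real stage would transform them.
    {:noreply, events, state}
  end
end
```

Passing the bare tuple {Worker, :transient} fails because GenStage treats an entry without an options list as the producer itself and tries to monitor it, which is where the :erlang.monitor/2 argument error comes from.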
