Flow / GenStage producers with very little or no data

Hey there,

I’m building a flow that joins several custom GenStage producers. Some of these producers sometimes have very little data, or none at all; in that case they immediately return an empty list and stop themselves.

But when I run my flow, I consistently get errors like this:

** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.1020439534.2853961736.202848>, :process, #PID<0.1339.0>, :noproc}

If I use Process.send_after to delay the GenStage producer's termination by 100 ms, the problem goes away, but it also slows down my entire flow.

Here is the part of the producer code related to the issue:

def handle_info(:exhausted, state) do
  {:stop, :normal, %EctoStreamer{state | exhausted: true}}
end

defp forward(stream, to) do
  Task.start_link(fn ->
    stream
    |> Stream.each(&handle_supply(&1, to))
    |> Stream.run()

    Process.send_after(to, :exhausted, @exhaustion_timeout)
  end)
end

Months later, I’m still struggling with this issue. Does anyone have an idea?

Have you tried unsubscribing your consumers before you stop the producers?

Consumers don’t know when the data source is finished. Only the producer does.

GenStage consumers crash when the producer crashes, though I’m not sure whether they use a link or a monitor. That’s the reason behind the guideline to put producer-consumer pipelines under a :rest_for_one supervision strategy.
If they don’t know, then just notify them.
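
On the link-or-monitor question: the :noproc reason in the "Last message" of the original post is what Process.monitor/1 delivers when the monitored process is already dead. That would fit a consumer that sets up its subscription after the producer has stopped, though that reading is an assumption about what happens inside GenStage, not something the error alone proves. Here is a minimal sketch with plain processes (no GenStage involved; MonitorDemo and monitor_dead_process/0 are names made up for the illustration):

```elixir
defmodule MonitorDemo do
  # Monitors a process that has already exited and returns the reason
  # carried by the resulting :DOWN message.
  def monitor_dead_process do
    pid = spawn(fn -> :ok end)

    # First monitor: only used to wait until the process is really gone.
    first_ref = Process.monitor(pid)

    receive do
      {:DOWN, ^first_ref, :process, ^pid, _reason} -> :ok
    end

    # Monitoring a dead pid does not raise; a :DOWN message is
    # delivered right away instead.
    late_ref = Process.monitor(pid)

    receive do
      {:DOWN, ^late_ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :timeout
    end
  end
end

IO.inspect(MonitorDemo.monitor_dead_process())
# prints :noproc
```

If this is what happens in the flow, it would also explain why delaying the producer's stop by 100 ms hides the error: the producer is simply still alive when the consumers attach to it.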