Hey there,
I’m building a flow that joins several custom GenStage producers. Some of these producers sometimes have very little data or none at all, so they immediately return an empty list and stop themselves.
But when I run my flow, I consistently get errors like this:
** (stop) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
Last message: {:DOWN, #Reference<0.1020439534.2853961736.202848>, :process, #PID<0.1339.0>, :noproc}
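For what it's worth, a :DOWN message with reason :noproc is what Process.monitor/1 delivers when the monitored process is already dead at the time the monitor is set up. My guess (an assumption, I have not confirmed it inside GenStage or Flow) is that something tries to monitor one of my producers after it has already stopped. Here is a minimal sketch with plain processes only; the spawned process is just a stand-in for an empty producer and is not my real code:

```elixir
# A process that exits immediately, standing in for a producer with no data.
pid = spawn(fn -> :ok end)

# Wait until that process is really gone before going further.
first_ref = Process.monitor(pid)

receive do
  {:DOWN, ^first_ref, :process, ^pid, _reason} -> :ok
end

# Monitoring a dead process still returns a reference, but the :DOWN
# message is delivered right away with reason :noproc.
ref = Process.monitor(pid)

reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    1_000 -> :timeout
  end

IO.inspect(reason)
# prints :noproc
```

If that is what is going on, it would also explain why delaying the termination hides the problem: the producer is still alive when it gets monitored.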
If I use Process.send_after to delay the GenStage producer's termination by 100ms, the problem goes away, but it also slows down my entire flow.
Here is the part of the producer code related to the issue:
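# Stop the producer normally once the stream has been exhausted.
def handle_info(:exhausted, state) do
  {:stop, :normal, %EctoStreamer{state | exhausted: true}}
end

# Run the stream in a linked task, passing each element to handle_supply/2.
defp forward(stream, to) do
  Task.start_link(fn ->
    stream
    |> Stream.each(&handle_supply(&1, to))
    |> Stream.run()

    # Tell `to` the stream is exhausted, after @exhaustion_timeout milliseconds.
    Process.send_after(to, :exhausted, @exhaustion_timeout)
  end)
end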