Handle_info when using Task.Supervisor.async_stream_nolink

Hi,

I want to handle the :DOWN message for a crashed task in a GenServer when using async_stream_nolink, but handle_info is never called. I would be grateful for any suggestions.

Here is my implementation:

defmodule CSVServer do
  use GenServer
  require Logger

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def init(state) do
    {:ok, state}
  end

  def start_stream(process_name) do
    stream =
      Agent.get({:global, String.to_atom(process_name)}, fn %{"stream" => stream} ->
        stream
      end)

    # I start the stream inside the GenServer process to receive :DOWN messages, but no :DOWN message is received.
    Task.start_link(Stream, :run, [stream])
  end

  def prepare_stream(csvs_params), do: GenServer.call(__MODULE__, %{csvs_params: csvs_params})

  def handle_call(%{csvs_params: csvs_params}, _from, state) do
    # some code removed, just to show how I call async_stream_nolink below:
    stream =
      File.stream!(path)
      |> CSVParser.parse_stream()
      |> Stream.chunk_every(chunck_n, step, [])
      |> (&Task.Supervisor.async_stream_nolink(CSVSupervisor, &1, CSVServer, :chunk_handler_fn, [], max_concurrency: 10)).()

    status =
      case Agent.start_link(fn -> %{"stream" => stream} end, name: {:global, String.to_atom(process_name)}) do
        {:ok, _pid} -> :ok
        {status, _error} -> status
      end

    # reply with process_name that I will start later
    {:reply, %{process_name: process_name, status: status}, state}
  end

  def chunk_handler_fn(_chunk) do
    # here is where I handle chunks
    # I tried to raise an error to test handling of the :DOWN message, but it was not received.
    raise RuntimeError, message: "Test CSV error"
  end

  # not called:
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    Logger.info(":DOWN")
    Logger.info(inspect(pid))
    Logger.info(inspect(state))
    Logger.info(inspect(reason))
    # handle_info/2 has to return a GenServer tuple such as {:noreply, state}
    {:noreply, state}
  end
end

I could not find anything in the documentation about compatibility with OTP behaviours for this function. I assumed that Task.Supervisor.async_stream_nolink is compatible with OTP behaviours in the same way as Task.Supervisor.async_nolink, for which the docs say:

Compatibility with OTP behaviours

If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.

But that does not seem to be the case for async_stream_nolink. Any guidance is highly appreciated.
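For reference, the async_nolink pattern described in the quoted documentation can be sketched roughly as follows. This is only an illustration, not code from the docs: the module name NolinkDemo, the supervisor name NolinkDemo.TaskSupervisor, and the run/1 and last_outcome/0 helpers are hypothetical, and the task supervisor is assumed to be started elsewhere under that name.

```elixir
defmodule NolinkDemo do
  use GenServer
  require Logger

  # NolinkDemo and NolinkDemo.TaskSupervisor are hypothetical names.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  def run(fun), do: GenServer.call(__MODULE__, {:run, fun})
  def last_outcome, do: GenServer.call(__MODULE__, :last_outcome)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:run, fun}, _from, state) do
    # The task is started from inside the GenServer callback, so the
    # GenServer is the process that monitors it and receives its messages.
    Task.Supervisor.async_nolink(NolinkDemo.TaskSupervisor, fun)
    {:reply, :ok, state}
  end

  def handle_call(:last_outcome, _from, state), do: {:reply, state, state}

  @impl true
  def handle_info({ref, result}, _state) when is_reference(ref) do
    # Successful reply: stop monitoring and flush the pending :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, {:ok, result}}
  end

  def handle_info({:DOWN, _ref, :process, _pid, reason}, _state) do
    # The task crashed before it could send a reply.
    Logger.info("task crashed: #{inspect(reason)}")
    {:noreply, {:error, reason}}
  end
end
```

With Task.Supervisor.start_link(name: NolinkDemo.TaskSupervisor) and NolinkDemo.start_link() running, calling NolinkDemo.run(fn -> raise "boom" end) should lead to the :DOWN clause being invoked in the GenServer.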

I ran into the same problem. The answer is that async_stream_nolink simply does not monitor the tasks. You can see the difference between async_nolink https://github.com/elixir-lang/elixir/blob/v1.10.3/lib/elixir/lib/task/supervisor.ex#L459 and async_stream_nolink https://github.com/elixir-lang/elixir/blob/v1.10.3/lib/elixir/lib/task/supervisor.ex#L481: the latter has no Process.monitor line. I opened a proposal thread on Google Groups: https://groups.google.com/forum/#!topic/elixir-lang-core/dVK4mR43hww
For now, I guess the only way is to use async_nolink inside Enum/Stream/Flow.
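A rough sketch of that workaround, assuming the chunks are already available as an enumerable: the GenServer itself starts one async_nolink task per chunk with Enum.map and tracks each task reference in its state. The names ChunkRunner, ChunkRunner.TaskSupervisor, process/2 and results/0 are hypothetical and only serve the illustration.

```elixir
defmodule ChunkRunner do
  use GenServer

  # ChunkRunner and ChunkRunner.TaskSupervisor are hypothetical names.
  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def process(chunks, fun), do: GenServer.call(__MODULE__, {:process, chunks, fun})
  def results, do: GenServer.call(__MODULE__, :results)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call({:process, chunks, fun}, _from, state) do
    # One unlinked but monitored task per chunk, started by the GenServer,
    # so replies and :DOWN messages arrive in this process.
    pending =
      chunks
      |> Enum.map(fn chunk ->
        task = Task.Supervisor.async_nolink(ChunkRunner.TaskSupervisor, fn -> fun.(chunk) end)
        {task.ref, :pending}
      end)
      |> Map.new()

    {:reply, :ok, Map.merge(state, pending)}
  end

  def handle_call(:results, _from, state), do: {:reply, Map.values(state), state}

  @impl true
  def handle_info({ref, value}, state) when is_map_key(state, ref) do
    # The task replied: drop its monitor and record the value.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref => {:ok, value}}}
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) when is_map_key(state, ref) do
    # The task crashed: record the exit reason for that chunk.
    {:noreply, %{state | ref => {:exit, reason}}}
  end

  # Ignore any unrelated message.
  def handle_info(_msg, state), do: {:noreply, state}
end
```

Unlike async_stream_nolink with max_concurrency, this sketch starts all tasks at once and does not limit concurrency, so for large inputs some form of batching would still be needed.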