Hi,
I want to handle the :down message for crashed task in GenServer when using async_stream_nolink, but handle_info is not called… I will be grateful if someone has a suggestion for this.
here is my implementation:
defmodule CSVServer do
use GenServer
import Logger
def start_link(_opts \\ []) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def init(state) do
{:ok, state}
end
def start_stream(process_name) do
stream= Agent.get( {:global, String.to_atom(process_name)}, fn %{"stream" => stream } ->
stream
end)
#I start the stream inside the GenServer process, to receive :DOWN messages, but no :DOWN message received.
Task.start_link(Stream, :run, [stream])
end
def prepare_stream(csvs_params), do: GenServer.call(__MODULE__, %{csvs_params: csvs_params})
def handle_call(%{csvs_params: csvs_params}, _from, state) do
# some code removed, just to show how I call async_stream_nolink below:
stream= File.stream!(path)
|> CSVParser.parse_stream
|> Stream.chunk(chunck_n, step, [])
|> (&(Task.Supervisor.async_stream_nolink(CSVSupervisor, &1, CSVServer, :chunk_handler_fn, [], max_concurrency: 10))).()
status= case Agent.start_link(fn -> %{"stream" => stream} end, name: {:global, String.to_atom(process_name)}) do
{:ok, pid} -> :ok
{status, error} -> status
end
#reply with process_name that I will start later
{:reply, %{process_name: process_name, status: status}, state}
end
def chunk_handler_fn(chunk) do
#here is where I handle chunks
#I tried to raise error to test handling :DOWN message, but it did not received..
raise RuntimeError, message: "Test CSV error"
end
#not called:
def handle_info({:DOWN, _, :process, _pid, reason}, state) do
Logger.info ":DOWN"
Logger.info (Kernel.inspect(_pid))
Logger.info (Kernel.inspect(state))
Logger.info (Kernel.inspect(reason))
end
end
In the documentation , I could not find anything about compatibility with OTP behaviours, I assumed that Task.Supervisor.async_stream_nolink
is compatible with OTP behaviours like Task.Supervisor.async_nolink
as in docs:
Compatibility with OTP behaviours
If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.
But it seems not? I don’t know… Any guidance is highly appreciated.