Handle_info when using Task.Supervisor.async_nolink

Hi,

I want to handle the following exit example ( String.to_integer(input) is jus an example and can be replaced with raise). This happen when I call My.GenServer.some_method(My.GenServer, "test"). I will be grateful if someone has a suggestion for this.

10:26:42.321 [error] GenServer My.GenServer terminating
** (stop) exited in: Task.await(%Task{owner: #PID<0.124.0>, pid: #PID<0.128.0>, ref: #Reference<0.4181578440.4035444737.94083>}, 5000)
    ** (EXIT) an exception was raised:
        ** (ArgumentError) argument error
            :erlang.binary_to_integer("test")

Here’s the code:

defmodule My.Application do
  use Application

  def start(_type, _args) do
    import Supervisor.Spec, warn: false

    children = [
      worker(My.GenServer, []),
      supervisor(Task.Supervisor, [[name: My.Task.Supervisor, restart: :transient]]),
    ]
    
    opts = [strategy: :one_for_one, name: Test.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

defmodule My.GenServer do
    def start_link do
      GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
    end

    def init(:ok) do
      # Process.flag(:trap_exit, true)
      {:ok, []}
    end

    def some_method(pid, input) do
        GenServer.call(pid, {:some_method, input})
    end

    def handle_call({:some_method, input}, _from, state) do
      # because we don't want to terminate GenServer use Task.async_nolink
      task = Task.Supervisor.async_nolink(My.Task.Supervisor, fn ->
          # raise argument error when passing binary
          String.to_integer(input)
      end)
      {:reply, Task.await(task), state}
    end

    def handle_info({:EXIT, from, reason}, state) do
      IO.inspect "handle_info::exit"
      IO.inspect reason
      {:noreply, state}
    end

    def handle_info({_ref, result}, state) do
      IO.inspect "handle_info::result"
      IO.inspect result
      {:noreply, state}
    end

    def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
      IO.inspect "handle_info::down"
      IO.inspect reason
      {:noreply, state}
    end

    def handle_info(msg, state) do
      IO.inspect "handle_info"
      IO.inspect msg
      {:noreply, state}
    end
end

I’ve read the docs from here regarding Task.Supervisor.async_link: if you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.

My problem is that handle_info is not called, I try to debug using observer attaching a trace to My.GenServer and I see that My.GenServer is receiving a :DOWN message:

Any solution to this problem ?
Thanks

You’re not getting a result because there is no result: the task failed. It never got a chance to reply. :DOWN is the right thing to handle in that case.

When using Task.await/1 it will handle the :DOWN-message from the Task. It will handle it by calling exit/1. So the GenServer will be exited because of that.

If you want to be more failsafe, you have to do the waiting for the result and the :DOWN-message completely on your own.

1 Like

I don’t get a :DOWN message in handle_info({:DOWN, _ref, :process, _pid, reason}, state)

If I remove Task.await(task) and replace {:reply, Task.await(task), state} with {:reply, :ok, state} I got that a :DOWN message but I need to wait for response from task.

Oh, I see what you mean. Instead of using await you can store the task’s ref and the caller in your GenServer state, then use GenServer.reply in your handle_info to send back the response to the caller.

1 Like

As it is now you gain nothing from the Task except for the timeout, you could do the work in the GenServer directly and it wouldn’t matter (as long as no timout would happen in the Task)

The more correct way were to put the returned task into your state, alongside the from and return a :noreturn tuple in handle_call/3.

Some time later you will receive a message with either the result of the computation done in the Task, which you then can GenServer(from, answer) to your caller.

Or you might receive a :DOWN, which you will need to find ways to tell your caller about. Most idiomatic way were to use an :error-tuple I think.

1 Like

Thanks a lot for your help @NobbZ and @dom , right now works.

defmodule My.GenServer do
    def start_link do
      GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
    end

    def init(:ok) do
      {:ok, []}
    end

    def some_method(pid, input) do
        GenServer.call(pid, {:some_method, input})
    end

    def handle_call({:some_method, input}, from, _state) do
      Task.Supervisor.async_nolink(My.Task.Supervisor, fn ->
          # raise argument error when passing binary that not represent an integer
          String.to_integer(input)
      end)
      {:noreply, from}
    end

    def handle_info({:DOWN, _ref, :process, _pid, reason}, state) do
      GenServer.reply(state, {:error, reason})
      {:noreply, state}
    end

    def handle_info({_ref, result}, state) do
      GenServer.reply(state, {:ok, result})
      {:noreply, state}
    end

    def handle_info(_msg, state) do
      {:noreply, state}
    end
end
iex(1)> My.GenServer.some_method(My.GenServer, "test")
{:error,
 {:badarg,
  [{:erlang, :binary_to_integer, ["test"], []},
   {My.GenServer, :"-handle_call/3-fun-0-", 1, [file: 'lib/test.ex', line: 17]},
   {Task.Supervised, :do_apply, 2, [file: 'lib/task/supervised.ex', line: 85]},
   {Task.Supervised, :reply, 5, [file: 'lib/task/supervised.ex', line: 36]},
   {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 247]}]}}
iex(2)> My.GenServer.some_method(My.GenServer, "1")   
{:ok, 1}

This will break when your GenServer will be hit by multiple calls before the result of the spawned task is sent back!

Each time you end up in your handle_cast you are overwriting the receiver of the answer.

Once you get an answer back, you will send it to the last one who asked for the info, letting all other processes starve. Also when subsequent answers from all the other tasks come in, you will send them into the mailbox of the same process (unless another one has asked in the meantime), spamming that processes mailbox with stuff that will probably never be read again.

Please make sure, that you map a task to a caller, and only send the answers/results back correspondingly.

1 Like

Something like this ?

defmodule My.GenServer do
    def start_link do
      GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
    end

    def init(:ok) do
      {:ok, %{}}
    end

    def some_method(pid, input) do
        GenServer.call(pid, {:some_method, input})
    end

    def handle_call({:some_method, input}, from, state) do
      task = Task.Supervisor.async_nolink(My.Task.Supervisor, fn ->
        # raise argument error when passing binary
        String.to_integer(input)
      end)

      {:noreply, Map.put(state, Map.get(task, :ref), from)}
    end

    def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
      new_state = reply_to_caller(ref, {:error, reason}, state)

      {:noreply, new_state}
    end

    def handle_info({ref, result}, state) do
      new_state = reply_to_caller(ref, {:ok, result}, state)

      {:noreply, new_state}
    end

    def handle_info(_msg, state) do
      {:noreply, state}
    end

    def reply_to_caller(ref, message, state) do
      case Map.get(state, ref) do
        nil -> state
        from  -> 
          GenServer.reply(from, message)
          Map.delete(state, ref)
      end
    end
end

Thanks again, @NobbZ

Looks good on a first glance. I’d prefer to not call functions from inside the result tuple, but thats more of a personal and stylistic issue.

what’s the name of this application?

https://elixirforum.com/uploads/default/original/2X/c/c695484a987ec6420117f28fa472ca5b9b9b6bf3.png

Observer

iex> :observer.start

Observer and you can use it remotely to debug your application

$ iex
iex(1)>  :observer.start

[error] ERROR: Could not find 'wxe_driver.so' in: /usr/lib/erlang/lib/wx-1.8.1/priv

How to solve this massively depends on your operating system and the way you installed erlang.

Sometimes it is enough to install wx and wx develop packages and then reinstall erlang the same way as before and then elixir the same way as before, but only in about 10% of all cases.

wxWidgets: Cross-platform GUI library

Example: How to fix ‘wxe_driver.so’ error in Elixir

4 posts were split to a new topic: Using a systems package manager vs. a specific version manager

I use Ubuntu on vps and laptop/pc and I have no problem. I’ve installed elixir using those commands from elixir-lang:

wget https://packages.erlang-solutions.com/erlang-solutions_1.0_all.deb && sudo dpkg -i erlang-solutions_1.0_all.deb
sudo apt-get update
sudo apt-get install esl-erlang
sudo apt-get install elixir