Task.await terminates GenServer because of timeout. How to fix?

My GenServer terminates after a little while, after sending a few HTTP requests. I can’t understand why:

[error] GenServer MyGenServer terminating
** (stop) exited in: Task.await(%Task{owner: #PID<0.420.0>, pid: #PID<0.1054.0>, ref: #Reference<....>}, 5000)
    ** (EXIT) time out
    (elixir) lib/task.ex:416: Task.await/2
    (elixir) lib/enum.ex:966: Enum.flat_map_list/2
    (my_app123) lib/my_genserver.ex:260: MyApp.MyGenServer.do_work/1
    (my_app123) lib/my_genserver.ex:180: MyApp.MyGenServer.handle_info/2
    (stdlib) gen_server.erl:601: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:683: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :tick
State: [%{var1: "fdsafdsfd", var2: "43243242"}]

A chunk of the code:

  # it's called from handle_info

  def do_work(some_data) do
    Enum.map(some_data, fn(x) ->
      Task.async(fn ->
        case HTTPoison.post(.....) do
        # ...........

Is “Task.async” causing the timeout? But why? Yes, it can take more than 5 seconds to complete, but why does it cause an exception which then terminates GenServer? How to fix it?

About await:

If the timeout is exceeded, await will exit; however, the task will continue to run. When the calling process exits, its exit signal will terminate the task if it is not trapping exits.

Task.await/1 calls exit/1 on timeout, so it will end your current process. If you do not want that behaviour, you will have to implement receiving the result and the :DOWN messages on your own.


I’m still getting to grips with Tasks myself, but I think it’s because the Task is linked to the calling process, so if it throws an error and exits, then the calling process will also die. See https://hexdocs.pm/elixir/Task.html#module-async-and-await

You could use Task.start_link/1 if you don’t need the response. I’ve been using a Task.Supervisor for mine, so that if a Task dies, it doesn’t bring the GenServer down.

Add a supervisor:

supervisor(Task.Supervisor, [[name: App.MyTaskSupervisor]])

Then you can use it like:

def do_work(some_data) do
  Enum.map(some_data, fn(x) ->
    Task.Supervisor.async_nolink(App.MyTaskSupervisor, fn ->
      case HTTPoison.post(.....) do

Because I wanted to know if it was a timeout, I use yield:

task = Task.Supervisor.async_nolink(
  App.MyTaskSupervisor,
  MyModule,
  :task_function,
  [args]
)

case Task.yield(task) || Task.shutdown(task) do
  {:ok, result} ->
    ack_message({:ok, %{channel: channel, tag: tag}})
  
  {:error, msg} ->
    ack_message({:error, %{error: msg, channel: channel, tag: tag, redelivered: redelivered}})

  {:exit, reason} ->
    ack_message({:error, %{error: reason, channel: channel, tag: tag, redelivered: redelivered}})

  nil ->
    ack_message({:error, %{error: "TIMEOUT", channel: channel, tag: tag, redelivered: redelivered}})
end
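
Since do_work starts one task per item, the same idea can be applied to the whole list with Task.yield_many/2. What follows is only a minimal sketch under assumptions: YieldManySketch and slow_call/1 are made-up names, and Process.sleep stands in for the HTTP request. Tasks that have not replied within the timeout are shut down and reported as {:error, :timeout} instead of crashing the caller.

```elixir
defmodule YieldManySketch do
  # Hypothetical stand-in for the HTTP request: sleeps for `ms`, then returns it.
  def slow_call(ms) do
    Process.sleep(ms)
    ms
  end

  # Starts one unlinked task per delay and waits at most `timeout` ms for all of them.
  def run(sup, delays, timeout) do
    tasks =
      Enum.map(delays, fn ms ->
        Task.Supervisor.async_nolink(sup, __MODULE__, :slow_call, [ms])
      end)

    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, res} ->
      # `res` is nil when the task has not replied yet; shut it down so it
      # does not keep running in the background.
      case res || Task.shutdown(task, :brutal_kill) do
        {:ok, value} -> {:ok, value}
        {:exit, reason} -> {:error, reason}
        nil -> {:error, :timeout}
      end
    end)
  end
end

# Usage: the supervisor is started ad hoc here; in an application it would
# normally live in the supervision tree.
{:ok, sup} = Task.Supervisor.start_link()
results = YieldManySketch.run(sup, [10, 2_000, 20], 200)
IO.inspect(results)
```

The results come back in the same order as the input list, so each outcome can be matched to the item that produced it.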

How and where?

Or maybe I could

  1. use try…catch inside Task.await
  2. or use Task.yield?

but I already have this in:

  def start(_type, _args) do
    import Supervisor.Spec

    children = [supervisor(MyWebApp.Repo, []), supervisor(MyWebApp.Endpoint, []),
                worker(MyWebApp.MyGenServer, [])]

    opts = [strategy: :one_for_one, name: MyWebApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

I’m not sure what you mean by using try/catch in Task.await/1, since you can’t alter Task.await/1. You can look at its implementation, though, to see what the different kinds of messages look like; then you will know how to match them in handle_info/2. I’m not quite sure what the best way to time out a Task would be then, though…

If you mean you want to wrap Task.await/1 with try/catch, that won’t help much either. On timeout nothing is raised or thrown: await calls exit/1, so a rescue clause never sees it. Only a catch :exit clause would intercept it, and even then the task keeps running in the background.
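
To make the behaviour of the timeout concrete, here is a small sketch (AwaitExitSketch is a made-up name, and Process.sleep stands in for the slow request). It shows that the timeout in Task.await/2 is an exit rather than an exception: the rescue clause is never reached, while a catch :exit clause does see it. Treat it as an illustration of the mechanics, not as a recommendation to await inside a GenServer.

```elixir
defmodule AwaitExitSketch do
  # Awaits a deliberately slow task with a short timeout and reports what happened.
  def run do
    task =
      Task.async(fn ->
        # Stands in for a slow HTTP request.
        Process.sleep(200)
        :done
      end)

    try do
      {:ok, Task.await(task, 50)}
    rescue
      # Not reached on timeout: Task.await/2 exits, it does not raise.
      error -> {:rescued, error}
    catch
      # The exit reason has the shape {:timeout, {Task, :await, [task, timeout]}}.
      :exit, {:timeout, _} -> {:error, :timeout}
    end
  end
end

IO.inspect(AwaitExitSketch.run())
```

Note that catching the exit does not cancel anything: the task is still running after the timeout, so it would have to be stopped separately if the work should not continue.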

Again, I’m not sure this is the best way as I’m still learning myself, but something like:

Add supervisor(Task.Supervisor, [[name: MyWebApp.TaskSupervisor]]) to the app

def start(_type, _args) do
  import Supervisor.Spec

  children = [
    supervisor(MyWebApp.Repo, []),
    supervisor(MyWebApp.Endpoint, []),
    worker(MyWebApp.MyGenServer, []),
    supervisor(Task.Supervisor, [[name: MyWebApp.TaskSupervisor]]) # This is added
  ]

  opts = [strategy: :one_for_one, name: MyWebApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Then inside MyWebApp.MyGenServer you could do:

def do_work(some_data) do
  Enum.map(some_data, fn(x) ->
    Task.Supervisor.async(MyWebApp.TaskSupervisor, fn -> # Use the Task.Supervisor here
      case HTTPoison.post(.....) do

The reason I switched to using Task.yield was because I wanted to be able to trap timeouts (it sends back nil) so that I could do something with it.

I mean this:

Task.async fn ->
  try do
    # http request
  catch
  end
end

How about Task.yield instead of Task.await, will it work?

How will I know that it’s the “exit” or :DOWN sent exactly from that Task.await?

That of course would work, but you have to do it in every Task you spawn, also this will not do anything about a timeout in Task.await/1!


What do you want to return in the case that there was no answer before the yield timed out?

Task.yield won’t exit when the task raises, though. Provided the task isn’t linked to you (or you trap exits), it gives you an {:exit, reason} tuple, which you could return as is.


By using a map to map your Tasks to your callers. Roughly like this:

def handle_call(:foo, from, state) do
  task = Task.async(&do_stuff/0)
  state = Map.put(state, task.ref, from)
  {:noreply, state}
end

How to pull the correct value out of the state is left as an exercise for the reader :wink: Also I have to admit that the code is untested, since I currently have no access to a properly set up Elixir environment.
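
For readers who want to see the ref-to-caller map end to end, here is one possible sketch. It rests on assumptions not in the posts above: RefMapSketch is a made-up module, the task is started with Task.Supervisor.async_nolink/2 so that a crashing task cannot take the server down, and the work is passed in as a function. The reply is looked up by the ref carried in both the result message and the :DOWN message.

```elixir
defmodule RefMapSketch do
  use GenServer

  def start_link(sup), do: GenServer.start_link(__MODULE__, sup)

  # Blocks the caller, not the server, until the task has replied or died.
  def run(server, fun), do: GenServer.call(server, {:run, fun})

  @impl true
  def init(sup), do: {:ok, %{sup: sup, callers: %{}}}

  @impl true
  def handle_call({:run, fun}, from, state) do
    task = Task.Supervisor.async_nolink(state.sup, fun)
    # Remember which caller is waiting for this particular task.
    {:noreply, %{state | callers: Map.put(state.callers, task.ref, from)}}
  end

  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    # The task replied; drop the monitor and any pending :DOWN message.
    Process.demonitor(ref, [:flush])
    reply(ref, {:ok, result}, state)
  end

  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    # The task died before replying.
    reply(ref, {:error, reason}, state)
  end

  defp reply(ref, answer, state) do
    {from, callers} = Map.pop(state.callers, ref)
    if from, do: GenServer.reply(from, answer)
    {:noreply, %{state | callers: callers}}
  end
end

# Usage (the crashing task will also log an error report):
{:ok, sup} = Task.Supervisor.start_link()
{:ok, server} = RefMapSketch.start_link(sup)
IO.inspect(RefMapSketch.run(server, fn -> 1 + 1 end))
IO.inspect(RefMapSketch.run(server, fn -> exit(:boom) end))
```

A caller whose task never finishes would still hit the default GenServer.call timeout, so a per-task deadline may be needed on top of this.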

How will that help me to handle :DOWN or :exit?
I already have something like this.

I don’t understand: no answer, and before the timeout? Why would I want to return anything in such a case? I’d wait for the timeout or the answer.

Don’t just look at the function head but the body as well. The code you have shown elsewhere just stored the last from in the state, this is a map which is able to get the correct from for any ref. And the ref is exactly the thing, that is also used by Task.await/1 and Task.yield/1 to check if they got the correct message. The ref is also available in the :DOWN message.


Task.yield does return nil after the timeout (which is unambiguous, since an actual result always comes back wrapped as {:ok, result}). The task will continue to run, and when you return the %Task{}, your receiver can of course ask for the result again using Task.yield/2. Another possibility would be to simply return {:error, :timeout} from your GenServer when yield didn’t yield anything (but then one should try to kill or stop the Task).


Currently, I’m not even sure if a Task is the thing you want to go with. I do think that it would be so much easier just to spawn a process and let it do the GenServer.reply(from, answer) stuff. In that process you could do anything you want regarding error and timeout handling, and you wouldn’t need to mess with your GenServer anymore.

I need to use GenServer because I have a bunch of users with some data for which I need to poll an external website every N seconds. The data/state in the GenServer can grow, become empty and grow again over time depending on the response of the external website. That is, it’s not a one-time operation; it has to work in the background all the time.

I’m not talking about replacing the GenServer, but about replacing the Task with a bare process.

Maybe. I’m asking for advice. How will it work in “do_work” function, will it get along with GenServer well?

I’m not sure if I do understand your question correctly here. But outsourcing work from a GenServer into another process as soon as it will take longer than a couple of “cycles” is idiomatic and good practice.

This is done to keep the work the actual GenServer process has to do to a minimum, so that it is able to process incoming calls and casts as fast as possible.

But as a rough concept it looks like this:

def handle_call(:do_stuff, from, state) do
  spawn(fn ->
    answer = do_stuff()
    GenServer.reply(from, answer)
  end)
  {:noreply, state}
end

Doing it this way removes the burden of handling any communication with that process from your GenServer. Assuming that HTTPoison does have timeouts, you can even handle them inside that spawned process and reply differently from the default when a timeout happens. You can also try and catch there. You will still run into trouble when do_stuff or something inside of it calls exit!
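
As a rough illustration of that idea, the sketch below wraps the work in a helper that turns raises, throws and exits into tagged tuples, so the caller always gets some reply. SpawnReplySketch and safe_apply/1 are made-up names, and the work is passed in as a function for the sake of a self-contained example. One caveat under this assumption: an exit raised inside the function can be caught with catch :exit, but an exit signal sent from outside (for example :kill) cannot, and in that case the caller would run into the GenServer.call timeout.

```elixir
defmodule SpawnReplySketch do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)

  def run(server, fun), do: GenServer.call(server, {:run, fun})

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:run, fun}, from, state) do
    # The server returns immediately; the spawned process replies on its behalf.
    spawn(fn -> GenServer.reply(from, safe_apply(fun)) end)
    {:noreply, state}
  end

  # Converts every kind of failure inside `fun` into a tagged tuple.
  defp safe_apply(fun) do
    {:ok, fun.()}
  rescue
    error -> {:error, {:raised, error}}
  catch
    :throw, value -> {:error, {:thrown, value}}
    :exit, reason -> {:error, {:exit, reason}}
  end
end

# Usage:
{:ok, server} = SpawnReplySketch.start_link()
IO.inspect(SpawnReplySketch.run(server, fn -> :fine end))
IO.inspect(SpawnReplySketch.run(server, fn -> exit(:boom) end))
```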


Basically ditch Task.await and simply grab the results in handle_info/2. If your tasks have a habit of abnormally exiting, trap exits and process the :EXIT messages accordingly.

defmodule Charlie do
  use GenServer

  defp new_task do
    max_time = 5000
    work =
      case {(:rand.uniform max_time), (:rand.uniform max_time)} do
        {work, panic} when work < panic ->
          fn ->
            Process.sleep work
            work # return the work time as the result
          end
        {_, panic} ->
          fn ->
            Process.sleep panic
            exit(:panic) # simulate non-normal exit
          end
      end

    Task.async work
  end

  defp add_task(tasks, task),
    do: Map.put tasks, task.pid, task

  defp purge_task(tasks, pid) do
    case Map.get tasks, pid, :none do
      :none ->
        {tasks, :none}
      task ->
        Process.demonitor task.ref, [:flush]   # purge related :DOWN messages
        {(Map.delete tasks, pid), task}
    end
  end

  defp shutdown_tasks(tasks) do
    tasks
    |> Map.values()
    |> Enum.each(&(Task.shutdown &1, :brutal_kill))
  end

  ## callbacks
  def handle_info(:next_task, {tasks, _ref}) do
    # time to create another task
    new_tasks =
      cond do
        (length (Map.keys tasks)) < 50 ->
          add_task tasks, new_task()
        true ->
          tasks
      end

    timer_ref = Process.send_after self(), :next_task, 500
    {:noreply, {new_tasks, timer_ref}}
  end
  def handle_info({ref, result}, state) when is_reference ref do
    # handle Task completion
    Process.demonitor ref, [:flush]   # purge related :DOWN messages ASAP

    IO.puts "Task #{inspect ref} completed with result #{result}"

    {:noreply, state} # remove task when :EXIT :normal is processed
  end
  def handle_info({:EXIT, from, reason}, {tasks, timer_ref} = state) do
    # Handle exit signal not handled by the behaviour (i.e. signal from parent process)
    case purge_task tasks, from  do
      {_, :none} ->
        IO.puts "Unknown :EXIT #{inspect reason}"
        case reason do
          :normal ->
            {:noreply, state}
          _ ->
            {:stop, reason, state}  # Follow protocol: unknown non-normal exit signal - time to terminate
        end
      {new_tasks, task} ->
        case reason do
          :normal ->
            :ok
          _ -> # The task panicked
            IO.puts "Task #{inspect task.ref} panicked: #{inspect reason}"
        end
        {:noreply, {new_tasks, timer_ref}}
    end
  end

  def init(_args) do
    Process.flag(:trap_exit, true)
    Kernel.send self(), :next_task # start spinning off tasks
    {:ok, {%{}, :none}}
  end

  def terminate(_reason, {tasks, timer_ref}) do
    IO.puts "Terminating"
    cond do
      is_reference timer_ref ->
        Process.cancel_timer timer_ref
      true ->
        0 # no timer, therefore no time left
    end

    shutdown_tasks tasks
    :ok
  end

  ## public interface

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  def stop(pid) do
    GenServer.stop pid
  end

  # NOTE:
  # https://hexdocs.pm/elixir/Task.html#await/2-compatibility-with-otp-behaviours
  # It is not recommended to await a long-running task inside an OTP behaviour such as GenServer.
  # Instead, you should match on the message coming from a task inside your
  # GenServer.handle_info/2 callback.
  #
end


$ iex -S mix
iex(1)> Process.flag :trap_exit, true 
false
iex(2)> {:ok,pid} = Charlie.start_link
{:ok, #PID<0.127.0>}
Task #Reference<0.0.6.653> panicked: :panic
Task #Reference<0.0.6.651> completed with result 1306
Task #Reference<0.0.6.647> panicked: :panic
Task #Reference<0.0.6.658> panicked: :panic
Task #Reference<0.0.5.605> panicked: :panic
Task #Reference<0.0.6.649> panicked: :panic
Task #Reference<0.0.6.661> completed with result 464
Task #Reference<0.0.6.668> panicked: :panic
Task #Reference<0.0.6.677> panicked: :panic
Task #Reference<0.0.6.673> panicked: :panic
Task #Reference<0.0.6.682> completed with result 957
Task #Reference<0.0.6.684> completed with result 665
Task #Reference<0.0.6.675> panicked: :panic
iex(3)> Process.exit pid, :whatever
Terminating
true
iex(4)>
def handle_info({ref, result}, state) when is_reference ref do
  # handle Task completion
  Process.demonitor ref, [:flush]   # purge related :DOWN messages ASAP

  IO.puts "Task #{inspect ref} completed with result #{result}"

  {:noreply, state} # remove task when :EXIT :normal is processed
end

That’s not what I’m looking for, because I have tasks that don’t complete after the 1st iteration; it takes them a few iterations to complete. Also, they’re sending HTTP requests which can either take a long time or cause a timeout, depending on how I set them up. How should your code be adjusted then?
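
One way the approach might be adapted to this situation, offered only as a sketch under assumptions: the server remembers which items still have a request in flight, skips them on the next tick, and gives every request its own deadline via Process.send_after/3. PollerSketch, the :fetch and :report options, the message shapes and the interval values are all made up for illustration; the fetch function stands in for the HTTPoison call.

```elixir
defmodule PollerSketch do
  use GenServer

  @tick 100      # polling interval in ms (assumed value)
  @deadline 250  # budget per request in ms (assumed value)

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    # Started ad hoc to keep the sketch self-contained; normally supervised.
    {:ok, sup} = Task.Supervisor.start_link()
    send(self(), :tick)
    {:ok, %{sup: sup, items: opts[:items], fetch: opts[:fetch],
            report: opts[:report], in_flight: %{}}}
  end

  @impl true
  def handle_info(:tick, state) do
    fetch = state.fetch
    busy = for {_ref, {item, _task}} <- state.in_flight, do: item
    idle = state.items -- busy

    # Only start a request for items that have none running already.
    started =
      for item <- idle, into: %{} do
        task = Task.Supervisor.async_nolink(state.sup, fn -> fetch.(item) end)
        Process.send_after(self(), {:deadline, task.ref}, @deadline)
        {task.ref, {item, task}}
      end

    Process.send_after(self(), :tick, @tick)
    {:noreply, %{state | in_flight: Map.merge(state.in_flight, started)}}
  end

  # A task finished, possibly several ticks after it was started.
  def handle_info({ref, result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    finish(ref, {:ok, result}, state)
  end

  # A task crashed before replying.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    finish(ref, {:error, reason}, state)
  end

  # The budget ran out; stop the task if it is still in flight.
  def handle_info({:deadline, ref}, state) do
    case state.in_flight do
      %{^ref => {_item, task}} ->
        outcome =
          case Task.shutdown(task, :brutal_kill) do
            {:ok, result} -> {:ok, result}
            {:exit, reason} -> {:error, reason}
            nil -> {:error, :timeout}
          end

        finish(ref, outcome, state)

      _ ->
        # Stale deadline: the task has already been handled.
        {:noreply, state}
    end
  end

  defp finish(ref, outcome, state) do
    {entry, in_flight} = Map.pop(state.in_flight, ref)
    with {item, _task} <- entry, do: send(state.report, {:polled, item, outcome})
    {:noreply, %{state | in_flight: in_flight}}
  end
end

# Usage: :fast needs more than one tick, :slow exceeds its deadline.
fetch = fn
  :slow ->
    Process.sleep(1_000)
    :never_reported

  item ->
    Process.sleep(150)
    {:fetched, item}
end

{:ok, _poller} = PollerSketch.start_link(items: [:fast, :slow], fetch: fetch, report: self())

receive do
  {:polled, :fast, outcome} -> IO.inspect(outcome, label: "fast")
end

receive do
  {:polled, :slow, outcome} -> IO.inspect(outcome, label: "slow")
end
```

In a real poller the outcome would probably update the state instead of being sent to another process, and a client-side request timeout could replace or complement the deadline message.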