Do something after Task death

I have legacy code that I’m refactoring to run asynchronously. The easiest approach would be to start a Task under a DynamicSupervisor, but I don’t know how to handle task termination/completion.

  • I don’t need to restart failed tasks, so the :temporary restart mode suits me.
  • On normal completion of the task, I need to send a message via PubSub.
  • On abnormal termination of the task, I need to send a message via PubSub and possibly do some cleanup.

I suppose for normal death I can trigger PubSub in the function that calls the legacy process and is passed into the Task. But abnormal termination must be caught somehow…

Is DynamicSupervisor suitable for that? How do I catch the task’s :DOWN message whatever the reason, normal or abnormal termination?

IIRC tasks are linked to the calling process

When invoked, a new process will be created, linked and monitored by the caller. Once the task action finishes, a message will be sent to the caller with the result.

The calling process will need to trap exits, handle the task’s abnormal termination, and then perform the cleanup. This is described here, but what that really means is you just need a handle_info to capture the exit message.
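
A minimal sketch of that idea, assuming the caller is a GenServer. The module name ExitTrappingCaller and its run/2 and exits/1 functions are made up for illustration, and the handler only records the exit where real code would do its cleanup or PubSub broadcast:

```elixir
defmodule ExitTrappingCaller do
  # Hypothetical module: a caller that traps exits from linked tasks.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Starts `fun` in a task linked to this server and returns the task pid.
  def run(server, fun), do: GenServer.call(server, {:run, fun})

  # Returns the recorded {pid, reason} exits, oldest first.
  def exits(server), do: GenServer.call(server, :exits)

  @impl true
  def init(:ok) do
    # Without this flag, an abnormal exit of a linked task would take
    # this process down too instead of arriving as a message.
    Process.flag(:trap_exit, true)
    {:ok, []}
  end

  @impl true
  def handle_call({:run, fun}, _from, exits) do
    {:ok, pid} = Task.start_link(fun)
    {:reply, pid, exits}
  end

  def handle_call(:exits, _from, exits), do: {:reply, Enum.reverse(exits), exits}

  @impl true
  def handle_info({:EXIT, pid, reason}, exits) do
    # Cleanup or a broadcast would go here; this sketch only records the exit.
    {:noreply, [{pid, reason} | exits]}
  end
end
```

With exits trapped, a normal finish shows up as reason :normal and a crash as whatever reason the task exited with.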

I’d additionally recommend using a Task.Supervisor.

iex(1)> Supervisor.start_link([{Task.Supervisor, name: MyTaskSupervisor}], strategy: :one_for_one)
{:ok, #PID<0.114.0>}

iex(2)> Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> :awesome_task end)
%Task{
  mfa: {:erlang, :apply, 2},
  owner: #PID<0.112.0>,
  pid: #PID<0.117.0>,
  ref: #Reference<0.987707504.2573795360.120005>
}

iex(3)> flush()
{#Reference<0.987707504.2573795360.120005>, :awesome_task}
{:DOWN, #Reference<0.987707504.2573795360.120005>, :process, #PID<0.117.0>, :normal}

iex(4)> Task.Supervisor.async_nolink(MyTaskSupervisor, fn -> raise "uh oh" end)
%Task{
  mfa: {:erlang, :apply, 2},
  owner: #PID<0.112.0>,
  pid: #PID<0.120.0>,
  ref: #Reference<0.987707504.2573795360.120046>
}

iex(5)> flush()
{:DOWN, #Reference<0.987707504.2573795360.120046>, :process, #PID<0.120.0>,
 {%RuntimeError{message: "uh oh"}, ...}}

Docs for async_nolink/3 have some more information on this. Basically, if the task succeeds, you’ll receive a {task_ref, result} message. Regardless of success or failure, you’ll receive a {:DOWN, ref, :process, pid, reason} message. If it was a normal exit, that reason will be :normal. Otherwise, it was abnormal.
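
If the owner is a plain process rather than a GenServer, the same two messages can be picked up with receive. This is only a sketch: TaskOutcome and await_outcome/2 are hypothetical names, and it blocks the caller until the task reports back or the timeout expires.

```elixir
defmodule TaskOutcome do
  # Hypothetical helper: waits for the messages async_nolink sends to the owner.
  def await_outcome(%Task{ref: ref}, timeout \\ 5_000) do
    receive do
      {^ref, result} ->
        # Success: drop the monitor and flush the :DOWN message that follows.
        Process.demonitor(ref, [:flush])
        {:ok, result}

      {:DOWN, ^ref, :process, _pid, reason} ->
        # No reply arrived first, so the task died before returning a value.
        {:error, reason}
    after
      timeout -> {:error, :timeout}
    end
  end
end
```

The reply is sent before the task exits, so on success the {ref, result} clause matches first and the trailing :DOWN is discarded.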

Your process kicking off these tasks can implement handle_info and pattern-match on these to broadcast the appropriate PubSub message.

The calling process is the thread that handles the GraphQL query. Since the Task starts asynchronously that thread will probably terminate before the task is completed normally. Also, I heard trap_exit is unreliable in case of abnormal termination. And handle_info is a callback in a GenServer. So if you’re not in a GenServer, I was hoping there would be a way to provide a callback to handle task death. I would think that’s the task supervisor’s job.

But I guess I’ll need to use GenServer to create a Monitor for my tasks that implements handle_info callback, start it statically in Application supervisor tree, and it will sit there idly until I call some monitor(pid) function so it can call Process.monitor(pid) which would then guarantee I get notification via handle_info() regardless of how the task died.
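
Roughly, that plan could look like the sketch below. The TaskMonitor name and the :on_down option are assumptions made for illustration; the callback stands in for the PubSub broadcast and cleanup.

```elixir
defmodule TaskMonitor do
  # Hypothetical singleton that watches pids and reacts when they exit.
  use GenServer

  # opts must contain :on_down, a 2-arity function called with (pid, reason).
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # A call rather than a cast, so the monitor is in place before the caller
  # moves on. If the pid is already dead, :DOWN arrives with reason :noproc.
  def monitor(pid) when is_pid(pid) do
    GenServer.call(__MODULE__, {:monitor, pid})
  end

  @impl true
  def init(opts) do
    {:ok, %{on_down: Keyword.fetch!(opts, :on_down), refs: %{}}}
  end

  @impl true
  def handle_call({:monitor, pid}, _from, state) do
    ref = Process.monitor(pid)
    {:reply, :ok, %{state | refs: Map.put(state.refs, ref, pid)}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, pid, reason}, state) do
    {watched, refs} = Map.pop(state.refs, ref)
    # Fires for normal and abnormal exits alike; reason tells them apart.
    if watched, do: state.on_down.(pid, reason)
    {:noreply, %{state | refs: refs}}
  end
end
```

It would be listed in the application's children like any other GenServer, for example {TaskMonitor, on_down: &MyApp.handle_task_down/2}, where the handler function is again hypothetical.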

I just thought that the simplest way to achieve asynchronous things is by using DynamicSupervisor and Task, but they seem to be too limited in their functionality out of the box.

Perhaps you’re used to something like promises that have a success/error callback. That’s just not how the BEAM works, though – the primitives and assumptions are different.

Consider that you could pass a function to the supervisor to run on success/failure of a task. When the supervisor calls your function, what happens if it crashes? It takes down the whole supervisor.

All of these components have been very carefully designed and tested for robustness. Sometimes things seem tricky because there’s a gap in some API or some ergonomics to be improved, but I don’t think that’s the case here.

Since the Task starts asynchronously that thread will probably terminate before the task is completed normally

If there is a high chance that the task will fail and there is no process to receive the DOWN message, why not just try/catch directly in the task process and publish your message accordingly?
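
Something like this sketch, where GuardedTask is a hypothetical wrapper and notify is a 1-arity function standing in for the PubSub broadcast. It only sees failures raised inside the task; a task killed from outside never reaches the notify call.

```elixir
defmodule GuardedTask do
  # Hypothetical wrapper: runs `fun` under a Task.Supervisor and hands the
  # outcome to `notify` from inside the task process itself.
  def start(supervisor, fun, notify) do
    Task.Supervisor.start_child(supervisor, fn ->
      outcome =
        try do
          {:ok, fun.()}
        rescue
          # Raised exceptions, together with their stacktrace.
          exception -> {:error, {exception, __STACKTRACE__}}
        catch
          # Throws and exits raised within the task.
          kind, value -> {:error, {kind, value}}
        end

      notify.(outcome)
    end)
  end
end
```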

Because

  1. Elixir’s MO is “let it fail”, instead of paranoid catching of every possible exception like in Java
  2. A task can be killed by other means and one still needs to handle it gracefully (perform cleanups, pubsub events etc)

So I followed @mpope’s recommendation and built a GenServer that monitors task execution and handles its death in handle_info({:DOWN, ... and its normal return in handle_info({_, result}... . The GenServer is started in the application’s supervision tree, and when a task is started I register it using MyMonitor.monitor(task), which delegates to Process.monitor(task.pid) so that the calling process (in this case, my GenServer/Monitor) gets its handle_info called.

I wish there was an easier way out of the box but c’est la vie.

Here’s what I came up with as an exercise. Some small liberties taken based on how I set up Phoenix.PubSub, but hopefully still understandable.

defmodule MyApp.TaskManager do
  use GenServer
  
  @pubsub_topic "really_important_tasks"

  @doc """
  Starts a singleton task manager.
  """
  def start_link(arg) do
    GenServer.start_link(__MODULE__, arg, name: __MODULE__)
  end

  @doc """
  Starts an async task and broadcasts the result or failure.
  """
  def async_task(id, fun) do
    GenServer.cast(__MODULE__, {:async_task, id, fun})
  end
  
  @impl true
  def init(opts) do
    {:ok, %{supervisor: Keyword.fetch!(opts, :supervisor), tasks: %{}}}
  end
  
  @impl true
  def handle_cast({:async_task, id, fun}, %{supervisor: supervisor, tasks: tasks} = state) do
    %Task{ref: ref} = Task.Supervisor.async_nolink(supervisor, fun)
    {:noreply, %{state | tasks: Map.put(tasks, ref, id)}}
  end
  
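  # Success: the task replies with {ref, result} before it exits.
  @impl true
  def handle_info({ref, result}, state) when is_reference(ref) do
    {:noreply, broadcast_result(state, ref, {:ok, result})}
  end
  
  # The :DOWN after a normal exit follows a result that was already broadcast.
  def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
    {:noreply, state}
  end
  
  # Abnormal exit: no result arrived, so broadcast the exit reason instead.
  def handle_info({:DOWN, ref, :process, _pid, reason}, state) do
    {:noreply, broadcast_result(state, ref, {:error, reason})}
  end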
  
  defp broadcast_result(%{tasks: tasks} = state, ref, result) do
    if id = tasks[ref] do
      MyApp.PubSub.broadcast(@pubsub_topic, {:task, id, result})
    end
    
    %{state | tasks: Map.delete(tasks, ref)}
  end
end

# when starting your application
children = [
  MyApp.PubSub,
  {Task.Supervisor, name: MyApp.TaskSupervisor},
  {MyApp.TaskManager, supervisor: MyApp.TaskSupervisor}
]

Supervisor.start_link(children, strategy: :one_for_one)

# to start tasks
MyApp.TaskManager.async_task("some_important_thing", &do_the_thing/0)

# if you care about results
MyApp.PubSub.subscribe("really_important_tasks")

(Having written this out, I think it’s pretty well worth the ~50 lines of code to have control over and insight into task execution.)

Yeah that’s pretty much what I got, except I used pid instead of ref and I have case reason instead of functions with different pattern matches. Thanks!
