Is the Task.Supervisor the right tool for the job?
I’m thinking of writing a GenServer that would use Task.Supervisor.async_nolink to start Tasks, specifying the :shutdown option and handling the {ref, result} and :DOWN messages.
But how do I make sure that the tasks are immediately terminated when the GenServer is shut down? I guess Task.Supervisor.async is what I should use. But how will I get the results/crashes? Am I going to receive the same messages as with async_nolink?
Use Task.Supervisor.async, but note that you can’t use await: you will receive the reply or the :DOWN message in your handle_info.
be able to specify the timeout
When you call Task.Supervisor.async, also call Process.send_after(self(), {:kill, task}, timeout). You will receive that message in handle_info; kill the task there if the timeout has passed.
do some bookkeeping when the work succeeds
Match the {ref, result} message in handle_info and cancel the timeout started above
do some other bookkeeping when the work fails
Match the :DOWN message in handle_info and cancel the timeout started above. Also make sure you call Process.flag(:trap_exit, true) so your GenServer doesn’t die when the task crashes.
Things really clicked when I realized that tasks run under Task.Supervisor are always linked to that supervisor. Task.Supervisor.async and Task.Supervisor.async_nolink are used to decide whether to link (or not) to the caller of those functions, which should be a GenServer that should probably do something about the messages from the tasks.
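To make the linking distinction concrete, here is a minimal sketch (not code from this thread) that starts one task with each function and inspects the caller's links. It assumes an anonymous Task.Supervisor started just for the demo and a recent Elixir version; the `work` function is a hypothetical placeholder that waits for a `:stop` message.

```elixir
# Anonymous supervisor started only for this demo (an assumption, not the
# thread's Stask.TaskSupervisor).
{:ok, sup} = Task.Supervisor.start_link()

# Hypothetical work function: block until told to stop, then return :done.
work = fn ->
  receive do
    :stop -> :done
  end
end

linked = Task.Supervisor.async(sup, work)
unlinked = Task.Supervisor.async_nolink(sup, work)

# Links held by the calling process.
{:links, caller_links} = Process.info(self(), :links)
IO.inspect(linked.pid in caller_links, label: "async linked to caller")
IO.inspect(unlinked.pid in caller_links, label: "async_nolink linked to caller")

# Both tasks are children of the supervisor, whatever the caller link is.
children = Task.Supervisor.children(sup)
IO.inspect(linked.pid in children and unlinked.pid in children, label: "both supervised")

# Let both tasks finish normally and collect their results.
for task <- [linked, unlinked], do: send(task.pid, :stop)
results = Enum.map([linked, unlinked], &Task.await/1)
IO.inspect(results, label: "results")
```

In both cases the caller still monitors the task, which is why the reply and the :DOWN message reach it either way.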
Then I also realized that if I want to do any extra bookkeeping after the tasks complete, it has to be done in some GenServer, not in a supervisor.
Once I’m done with my stuff, I’d like to work on a PR to improve the docs for the Task.Supervisor.
I was following this thread because of how common this use case is, as expressed by @stefanchrobot, and the solution @josevalim gave seemed interesting and simple.
It took me a while to grasp but this is the code. Please correct me if there is anything I misunderstood:
Create the project
mix new stask --sup
cd stask
mix deps.get
The supervision tree
defmodule Stask.Application do
  @moduledoc false
  use Application

  def start(_type, _args) do
    children = [
      {Task.Supervisor,
       name: Stask.TaskSupervisor,
       restart: :temporary
      }
    ]

    opts = [strategy: :one_for_one, name: Stask.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
The basic task
defmodule Stask do
  def some_task(text) do
    # Some hard work
    Process.sleep(15000)
    # Final result
    {:ok, "Hello #{text}"}
  end
end
The GenServer
defmodule Stask.Server do
  use GenServer

  ## Client API

  def start() do
    GenServer.start(__MODULE__, nil, name: __MODULE__)
  end

  def execute_task(pid, name, task_timeout) do
    GenServer.cast(pid, {:execute, name, task_timeout})
  end

  ## Server Callbacks

  def init(_) do
    Process.flag(:trap_exit, true)
    {:ok, %{msg: "", timer: nil, timeout: nil}}
  end

  def handle_cast({:execute, name, task_timeout}, state) do
    task = Task.Supervisor.async(Stask.TaskSupervisor, Stask, :some_task, [name])
    timer_ref = Process.send_after(self(), {:kill, task}, task_timeout)
    {:noreply, %{state | timer: timer_ref}}
  end

  ## Handle info functions

  # This clause runs when the task has timed out
  def handle_info({:kill, task}, state) do
    # Do some bookkeeping here
    IO.puts("Task has been canceled due to timeout")
    # You can specify how long to wait for the task to exit. If it does
    # not exit within this threshold, it will be killed.
    # The default is 5000 ms. You can pass another value: Task.shutdown(task, 2000).
    # You can also kill it immediately: Task.shutdown(task, :brutal_kill)
    # Check documentation: https://hexdocs.pm/elixir/1.5/Task.html#shutdown/2
    case Task.shutdown(task) do
      {:ok, _reply} ->
        # Do some bookkeeping: the task replied while it was being
        # shut down
        IO.puts("The task responded while it was shutting down")

      {:exit, _reason} ->
        # Do some bookkeeping: the task died while it was waiting for
        # shutdown
        IO.puts("The task died just before it was shut down")

      nil ->
        # Do some bookkeeping: the task was shut down
        IO.puts("The task was shut down")
    end

    {:noreply, state}
  end

  # This clause receives the message sent when the task
  # finishes successfully in time
  def handle_info({_ref, {:ok, msg}}, state) do
    # Do some bookkeeping: the task finished successfully
    IO.puts("Task finished successfully")
    Process.cancel_timer(state.timer)
    {:noreply, %{state | msg: msg}}
  end

  # Once the task has finished successfully, it exits normally.
  # This clause handles the resulting :EXIT message.
  def handle_info({:EXIT, _pid, :normal}, state) do
    # Do some bookkeeping
    IO.puts("The task exited and finished normally")
    {:noreply, state}
  end

  # Finally, when the task terminates, the caller receives a :DOWN message.
  # In this case the caller is the GenServer.
  def handle_info({:DOWN, _ref, :process, _pid, :normal}, state) do
    # Do some bookkeeping once the task goes down
    IO.puts(state.msg)
    {:noreply, state}
  end
end
I would put Stask.Server under the same supervisor in the Stask.Application.
Unless you intend to process a single message at a time, you should probably keep a map task.pid -> timer_ref in the GenServer.
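A rough sketch of that bookkeeping follows, under a few assumptions: the module name TimedRunner and its functions are hypothetical, tasks are started with async_nolink, and the map is keyed by task.ref instead of task.pid because the {ref, result} reply only carries the reference. It uses a hard kill on timeout, so the :DOWN clause is the single cleanup path for failures.

```elixir
defmodule TimedRunner do
  # Hypothetical module for illustration; not the thread author's code.
  use GenServer

  def start_link(task_sup), do: GenServer.start_link(__MODULE__, task_sup)
  def run(server, fun, timeout), do: GenServer.cast(server, {:run, fun, timeout})
  def pending(server), do: GenServer.call(server, :pending)

  @impl true
  def init(task_sup), do: {:ok, %{sup: task_sup, tasks: %{}}}

  @impl true
  def handle_cast({:run, fun, timeout}, state) do
    task = Task.Supervisor.async_nolink(state.sup, fun)
    timer = Process.send_after(self(), {:timeout, task.ref}, timeout)
    # Remember the pid (to kill on timeout) and the timer (to cancel on completion).
    {:noreply, %{state | tasks: Map.put(state.tasks, task.ref, {task.pid, timer})}}
  end

  @impl true
  def handle_call(:pending, _from, state) do
    {:reply, map_size(state.tasks), state}
  end

  @impl true
  def handle_info({:timeout, ref}, state) do
    # Hard timeout: kill the task if it is still tracked; :DOWN does the cleanup.
    case state.tasks do
      %{^ref => {pid, _timer}} -> Process.exit(pid, :kill)
      _ -> :ok
    end

    {:noreply, state}
  end

  def handle_info({ref, _result}, state) when is_reference(ref) do
    # Success: drop the monitor (flushing a pending :DOWN) and forget the task.
    Process.demonitor(ref, [:flush])
    {:noreply, forget(state, ref)}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # Failure or kill: forget the task and cancel its timer.
    {:noreply, forget(state, ref)}
  end

  defp forget(state, ref) do
    {entry, tasks} = Map.pop(state.tasks, ref)
    with {_pid, timer} <- entry, do: Process.cancel_timer(timer)
    %{state | tasks: tasks}
  end
end

# Usage: one task that finishes in time and one that hits its timeout.
{:ok, sup} = Task.Supervisor.start_link()
{:ok, server} = TimedRunner.start_link(sup)
TimedRunner.run(server, fn -> :fast end, 1_000)
TimedRunner.run(server, fn -> Process.sleep(:infinity) end, 50)
```

Once both tasks have been handled, TimedRunner.pending(server) should report no tracked tasks.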
When killing the task due to timeout, I opted for Process.exit(task.pid, :kill), since I wanted a hard timeout. It makes that handle_info much simpler: you don’t have to repeat the logic from handling the :DOWN message.
@josevalim I’m not sure what the criteria are for picking Task.Supervisor.async_nolink vs Task.Supervisor.async. Can you shed some light on that?
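As a small illustration of why the hard kill keeps things simple, this sketch (assuming an anonymous Task.Supervisor and a task started with async_nolink) shows that a killed task is reported through the ordinary :DOWN message, so the same clause can handle timeouts and crashes.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# A task that never finishes on its own.
task = Task.Supervisor.async_nolink(sup, fn -> Process.sleep(:infinity) end)
ref = task.ref

# Hard timeout: an untrappable kill signal.
Process.exit(task.pid, :kill)

down_reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down_message
  end

IO.inspect(down_reason, label: "reason in :DOWN")
```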
In the first case, I don’t need to trap exits and only have to handle the :DOWN message. In the latter, I need to handle both :DOWN and :EXIT. My GenServer is linking to some other processes (RabbitMQ connection), so doing async_nolink seems more convenient (if RabbitMQ connection goes down, I need my GenServer to go down as well).
So based on your first two comments the use case was to have a single GenServer coordinating the execution of async tasks?
I think that @josevalim suggested the Task.Supervisor.async because it links the task to the caller and you will be able to trap the exit signal to do the book keeping. The Task.Supervisor.async_nolink will not link the task to the caller so you won’t be able to know when the task fails.
If you create a task using async_nolink inside an OTP behaviour like GenServer, you should match on the message coming from the task inside your GenServer.handle_info/2 callback.
The reply sent by the task will be in the format {ref, result}, where ref is the monitor reference held by the task struct and result is the return value of the task function.
Keep in mind that, regardless of how the task created with async_nolink terminates, the caller’s process will always receive a :DOWN message with the same ref value that is held by the task struct. If the task terminates normally, the reason in the :DOWN message will be :normal.
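The message flow described above can be observed outside a GenServer with plain receive blocks. This is only a sketch: it assumes an anonymous Task.Supervisor, and the second task exits with a made-up reason (:boom) to show that the caller survives a failing task and still gets a :DOWN message.

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# 1. A task that succeeds: first {ref, result}, then :DOWN with :normal.
task = Task.Supervisor.async_nolink(sup, fn -> 1 + 1 end)
ref = task.ref

reply =
  receive do
    {^ref, result} -> result
  after
    1_000 -> :no_reply
  end

down_reason =
  receive do
    {:DOWN, ^ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down
  end

IO.inspect({reply, down_reason}, label: "successful task")

# 2. A task that fails: no reply, only :DOWN with a non-normal reason.
failing = Task.Supervisor.async_nolink(sup, fn -> exit(:boom) end)
failing_ref = failing.ref

crash_reason =
  receive do
    {:DOWN, ^failing_ref, :process, _pid, reason} -> reason
  after
    1_000 -> :no_down
  end

IO.inspect(crash_reason, label: "failing task")
IO.inspect(Process.alive?(self()), label: "caller still alive")
```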
Oh… I missed the “Compatibility with OTP behaviours” section. I read the async_nolink/3 documentation and missed the async_nolink/4.
So async_nolink will not link the task to the caller. Without the link, the caller will not receive the :EXIT signal and therefore will not crash if the task crashes, but the caller will always receive the :DOWN message once the task finishes… cool.