A slightly more complicated example:
# lib/my_app/application.ex
defmodule MyApp.Application do
use Application
def start(_type, _args) do
app = Application.get_application(__MODULE__)
name = MyApp.TaskSupervisor
children = [
{Task.Supervisor, name: name}, # use Task.Supervisor to protect
{Worker, app: app, name: name} # Worker GenServer process from harm
]
opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
def prep_stop(state) do
IO.puts("prep_stop: #{state}")
state
end
def stop(state) do
IO.puts("stop: #{state}")
:ok
end
end
# lib/worker.ex
defmodule Worker do
use GenServer
def start_link(args) do
GenServer.start_link(__MODULE__, args);
end
def init(args) do
with {:ok, app} <- Keyword.fetch(args, :app),
{:ok, name} <- Keyword.fetch(args, :name) do
ref = Process.send_after(self(), :init, 1000)
{:ok, {app, name, ref}}
else
_ ->
{:stop, :badarg}
end
end
def handle_info(:init, {app, name, _}) do
# launch the task
IO.puts("Pre-launch: #{inspect app} #{inspect name}")
task_ref = launch_task(name)
{:noreply, {app, name, task_ref}}
end
def handle_info({task_ref, result}, {app, name, task_ref}) do
# normal task result arrives here - demonitor/flush :normal :DOWN
Process.demonitor(task_ref, [:flush])
IO.puts("Result: #{inspect result}")
{:stop, :normal, {app, name, nil}}
end
def handle_info({:DOWN, _down_ref, :process, pid, reason}, state) do
# :DOWN message arrives when task exits without result
# reason won't be :normal
IO.puts("DOWN: #{inspect pid} #{inspect reason}")
{:stop, reason, state}
end
def handle_info(msg, state) do
IO.inspect msg
{:noreply, state}
end
def terminate(reason, {app, _, _}) do
IO.puts("terminate: #{inspect reason}")
Application.stop(app) # experiment complete
end
defp launch_task(name) do
# async_nolink: won't force GenServer process to exit in case of any task exit
args = [name, [1000, -500, 2000]] # expected: [ok: 1000, exit: :crash, exit: :timeout]
task = Task.Supervisor.async_nolink(name, __MODULE__, :stream_tasks, args)
task.ref
end
# Single task being blocked (instead of GenServer process)
# until all elements in the list have been processed
def stream_tasks(name, list) do
# uncomment next line to cause :DOWN message instead of result
# exit(:crash_start)
options = [timeout: 1500, on_timeout: :kill_task] # options to kill JUST "2000" task
name
|> Task.Supervisor.async_stream_nolink(list, __MODULE__, :stream_fun, [], options)
|> Enum.to_list()
end
# Function being run as a task on each element in the list
def stream_fun(delay) when is_integer(delay) do
{timeout, crash} =
cond do
delay >= 0 ->
{delay, false}
true ->
{-delay, true}
end
Process.sleep(timeout)
cond do
crash ->
exit(:crash) # crash the task
true ->
timeout # return result
end
end
end
$ iex -S mix
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]
Compiling 2 files (.ex)
Interactive Elixir (1.6.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)>
Pre-launch: :my_app MyApp.TaskSupervisor
[error] Task #PID<0.133.0> started from #PID<0.130.0> terminating
** (stop) :crash
(my_app) lib/worker.ex:81: Worker.stream_fun/1
(elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &Worker.stream_fun/1
Args: [-500]
Result: [ok: 1000, exit: :crash, exit: :timeout]
terminate: :normal
prep_stop:
stop:
[info] Application my_app exited: :stopped