How I wouldn’t do it.
The problem is that the same result is delivered via two separate channels:
- as a function return value
- as a message if it resolves late.
From a maintenance perspective that’s a mess - a single channel of delivery should be enough.
# file: my_app/lib/demo.ex
#
defmodule Demo do
  @moduledoc """
  Demo GenServer that runs one piece of work at a time in a task started with
  `Task.Supervisor.async_nolink/4` and reports the outcome to the caller of
  `launch/1`.

  The outcome reaches the caller through one of two channels:

    * as the return value of `launch/1` when the task finishes or crashes
      before the first timer (`@max_wait_ms`) fires;
    * otherwise `launch/1` returns `:pending`, and the final outcome is later
      sent to the calling process as a `{:result, outcome}` message.

  A task that is still running `@max_wait_ms` after the `:pending` reply is
  killed and reported as `{:error, :timeout}`.

  The server state is the tuple `{name, handles, reply_to, pending}`:

    * `name` - the `:name` option given to `start_link/1`; it is passed as the
      supervisor argument to `Task.Supervisor.async_nolink/4`;
    * `handles` - `nil` when idle, otherwise `{task, timer_ref}`;
    * `reply_to` - the `from` of the `launch/1` call being served, or `nil`;
    * `pending` - `true` once `:pending` has been returned to the caller.
  """

  use GenServer

  @max_wait_ms 500

  # Durations used by the sample workloads in `some_work/1`.
  @timely_timeout div(@max_wait_ms, 2)
  @late_timeout @max_wait_ms + @timely_timeout
  @too_late_timeout 3 * @max_wait_ms

  @type outcome :: {:ok, term()} | {:error, term()}

  # --- Demo API ---

  @doc """
  Starts the server registered under the module name.

  `args` must be a keyword list containing `:name`; without it `init/1` stops
  with `:badarg`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @doc """
  Runs `some_work(arg)` in a task and returns its outcome.

  Returns `{:ok, value}` or `{:error, reason}` when the outcome is known before
  the first timer fires, `:pending` when it is not (the outcome then arrives as
  a `{:result, outcome}` message), and `{:error, :busy}` when another launch is
  still in flight.
  """
  @spec launch(term()) :: outcome() | :pending
  def launch(arg) do
    {:result, result} = GenServer.call(__MODULE__, {:launch, arg})
    result
  end

  # --- Sample workloads (public because the task invokes them by MFA) ---

  @doc """
  Sample workload: sleeps for a duration selected by `type`, then either
  returns `type` or exits with `type` as the reason (the `*crash` variants).
  """
  @spec some_work(atom()) :: atom() | no_return()
  def some_work(:timely = type) do
    Process.sleep(@timely_timeout)
    type
  end

  def some_work(:late = type) do
    Process.sleep(@late_timeout)
    type
  end

  def some_work(:too_late = type) do
    Process.sleep(@too_late_timeout)
    type
  end

  def some_work(:crash = type) do
    Process.sleep(@timely_timeout)
    exit(type)
  end

  def some_work(:late_crash = type) do
    Process.sleep(@late_timeout)
    exit(type)
  end

  def some_work(:too_late_crash = type) do
    Process.sleep(@too_late_timeout)
    exit(type)
  end

  # --- GenServer callbacks ---

  @impl true
  def init(args) do
    case Keyword.fetch(args, :name) do
      {:ok, name} -> {:ok, idle(name)}
      :error -> {:stop, :badarg}
    end
  end

  @impl true
  def handle_call({:launch, arg}, from, {name, nil, nil, _pending}) do
    # The reply is deferred: it is sent from one of the handle_info/2 clauses.
    {:noreply, {name, launch_task(name, arg), from, false}}
  end

  def handle_call({:launch, _arg}, _from, state) do
    # A task is already in flight. Answer explicitly instead of crashing the
    # server with a FunctionClauseError, which would also strand the first caller.
    {:reply, {:result, {:error, :busy}}, state}
  end

  # :DOWN for the task currently being tracked.
  @impl true
  def handle_info(
        {:DOWN, mon, _, down_pid, reason},
        {name, {%Task{ref: mon} = task, timeout_ref}, reply_to, pending}
      ) do
    Process.cancel_timer(timeout_ref)
    Process.demonitor(mon, [:flush])
    IO.puts("DOWN: #{inspect(down_pid)} #{inspect(reason)}")

    if reason == :normal do
      # Keep tracking the task; the `{ref, value}` clause below is the one that
      # reports a successful result.
      # NOTE(review): no timer is left running in this state, so nothing would
      # answer the caller if that message never arrived - confirm the task
      # reply always precedes a :normal :DOWN.
      {:noreply, {name, {task, nil}, reply_to, pending}}
    else
      reply(reply_to, {:result, {:error, reason}}, pending)
      {:noreply, idle(name)}
    end
  end

  def handle_info({:DOWN, _mon, _, down_pid, reason}, state) do
    IO.puts("REDUNDANT DOWN: #{inspect(down_pid)} #{inspect(reason)}")
    {:noreply, state}
  end

  # First timeout expired and the caller has not been answered yet: answer
  # :pending and arm the final (kill) timer.
  def handle_info(
        {:pending_task, mon},
        {name, {%Task{ref: mon} = task, _timer}, reply_to, false}
      ) do
    ref = Process.send_after(self(), {:kill_task, mon}, @max_wait_ms)
    reply(reply_to, {:result, :pending}, false)
    IO.puts("First time out: #{inspect(task)}")
    {:noreply, {name, {task, ref}, reply_to, true}}
  end

  def handle_info({:pending_task, _mon}, state) do
    IO.puts("REDUNDANT first timeout")
    {:noreply, state}
  end

  # Last timeout expired: kill the task and report a timeout.
  def handle_info(
        {:kill_task, mon},
        {name, {%Task{ref: mon} = task, _timer}, reply_to, pending}
      ) do
    # Demonitor first so the kill does not produce a :DOWN for this task.
    Process.demonitor(mon, [:flush])
    kill_task(task)
    reply(reply_to, {:result, {:error, :timeout}}, pending)
    IO.puts("Timed out: #{inspect(task)}")
    {:noreply, idle(name)}
  end

  def handle_info({:kill_task, _mon}, state) do
    IO.puts("REDUNDANT timeout")
    {:noreply, state}
  end

  # Normal task result: `{ref, value}` sent by the task to its owner.
  def handle_info({mon, value}, {name, {%Task{ref: mon}, timeout_ref}, reply_to, pending})
      when is_reference(mon) do
    # `timeout_ref` is nil only after the :normal :DOWN clause above has already
    # cancelled the timer and dropped the monitor.
    if timeout_ref do
      Process.cancel_timer(timeout_ref)
      Process.demonitor(mon, [:flush])
    end

    reply(reply_to, {:result, {:ok, value}}, pending)
    IO.puts("result: #{inspect(value)}")
    {:noreply, idle(name)}
  end

  def handle_info({mon, value}, state) when is_reference(mon) do
    IO.puts("REDUNDANT result: #{inspect(value)} with #{inspect(mon)}")
    {:noreply, state}
  end

  def handle_info(msg, state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  # --- Helpers ---

  # State with no task in flight. Used for every reset so the pending flag is
  # always a boolean.
  defp idle(name), do: {name, nil, nil, false}

  # Delivers `reply` as the GenServer.call/2 return value while the caller is
  # still blocked (pending == false), otherwise as a plain message to the
  # caller's pid.
  defp reply(reply_to, reply, false), do: GenServer.reply(reply_to, reply)
  defp reply({pid, _tag}, reply, _pending), do: send(pid, reply)

  # Starts the work under the task supervisor `name` and arms the first timer.
  # Returns `{task, timer_ref}`.
  defp launch_task(name, arg) do
    %Task{ref: mon} = task = Task.Supervisor.async_nolink(name, __MODULE__, :some_work, [arg])
    ref = Process.send_after(self(), {:pending_task, mon}, @max_wait_ms)
    {task, ref}
  end

  defp kill_task(%Task{pid: pid}), do: Process.exit(pid, :kill)
end
$ iex -S mix
Erlang/OTP 22 [erts-10.5] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe] [dtrace]
Compiling 1 file (.ex)
Interactive Elixir (1.9.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Demo.launch(:timely)
result: :timely
{:ok, :timely}
iex(2)> Demo.launch(:late)
:pending
iex(3)> First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.150.0>, ref: #Reference<0.3242099791.3754688513.92878>}
result: :late
flush
{:result, {:ok, :late}}
:ok
iex(4)> Demo.launch(:too_late)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.153.0>, ref: #Reference<0.3242099791.3754688516.94279>}
:pending
iex(5)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.153.0>, ref: #Reference<0.3242099791.3754688516.94279>}
flush
{:result, {:error, :timeout}}
:ok
iex(6)> Demo.launch(:crash)
16:51:58.678 [error] Task #PID<0.156.0> started from Demo terminating
** (stop) :crash
(my_app) lib/demo.ex:142: Demo.some_work/1
(elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
Args: [:crash]
DOWN: #PID<0.156.0> :crash
{:error, :crash}
iex(7)> Demo.launch(:late_crash)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.158.0>, ref: #Reference<0.3242099791.3754688516.94341>}
:pending
iex(8)>
16:52:10.607 [error] Task #PID<0.158.0> started from Demo terminating
** (stop) :late_crash
(my_app) lib/demo.ex:147: Demo.some_work/1
(elixir) lib/task/supervised.ex:90: Task.Supervised.invoke_mfa/2
(elixir) lib/task/supervised.ex:35: Task.Supervised.reply/5
(stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3
Function: &Demo.some_work/1
Args: [:late_crash]
DOWN: #PID<0.158.0> :late_crash
flush
{:result, {:error, :late_crash}}
:ok
iex(9)> Demo.launch(:too_late_crash)
First time out: %Task{owner: #PID<0.145.0>, pid: #PID<0.161.0>, ref: #Reference<0.3242099791.3754688516.94370>}
:pending
iex(10)> Timed out: %Task{owner: #PID<0.145.0>, pid: #PID<0.161.0>, ref: #Reference<0.3242099791.3754688516.94370>}
flush
{:result, {:error, :timeout}}
:ok
iex(11)>