Basically, ditch Task.await and simply grab the results in handle_info/2.
If your tasks have a habit of exiting abnormally, trap exits and process the :EXIT messages accordingly.
defmodule Charlie do
  @moduledoc """
  Demo GenServer that spawns short-lived tasks and collects their outcomes in
  `handle_info/2` instead of blocking in `Task.await/2`.

  Every #{500} ms the server starts one more task with `Task.async/1` (up to a
  fixed cap). Each task either sleeps and returns its sleep time, or sleeps and
  exits with `:panic`. The server traps exits, so:

    * a successful task shows up as a `{ref, result}` message followed by an
      `{:EXIT, pid, :normal}` message;
    * a panicking task shows up as an `{:EXIT, pid, :panic}` message.

  The state is `{tasks, timer_ref}` where `tasks` maps a task pid to its
  `%Task{}` and `timer_ref` is the pending `:next_task` timer (or `:none`
  before the first timer has been armed).

  See https://hexdocs.pm/elixir/Task.html#await/2-compatibility-with-otp-behaviours:
  it is not recommended to await a long-running task inside an OTP behaviour
  such as GenServer. Instead, match on the message coming from the task inside
  the `handle_info/2` callback.
  """

  use GenServer

  # Upper bound on the number of tasks tracked at once.
  @max_tasks 50
  # Delay between attempts to start another task.
  @spawn_interval_ms 500
  # Upper bound (inclusive) for the random work/panic durations.
  @max_task_ms 5000

  ## public interface

  @doc """
  Starts the server linked to the caller. Task spawning begins immediately.
  """
  @spec start_link() :: GenServer.on_start()
  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  @doc """
  Stops the server; any tasks still tracked are killed in `terminate/2`.
  """
  @spec stop(GenServer.server()) :: :ok
  def stop(pid) do
    GenServer.stop(pid)
  end

  ## callbacks

  @impl true
  def init(_args) do
    # Trap exits so task crashes arrive as {:EXIT, pid, reason} messages
    # instead of taking this process down with them.
    Process.flag(:trap_exit, true)
    # Start spinning off tasks.
    send(self(), :next_task)
    {:ok, {%{}, :none}}
  end

  @impl true
  def handle_info(:next_task, {tasks, _timer_ref}) do
    # Time to create another task, unless the cap has been reached.
    new_tasks =
      if map_size(tasks) < @max_tasks do
        add_task(tasks, new_task())
      else
        tasks
      end

    timer_ref = Process.send_after(self(), :next_task, @spawn_interval_ms)
    {:noreply, {new_tasks, timer_ref}}
  end

  def handle_info({ref, result}, state) when is_reference(ref) do
    # Task completion: purge the related :DOWN message as soon as possible.
    Process.demonitor(ref, [:flush])
    IO.puts("Task #{inspect(ref)} completed with result #{result}")
    # The task stays in the map until its {:EXIT, pid, :normal} is processed.
    {:noreply, state}
  end

  def handle_info({:EXIT, from, reason}, {tasks, timer_ref} = state) do
    # Exit signals that the behaviour itself did not consume end up here.
    case purge_task(tasks, from) do
      {_, :none} ->
        IO.puts("Unknown :EXIT #{inspect(reason)}")

        if reason == :normal do
          {:noreply, state}
        else
          # Follow protocol: unknown non-normal exit signal - time to terminate.
          {:stop, reason, state}
        end

      {new_tasks, task} ->
        if reason != :normal do
          # The task panicked.
          IO.puts("Task #{inspect(task.ref)} panicked: #{inspect(reason)}")
        end

        {:noreply, {new_tasks, timer_ref}}
    end
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # Task monitors are normally demonitored (with :flush) by the clauses
    # above before their :DOWN is seen. This module should not depend on that
    # ordering: without this clause a :DOWN that does get through would raise
    # FunctionClauseError and crash the server. Task bookkeeping is left to
    # the :EXIT clause, so the state is returned unchanged.
    {:noreply, state}
  end

  @impl true
  def terminate(_reason, {tasks, timer_ref}) do
    IO.puts("Terminating")

    # timer_ref is :none until the first :next_task has been handled.
    if is_reference(timer_ref), do: Process.cancel_timer(timer_ref)

    shutdown_tasks(tasks)
    :ok
  end

  ## private helpers

  # Starts a task with a randomly chosen outcome. Two durations are drawn from
  # 1..@max_task_ms; if the "work" time is the smaller one the task sleeps that
  # long and returns it, otherwise it sleeps for the "panic" time and exits
  # with :panic to simulate a non-normal exit.
  defp new_task do
    work =
      case {:rand.uniform(@max_task_ms), :rand.uniform(@max_task_ms)} do
        {work_ms, panic_ms} when work_ms < panic_ms ->
          fn ->
            Process.sleep(work_ms)
            # Return the work time as the result.
            work_ms
          end

        {_, panic_ms} ->
          fn ->
            Process.sleep(panic_ms)
            # Simulate a non-normal exit.
            exit(:panic)
          end
      end

    Task.async(work)
  end

  # Tracks `task` under its pid so :EXIT messages can be matched back to it.
  defp add_task(tasks, task), do: Map.put(tasks, task.pid, task)

  # Removes the task registered under `pid`.
  # Returns {tasks, :none} when the pid is unknown, otherwise
  # {remaining_tasks, task} after flushing the task's monitor.
  defp purge_task(tasks, pid) do
    case Map.pop(tasks, pid, :none) do
      {:none, _} ->
        {tasks, :none}

      {task, remaining} ->
        # Purge related :DOWN messages.
        Process.demonitor(task.ref, [:flush])
        {remaining, task}
    end
  end

  # Kills every task that is still tracked.
  defp shutdown_tasks(tasks) do
    tasks
    |> Map.values()
    |> Enum.each(&Task.shutdown(&1, :brutal_kill))
  end
end
.
$iex -S mix
iex(1)> Process.flag :trap_exit, true
false
iex(2)> {:ok,pid} = Charlie.start_link
{:ok, #PID<0.127.0>}
Task #Reference<0.0.6.653> panicked: :panic
Task #Reference<0.0.6.651> completed with result 1306
Task #Reference<0.0.6.647> panicked: :panic
Task #Reference<0.0.6.658> panicked: :panic
Task #Reference<0.0.5.605> panicked: :panic
Task #Reference<0.0.6.649> panicked: :panic
Task #Reference<0.0.6.661> completed with result 464
Task #Reference<0.0.6.668> panicked: :panic
Task #Reference<0.0.6.677> panicked: :panic
Task #Reference<0.0.6.673> panicked: :panic
Task #Reference<0.0.6.682> completed with result 957
Task #Reference<0.0.6.684> completed with result 665
Task #Reference<0.0.6.675> panicked: :panic
iex(3)> Process.exit pid, :whatever
Terminating
true
iex(4)>