Proper use of Task.async?

A slightly more complicated example:

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc """
  Application callback module for the `Task.Supervisor` experiment.

  Starts a `Task.Supervisor` followed by the `Worker` GenServer, and prints a
  line from each of the `prep_stop/1` and `stop/1` shutdown callbacks.
  """

  use Application

  # Starts the top-level supervisor with two children, in this order:
  #
  #   1. a `Task.Supervisor` registered as `MyApp.TaskSupervisor`
  #   2. the `Worker` GenServer, which receives the application name and the
  #      task supervisor name as options
  #
  # With `:rest_for_one`, a child that terminates takes down only the children
  # started after it, so a failing task supervisor also restarts the worker,
  # while a failing worker leaves the task supervisor alone.
  @impl true
  def start(_type, _args) do
    app = Application.get_application(__MODULE__)
    name = MyApp.TaskSupervisor

    children = [
      # use Task.Supervisor to protect
      {Task.Supervisor, name: name},
      # Worker GenServer process from harm
      {Worker, app: app, name: name}
    ]

    opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

  # Prints the state and returns it unchanged.
  # The state is interpolated directly, so it must implement `String.Chars`.
  @impl true
  def prep_stop(state) do
    IO.puts("prep_stop: #{state}")
    state
  end

  # Prints the state and returns `:ok`.
  # The state is interpolated directly, so it must implement `String.Chars`.
  @impl true
  def stop(state) do
    IO.puts("stop: #{state}")
    :ok
  end
end
# lib/worker.ex

defmodule Worker do
  @moduledoc """
  GenServer that launches a single unlinked task through a `Task.Supervisor`
  and waits for its outcome without blocking.

  The server state is a tuple `{app, name, ref}` where `app` is the application
  to stop once the experiment is complete, `name` is the task supervisor, and
  `ref` is first the reference returned by `Process.send_after/3` and later the
  monitor reference of the launched task (or `nil` once the result arrived).
  """

  use GenServer

  @doc """
  Starts the worker.

  `args` must be a keyword list containing `:app` and `:name`; otherwise
  `init/1` stops with `:badarg`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  # Validates the options and schedules the `:init` message one second later,
  # which is when the task actually gets launched.
  @impl true
  def init(args) do
    with {:ok, app} <- Keyword.fetch(args, :app),
         {:ok, name} <- Keyword.fetch(args, :name) do
      ref = Process.send_after(self(), :init, 1000)
      {:ok, {app, name, ref}}
    else
      # Keyword.fetch/2 returns :error when the key is missing
      :error ->
        {:stop, :badarg}
    end
  end

  @impl true
  def handle_info(:init, {app, name, _}) do
    # launch the task
    IO.puts("Pre-launch: #{inspect(app)} #{inspect(name)}")
    task_ref = launch_task(name)
    {:noreply, {app, name, task_ref}}
  end

  def handle_info({task_ref, result}, {app, name, task_ref}) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    Process.demonitor(task_ref, [:flush])
    IO.puts("Result: #{inspect(result)}")
    {:stop, :normal, {app, name, nil}}
  end

  def handle_info({:DOWN, task_ref, :process, pid, reason}, {_app, _name, task_ref} = state) do
    # :DOWN message arrives when task exits without result
    # reason won't be :normal
    # Only the monitor of the launched task is handled here; a :DOWN carrying
    # any other reference falls through to the catch-all clause below.
    IO.puts("DOWN: #{inspect(pid)} #{inspect(reason)}")
    {:stop, reason, state}
  end

  def handle_info(msg, state) do
    IO.inspect(msg)
    {:noreply, state}
  end

  @impl true
  def terminate(reason, {app, _, _}) do
    IO.puts("terminate: #{inspect(reason)}")
    # experiment complete
    Application.stop(app)
  end

  # Starts `stream_tasks/2` as a task under the supervisor `name` and returns
  # the task's monitor reference.
  defp launch_task(name) do
    # async_nolink: won't force GenServer process to exit in case of any task exit
    # expected: [ok: 1000, exit: :crash, exit: :timeout]
    args = [name, [1000, -500, 2000]]
    task = Task.Supervisor.async_nolink(name, __MODULE__, :stream_tasks, args)
    task.ref
  end

  @doc """
  Runs `stream_fun/1` on every element of `list` as unlinked tasks under the
  supervisor `name` and returns the collected results as a list.

  Single task being blocked (instead of the GenServer process) until all
  elements in the list have been processed. Each element has 1500 ms to
  finish; a task that exceeds it is killed.
  """
  @spec stream_tasks(Supervisor.supervisor(), [integer()]) :: [{:ok, term()} | {:exit, term()}]
  def stream_tasks(name, list) do
    # uncomment next line to cause :DOWN message instead of result
    # exit(:crash_start)
    # options to kill JUST "2000" task
    options = [timeout: 1500, on_timeout: :kill_task]

    name
    |> Task.Supervisor.async_stream_nolink(list, __MODULE__, :stream_fun, [], options)
    |> Enum.to_list()
  end

  @doc """
  Function being run as a task on each element in the list.

  Sleeps for the absolute value of `delay` milliseconds. A non-negative
  `delay` is then returned as the result; a negative `delay` makes the
  calling process exit with reason `:crash`.
  """
  @spec stream_fun(integer()) :: non_neg_integer() | no_return()
  def stream_fun(delay) when is_integer(delay) and delay >= 0 do
    Process.sleep(delay)
    # return result
    delay
  end

  def stream_fun(delay) when is_integer(delay) do
    Process.sleep(-delay)
    # crash the task
    exit(:crash)
  end
end
$ iex -S mix
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Compiling 2 files (.ex)
Interactive Elixir (1.6.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 
Pre-launch: :my_app MyApp.TaskSupervisor

[error] Task #PID<0.133.0> started from #PID<0.130.0> terminating
** (stop) :crash
    (my_app) lib/worker.ex:81: Worker.stream_fun/1
    (elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &Worker.stream_fun/1
    Args: [-500]

Result: [ok: 1000, exit: :crash, exit: :timeout]
terminate: :normal
prep_stop: 
stop: 

[info]  Application my_app exited: :stopped
5 Likes