Saša Jurić's "Beyond Task.Async" blog

So here is an approach where you essentially handle the spawned processes by yourself.

 # File: calc.ex
defmodule Calc do
  @spec run(Integer.t) :: Integer.t
  def run(id) when id > 0 do
    desc = "#{id} (#{inspect self()})"
    IO.puts (" sleeping: " <> desc)
    Process.sleep(id)
    IO.puts (" waking from slumber: " <> desc)
    id
  end
end

# File: accumulator.ex
defmodule Accumulator do
  def sum(addend, augend) do
    Process.sleep(500) # NOTE: This is responsible for some tasks
    addend + augend    # running beyond the timeout.
  end                  # The point is that only messages
end                    # up to the timeout message are actually processed
                       # See what happens when you comment "sleep" out

# File map_reduce.ex
defmodule MapReduce do
  use GenServer

  defp map_spawn(fun, reply_to),
    do: fn elem ->
          Kernel.spawn_monitor(
            fn ->
              reply =
                try do
                  {self(), {:ok, fun.(elem)}}
                rescue _ ->
                  {self(), :error}
                end
              GenServer.cast reply_to, reply
            end
          )
        end

  defp put_task({pid, ref}, tasks),
    do: Map.put tasks, pid, ref

  defp remove_task(tasks, pid),
    do: Map.delete tasks, pid

  defp purge_task({pid, ref}) do
    # No longer interested in the process
    # or the associated :DOWN messages
    Process.demonitor ref, [:flush]

    result =
      cond do
        Process.alive? pid ->
          Process.exit pid, :kill
        true ->
          false # no need
      end

    IO.puts "  purge task #{inspect pid}: #{inspect result}"

    result
  end

  defp cleanup_tasks(tasks) do
    tasks
    |> Map.to_list()
    |> Enum.each(&purge_task/1)
  end

  defp assess_progress({tasks, _, _, _, _} = state) do
    cond do
      tasks == %{} ->
        {:stop, :normal, state}
      true ->
        {:noreply, state}
    end
  end

  defp handle_result({tasks, acc, reduce, reply_to, timer_ref}, {pid, result}) do
    new_acc =
      case result do
        {:ok, value} ->
          reduce.(acc, value)
        :error ->
          acc # TODO account for errors in the reply to creator
      end

    assess_progress {(remove_task tasks, pid), new_acc, reduce, reply_to, timer_ref}
  end

  def handle_down({tasks, acc, reduce, reply_to, timer_ref}, pid, reason) do
    IO.puts "Task #{inspect pid} terminated with reason: #{inspect reason}"
    # TODO: should indicate in reply to creator that there are DOWN tasks
    assess_progress {(remove_task tasks, pid), acc, reduce, reply_to, timer_ref}
  end

  ## callbacks: message handlers
  def handle_cast({pid, _} = reply, {tasks, _, _, _, _} = state) when is_pid pid do
    # process result
    case Map.get tasks, pid, :none do
      :none ->
        {:noreply, state}
      ref ->
        Process.demonitor ref, [:flush] # purge :DOWN message
        handle_result state, reply
    end
  end
  def handle_cast(:init, {data, init, fun, reduce, reply_to, timeout}) do
    tasks =
      data
      |> Enum.map(map_spawn(fun, self()))
      |> Enum.reduce(%{}, &put_task/2)

    timer_ref = Process.send_after self(), :timeout, timeout
    {:noreply, {tasks, init, reduce, reply_to, timer_ref}}
  end

  def handle_info(:timeout, {tasks, acc, reduce, reply_to, _timer_ref}) do
    IO.puts "  !!! timeout !!!"
    {:stop, :normal, {tasks, acc, reduce, reply_to, :none}}
  end
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    handle_down state, pid, reason
  end

  ## callbacks: lifecycle
  def init(arg) do
    GenServer.cast self(), :init   # return ASAP, i.e. delay initialization
    {:ok, arg}
  end

  def terminate(reason, {tasks, acc, _reduce, reply_to, timer_ref}) do
    reply =
      case {reason, timer_ref} do
        {:normal, :none} ->
          {:timeout, self(), acc}
        {:normal, _} ->
          Process.cancel_timer timer_ref
          {:ok, self(), acc}
        _ ->
          {:error, self(), reason}
      end

    cleanup_tasks tasks
    Kernel.send reply_to, reply

    IO.puts "  unprocessed messages - #{inspect (Process.info self(), :message_queue_len)}"
  end

  ## public interface
  def start(data, init, fun, reduce, timeout) do
    args = {data, init, fun, reduce, self(), timeout}
    # Note:  [spawn_opt: :monitor] is not allowed
    # http://erlang.org/doc/man/gen_server.html#start_link-4
    #
    case GenServer.start __MODULE__, args do
      {:ok, pid} ->
        ref = Process.monitor pid
        {pid, ref}
      other ->
        other
    end
  end

  def run_demo(diff \\ 0, timeout \\ 900) do
    data = Enum.map 1..10, fn _ -> Enum.random(1..1000) - diff end
    start data, 0, &Calc.run/1, &Accumulator.sum/2, timeout
  end

end

.

$ iex -S mix
iex(1)> MapReduce.run_demo 700
 sleeping: 6 (#PID<0.120.0>)
 sleeping: 126 (#PID<0.122.0>)
 sleeping: 130 (#PID<0.125.0>)
 sleeping: 152 (#PID<0.127.0>)
 sleeping: 289 (#PID<0.128.0>)
{#PID<0.118.0>, #Reference<0.0.4.573>}
 waking from slumber: 6 (#PID<0.120.0>)
 waking from slumber: 126 (#PID<0.122.0>)
 waking from slumber: 130 (#PID<0.125.0>)
 waking from slumber: 152 (#PID<0.127.0>)
 waking from slumber: 289 (#PID<0.128.0>)
  unprocessed messages - {:message_queue_len, 1}
iex(2)> flush()
{:ok, #PID<0.118.0>, 703}
{:DOWN, #Reference<0.0.4.573>, :process, #PID<0.118.0>, :normal}
:ok
iex(3)> MapReduce.run_demo 0, 500
{#PID<0.131.0>, #Reference<0.0.5.441>}
 sleeping: 203 (#PID<0.132.0>)
 sleeping: 315 (#PID<0.133.0>)
 sleeping: 944 (#PID<0.134.0>)
 sleeping: 959 (#PID<0.135.0>)
 sleeping: 304 (#PID<0.136.0>)
 sleeping: 783 (#PID<0.137.0>)
 sleeping: 407 (#PID<0.138.0>)
 sleeping: 117 (#PID<0.139.0>)
 sleeping: 44 (#PID<0.140.0>)
 sleeping: 564 (#PID<0.141.0>)
 waking from slumber: 44 (#PID<0.140.0>)
 waking from slumber: 117 (#PID<0.139.0>)
 waking from slumber: 203 (#PID<0.132.0>)
 waking from slumber: 304 (#PID<0.136.0>)
 waking from slumber: 315 (#PID<0.133.0>)
 waking from slumber: 407 (#PID<0.138.0>)
 waking from slumber: 564 (#PID<0.141.0>)
 waking from slumber: 783 (#PID<0.137.0>)
 waking from slumber: 944 (#PID<0.134.0>)
 waking from slumber: 959 (#PID<0.135.0>)
  !!! timeout !!!
  kill task #PID<0.134.0>: false
  kill task #PID<0.135.0>: false
  kill task #PID<0.137.0>: false
  kill task #PID<0.141.0>: false
  unprocessed messages - {:message_queue_len, 4}
iex(4)> flush()
{:timeout, #PID<0.131.0>, 1390}
{:DOWN, #Reference<0.0.5.441>, :process, #PID<0.131.0>, :normal}
:ok
iex(5)> 

In the end though you shouldn’t spin off processes just because you can. Make sure there is a “good enough reason” for the process to exist. Have a really good, long look at The Erlangelist: To spawn, or not to spawn?

Edit: Added some more IO.puts to reveal more information to help explain behaviour that may seem strange on first blush.

1 Like