Saša Jurić's "Beyond Task.Async" blog

I am trying to follow this blog Beyond Task.Async. So far I have got the following code working:

defmodule Aggregator do
  @moduledoc """
  Testing async tasks
  """

  def new, do: 0

  def aggregate(result), do: result
  def aggregate(aggregator, result) do
    Process.sleep(500)
    aggregator + result
  end
end

defmodule Calc do
  @moduledoc """
  Testing async tasks
  """

  @spec run(Integer.t) :: Integer.t
  def run(id) when id > 0 do
    IO.puts "\n\n sleeping: #{id}"
    Process.sleep(id)
    IO.puts "\n\n waking from slumber: #{id}"
    id
  end
end

defmodule Async do
  @moduledoc """
  Testing async tasks
  """

  def run(diff \\ 0, timeout \\ 900) do
    1..10
    |> Enum.map(fn _ -> Enum.random(1..1000) - diff end)
    |> Enum.map(&Task.async(fn ->
          try do
            {:ok, Calc.run(&1)}
          rescue _ ->
            :error
          end
       end))
    |> collect_result timeout
  end

  defp collect_result(tasks, timeout) do
    ref = make_ref()
    timer = Process.send_after(self(), {:timeout, ref}, timeout)

    try do
      collect_result(tasks, Aggregator.new, ref)
    after
      :erlang.cancel_timer(timer)
      receive do
        {:timeout, ^ref} ->
          :ok

      after 0 ->
        :ok
      end
    end
  end
  defp collect_result([], aggregator, _), do: {:ok, Aggregator.aggregate(aggregator)}
  defp collect_result(tasks, aggregator, ref) do
    receive do
      {:timeout, ^ref} ->
        {:timeout, Aggregator.aggregate(aggregator)}

      msg ->
        case Task.find(tasks, msg) do
          {{:ok, result}, task} ->
              collect_result(
                List.delete(tasks, task),
                Aggregator.aggregate(aggregator, result),
                ref
              )

          # if task errors, stop monitoring task and ignore its result
          {:error, task} ->
            collect_result(List.delete(tasks, task), aggregator, ref)

          nil ->
            collect_result(tasks, aggregator, ref)
        end
    end
  end
end

So when I run: iex(295)> Async.run 700, I get {:ok, 489}

However, I read that Task.find was deprecated in favour of explicit message matching. So I refactored collect_result/3 to:

  defp collect_result(tasks, aggregator, ref) do
    receive do
      {:timeout, ^ref} ->
        {:timeout, Aggregator.aggregate(aggregator)}

      {task, {:ok, result}} ->
        collect_result(
          List.delete(tasks, task),
          Aggregator.aggregate(aggregator, result),
          ref
        )

      # if task errors, stop monitoring task and ignore its result
      {task, :error} ->
        collect_result(List.delete(tasks, task), aggregator, ref)
    end
  end

Now Async.run times out no matter the value of diff. For examples, iex(306)> Async.run 700 returned {:timeout, 260}, which is ridiculous since Calc.run(260) was the only successful task.

I can’t seem to figure out how to refactor the code using explicit message matching and still get same behaviour as first iteration of the code. Any help from the community will be greatly appreciated.

Ok, I got it working. I forgot that Task.async returns %Task{owner: owner, ref: ref, ...} so that List.delete(tasks, task) did not delete the task since tasks = [%Task{}, %Task{}, ...], but task is a reference. So I got the tasks element corresponding to task using the code below:

defp get_matched_task(task, tasks) do
    Enum.find(tasks, nil, fn task_struct ->
      case task_struct do
        %Task{ref: ^task} ->
          true
        _ ->
          false
      end
    end)
  end

And so collect_result/3 becomes:

defp collect_result(tasks, aggregator, timer_ref) do
    receive do
      {:timeout, ^timer_ref} ->
        {:timeout, Aggregator.aggregate(aggregator)}

      {task, val} ->
        task_ = get_matched_task(task, tasks)
        case val do
          {:ok, result} ->
            collect_result(
              List.delete(tasks, task_),
              Aggregator.aggregate(aggregator, result),
              timer_ref
            )

          _ ->
            collect_result(List.delete(tasks, task_), aggregator, timer_ref)
        end

      _ ->
        collect_result(tasks, aggregator, timer_ref)
    end
  end

Now I the timeout message is sent only when a task runs for longer than the timeout. But I’m thinking may be there is a better way of achieving same result. Any help in this regard will be appreciated.

2 Likes

Might as well use a Map

defmodule Async do
  @moduledoc """
  Testing async tasks
  """

  def run(diff \\ 0, timeout \\ 900) do
    1..10
    |> Enum.map(fn _ -> Enum.random(1..1000) - diff end)
    |> Enum.map(&Task.async(fn ->
          try do
            {:ok, Calc.run(&1)}
          rescue _ ->
            :error
          end
        end))
    |> Enum.reduce(%{}, &(Map.put &2, &1.ref, &1)) # 1. Put tasks into a map indexed by monitor reference
    |> collect_result(timeout)
  end

  defp collect_result(tasks, timeout) do
    ref = make_ref()
    timer = Process.send_after(self(), {:timeout, ref}, timeout)

    try do
      collect_result(tasks, Aggregator.new, ref)
    after
      :erlang.cancel_timer(timer)
      receive do
        {:timeout, ^ref} ->
          :ok
      after 0 ->
        :ok
      end
    end
  end
  defp collect_result(tasks , aggregator, _) when tasks == %{} do
    {:ok, Aggregator.aggregate(aggregator)} # 5. no more tasks to process
  end
  defp collect_result(tasks, aggregator, ref) do
    receive do
      {:timeout, ^ref} ->
        (Map.values tasks) # 4. clean up remaining tasks
        |> Enum.each(&(Task.shutdown &1, :brutal_kill))
        |> Enum.each(&(Process.demonitor &1.ref, [:flush]))

        {:timeout, Aggregator.aggregate(aggregator)}

      {task_ref, reply} ->
        cond do
          Map.has_key? tasks, task_ref ->        # 2. verify source
            Process.demonitor task_ref, [:flush] # 3. purge :DOWN message
            new_aggregator =
              case reply do
                {:ok, result} ->
                  Aggregator.aggregate aggregator, result
                :error ->
                  aggregator
              end
            tasks
            |> Map.delete(task_ref)
            |> collect_result(new_aggregator, ref)

          true ->
            collect_result(tasks, aggregator, ref)
        end
    end
  end
end

What particular avenue of improvement did you have in mind?

At this point I’m wondering whether

  1. A GenServer could clean this up
  2. Once GenServers are in the picture whether I can actually get rid of Task entirely

The more I’m exposed to Task the more I’m predisposed to shoving it over into the corner with Agent as a nice curiosity which ultimately isn’t all that useful.

1 Like

It has a few limited uses, I do not think the use with GenServer is such a use though and GenServer should just be used. :slight_smile:

I’ve legit used Task only a couple of times total, and I could have easily done them another way (which really might have been shorter code too).

Thanks @peerreynders. Your corrections worked. I asked if there was a better way because I happened to look at Task.find after I had written get_matched_task/2 and they looked similar. Since Task.find was deprecated, I thought a function which does almost the same thing had to be defective.

I really like the cleanups you introduced. It solved a headache I was having where I’d get {:DOWN, _, _, _, _} messages and couldn’t figure out where they were coming from. I tried using observer.start to see if I could catch them, but I guess since those processes had exited before I turned to observer, I wouldn’t find them.

I am truly grateful. Thanks

So here is an approach where you essentially handle the spawned processes by yourself.

 # File: calc.ex
defmodule Calc do
  @spec run(Integer.t) :: Integer.t
  def run(id) when id > 0 do
    desc = "#{id} (#{inspect self()})"
    IO.puts (" sleeping: " <> desc)
    Process.sleep(id)
    IO.puts (" waking from slumber: " <> desc)
    id
  end
end

# File: accumulator.ex
defmodule Accumulator do
  def sum(addend, augend) do
    Process.sleep(500) # NOTE: This is responsible for some tasks
    addend + augend    # running beyond the timeout.
  end                  # The point is that only messages
end                    # up to the timeout message are actually processed
                       # See what happens when you comment "sleep" out

# File map_reduce.ex
defmodule MapReduce do
  use GenServer

  defp map_spawn(fun, reply_to),
    do: fn elem ->
          Kernel.spawn_monitor(
            fn ->
              reply =
                try do
                  {self(), {:ok, fun.(elem)}}
                rescue _ ->
                  {self(), :error}
                end
              GenServer.cast reply_to, reply
            end
          )
        end

  defp put_task({pid, ref}, tasks),
    do: Map.put tasks, pid, ref

  defp remove_task(tasks, pid),
    do: Map.delete tasks, pid

  defp purge_task({pid, ref}) do
    # No longer interested in the process
    # or the associated :DOWN messages
    Process.demonitor ref, [:flush]

    result =
      cond do
        Process.alive? pid ->
          Process.exit pid, :kill
        true ->
          false # no need
      end

    IO.puts "  purge task #{inspect pid}: #{inspect result}"

    result
  end

  defp cleanup_tasks(tasks) do
    tasks
    |> Map.to_list()
    |> Enum.each(&purge_task/1)
  end

  defp assess_progress({tasks, _, _, _, _} = state) do
    cond do
      tasks == %{} ->
        {:stop, :normal, state}
      true ->
        {:noreply, state}
    end
  end

  defp handle_result({tasks, acc, reduce, reply_to, timer_ref}, {pid, result}) do
    new_acc =
      case result do
        {:ok, value} ->
          reduce.(acc, value)
        :error ->
          acc # TODO account for errors in the reply to creator
      end

    assess_progress {(remove_task tasks, pid), new_acc, reduce, reply_to, timer_ref}
  end

  def handle_down({tasks, acc, reduce, reply_to, timer_ref}, pid, reason) do
    IO.puts "Task #{inspect pid} terminated with reason: #{inspect reason}"
    # TODO: should indicate in reply to creator that there are DOWN tasks
    assess_progress {(remove_task tasks, pid), acc, reduce, reply_to, timer_ref}
  end

  ## callbacks: message handlers
  def handle_cast({pid, _} = reply, {tasks, _, _, _, _} = state) when is_pid pid do
    # process result
    case Map.get tasks, pid, :none do
      :none ->
        {:noreply, state}
      ref ->
        Process.demonitor ref, [:flush] # purge :DOWN message
        handle_result state, reply
    end
  end
  def handle_cast(:init, {data, init, fun, reduce, reply_to, timeout}) do
    tasks =
      data
      |> Enum.map(map_spawn(fun, self()))
      |> Enum.reduce(%{}, &put_task/2)

    timer_ref = Process.send_after self(), :timeout, timeout
    {:noreply, {tasks, init, reduce, reply_to, timer_ref}}
  end

  def handle_info(:timeout, {tasks, acc, reduce, reply_to, _timer_ref}) do
    IO.puts "  !!! timeout !!!"
    {:stop, :normal, {tasks, acc, reduce, reply_to, :none}}
  end
  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    handle_down state, pid, reason
  end

  ## callbacks: lifecycle
  def init(arg) do
    GenServer.cast self(), :init   # return ASAP, i.e. delay initialization
    {:ok, arg}
  end

  def terminate(reason, {tasks, acc, _reduce, reply_to, timer_ref}) do
    reply =
      case {reason, timer_ref} do
        {:normal, :none} ->
          {:timeout, self(), acc}
        {:normal, _} ->
          Process.cancel_timer timer_ref
          {:ok, self(), acc}
        _ ->
          {:error, self(), reason}
      end

    cleanup_tasks tasks
    Kernel.send reply_to, reply

    IO.puts "  unprocessed messages - #{inspect (Process.info self(), :message_queue_len)}"
  end

  ## public interface
  def start(data, init, fun, reduce, timeout) do
    args = {data, init, fun, reduce, self(), timeout}
    # Note:  [spawn_opt: :monitor] is not allowed
    # http://erlang.org/doc/man/gen_server.html#start_link-4
    #
    case GenServer.start __MODULE__, args do
      {:ok, pid} ->
        ref = Process.monitor pid
        {pid, ref}
      other ->
        other
    end
  end

  def run_demo(diff \\ 0, timeout \\ 900) do
    data = Enum.map 1..10, fn _ -> Enum.random(1..1000) - diff end
    start data, 0, &Calc.run/1, &Accumulator.sum/2, timeout
  end

end

.

$ iex -S mix
iex(1)> MapReduce.run_demo 700
 sleeping: 6 (#PID<0.120.0>)
 sleeping: 126 (#PID<0.122.0>)
 sleeping: 130 (#PID<0.125.0>)
 sleeping: 152 (#PID<0.127.0>)
 sleeping: 289 (#PID<0.128.0>)
{#PID<0.118.0>, #Reference<0.0.4.573>}
 waking from slumber: 6 (#PID<0.120.0>)
 waking from slumber: 126 (#PID<0.122.0>)
 waking from slumber: 130 (#PID<0.125.0>)
 waking from slumber: 152 (#PID<0.127.0>)
 waking from slumber: 289 (#PID<0.128.0>)
  unprocessed messages - {:message_queue_len, 1}
iex(2)> flush()
{:ok, #PID<0.118.0>, 703}
{:DOWN, #Reference<0.0.4.573>, :process, #PID<0.118.0>, :normal}
:ok
iex(3)> MapReduce.run_demo 0, 500
{#PID<0.131.0>, #Reference<0.0.5.441>}
 sleeping: 203 (#PID<0.132.0>)
 sleeping: 315 (#PID<0.133.0>)
 sleeping: 944 (#PID<0.134.0>)
 sleeping: 959 (#PID<0.135.0>)
 sleeping: 304 (#PID<0.136.0>)
 sleeping: 783 (#PID<0.137.0>)
 sleeping: 407 (#PID<0.138.0>)
 sleeping: 117 (#PID<0.139.0>)
 sleeping: 44 (#PID<0.140.0>)
 sleeping: 564 (#PID<0.141.0>)
 waking from slumber: 44 (#PID<0.140.0>)
 waking from slumber: 117 (#PID<0.139.0>)
 waking from slumber: 203 (#PID<0.132.0>)
 waking from slumber: 304 (#PID<0.136.0>)
 waking from slumber: 315 (#PID<0.133.0>)
 waking from slumber: 407 (#PID<0.138.0>)
 waking from slumber: 564 (#PID<0.141.0>)
 waking from slumber: 783 (#PID<0.137.0>)
 waking from slumber: 944 (#PID<0.134.0>)
 waking from slumber: 959 (#PID<0.135.0>)
  !!! timeout !!!
  kill task #PID<0.134.0>: false
  kill task #PID<0.135.0>: false
  kill task #PID<0.137.0>: false
  kill task #PID<0.141.0>: false
  unprocessed messages - {:message_queue_len, 4}
iex(4)> flush()
{:timeout, #PID<0.131.0>, 1390}
{:DOWN, #Reference<0.0.5.441>, :process, #PID<0.131.0>, :normal}
:ok
iex(5)> 

In the end though you shouldn’t spin off processes just because you can. Make sure there is a “good enough reason” for the process to exist. Have a really good, long look at The Erlangelist: To spawn, or not to spawn?

Edit: Added some more IO.puts to reveal more information to help explain behaviour that may seem strange on first blush.

1 Like

Thanks a lot. That Genserver code looks so nice - something I will be adding to my tool set. And thanks for the advice about spawning processes. I was trying to wrap my head around how concurrency works in elixir, and your advice was timely because I can see myself overdoing it.

Do have a great day!

Thanks @peerreynders, I learnt a lot :joy: