Function for Task.await_first/1 - Replication Concurrency Pattern

I’m watching a video on Go Concurrency Patterns. Here Rob Pike is discussing the Replication pattern:

The pattern is to launch multiple Go routines with the same function and return when you receive the first result.

I’m trying to accomplish the same result using Elixir Tasks. Here is my approach recursively using Task.yield_many/1:

# task_await_one.exs
defmodule TaskTwo do
  def run() do
    tasks_for_a =
      for i <- 1..10 do
        Task.async(fn ->
          Process.sleep(Enum.random(1000..5000))
          IO.puts("Delivering result A from #{i}")
          i
        end)
      end

    await_first(tasks_for_a)
  end

  defp await_first(tasks) do
    IO.puts("Trying...")
    task_results = Task.yield_many(tasks, 100)

    case Enum.find(task_results, fn {_, result} -> result != nil end) do
      nil ->
        # No tasks have completed yet, recurse with the same tasks
        await_first(tasks)

      {_completed_task, {:ok, result}} ->
        # We got a result from one of the tasks
        # Now, shut down all tasks
        Enum.each(tasks, &Task.shutdown(&1, :brutal_kill))
        result
    end
  end
end

{time, result} = :timer.tc(fn -> TaskTwo.run() end)

IO.puts(result)
IO.puts("Execution time: #{time / 1_000_000} seconds")

Does anyone have any ways I could improve my approach? I’m sure I could return more quickly if I wrote my own receive function.

My next iteration based on the video would be to start tasks for three different functions and return the result as soon as each function returned one result. Something like this:

await_first_of_many(tasks_for_a, tasks_for_b, tasks_for_c)

Here is my next iteration which modifies the code in Task.await_many/1.

defmodule TaskFour do
  def run() do
    tasks = Enum.map(["Web", "Image", "Video"], &start_tasks/1)
    Enum.map(tasks, &await_first/1)
  end

  def start_tasks(type) do
    for i <- 1..2 do
      Task.async(fn ->
        Process.sleep(Enum.random(1000..5000))
        IO.puts("Delivering #{type} result from #{i}")
        "#{type} result from #{i}"
      end)
    end
  end

  def await_first(tasks, timeout \\ 5000) do
    awaiting =
      for task <- tasks, into: %{} do
        %Task{ref: ref} = task

        {ref, true}
      end

    timeout_ref = make_ref()

    timer_ref =
      if timeout != :infinity do
        Process.send_after(self(), timeout_ref, timeout)
      end

    try do
      await_first(tasks, timeout, awaiting, nil, timeout_ref)
    after
      timer_ref && Process.cancel_timer(timer_ref)
      receive do: (^timeout_ref -> :ok), after: (0 -> :ok)
    end
  end

  defp await_first(tasks, _timeout, _awaiting, reply, _timeout_ref)
       when not is_nil(reply) do
        IO.puts("Done!")
        
        Enum.each(tasks, &Task.shutdown(&1, :brutal_kill))
    
        reply
  end

  defp await_first(tasks, timeout, awaiting, _reply, timeout_ref) do
    IO.puts("Awaiting...")
    receive do
      ^timeout_ref ->
        demonitor_pending_tasks(awaiting)
        exit({:timeout, {__MODULE__, :await_many, [tasks, timeout]}})

      {:DOWN, ref, _, _proc, reason} when is_map_key(awaiting, ref) ->
        demonitor_pending_tasks(awaiting)
        exit({reason, {__MODULE__, :await_many, [tasks, timeout]}})

      {ref, reply} when is_map_key(awaiting, ref) ->
        Process.demonitor(ref, [:flush])

        await_first(
          tasks,
          timeout,
          Map.delete(awaiting, ref),
          reply,
          timeout_ref
        )
    end
  end

  defp demonitor_pending_tasks(awaiting) do
    Enum.each(awaiting, fn {ref, _} ->
      Process.demonitor(ref, [:flush])
    end)
  end
end

{time, result} = :timer.tc(fn -> TaskFour.run() end)

IO.inspect(result, label: "Results: ")
IO.puts("Execution time: #{time / 1_000_000} seconds")

Here is an example of the output:

Awaiting...
Delivering Web result from 2
Done!
Awaiting...
Delivering Video result from 1
Delivering Image result from 1
Done!
Awaiting...
Done!
Results: : ["Web result from 2", "Image result from 1", "Video result from 1"]
Execution time: 2.466733 seconds

Edit: please see my next message instead.

If your have the need to do that a lot, I would create a specific data structure to represent a collection of collections of tasks.

Here when you call Enum.map(tasks, &await_first/1), each group must be handled individually, which prevents you to just receive anything that comes up.

For instance I would do something like this:

defmodule TaskFour do
  def run() do
    task_groups = Enum.map(["Web", "Image", "Video"], &start_tasks/1)
    await_first_of_each_group(task_groups, 10_000)
  end

  def start_tasks(type) do
    for i <- 1..2 do
      Task.async(fn ->
        Process.sleep(Enum.random(1000..5000))
        IO.puts("Delivering #{type} result from #{i}")
        "#{type} result from #{i}"
      end)
    end
  end

  def await_first_of_each_group(task_groups, timeout \\ 5000) do
    groups =
      Enum.map(task_groups, fn tasks ->
        refs = Enum.map(tasks, & &1.ref)
        {refs, tasks, :awaiting}
      end)

    tref = Process.send_after(self(),:await_timeout, timeout)
    result = await_first(groups)
    Process.cancel_timer(tref, info: false, async: false)
    result
  end

  defp await_first(groups) do
    receive do
      :await_timeout ->
        exit(:timeout)
      {:DOWN, _ref, _, _proc, _reason} ->
        await_first(groups)
      {ref, result} ->
        case set_result(groups, ref, result) do
          {:halt, groups} -> Enum.map(groups, &elem(&1, 2))
          {:cont, groups} -> await_first(groups)
        end
    end
  end

  defp set_result(groups, ref, result) do
    groups =
      Enum.map(groups, fn {refs, tasks, state} ->
        cond do
          ref in refs and state == :awaiting ->
            stop_all_tasks(tasks, ref)
            {refs, tasks, {:received, result}}

          true ->
            {refs, tasks, state}
        end
      end)

    if Enum.all?(groups, fn {_, _, state} -> state != :awaiting end) do
      {:halt, groups}
    else
      {:cont, groups}
    end
  end

  defp stop_all_tasks(tasks, ref) do
    tasks
    |> Enum.filter(fn %Task{ref: r} -> r != ref end)
    |> Enum.each(&Task.shutdown(&1, :brutal_kill))
  end
end

{time, result} = :timer.tc(fn -> TaskFour.run() end)

IO.inspect(result, label: "Results: ")
IO.puts("Execution time: #{time / 1_000_000} seconds")

Of course my set_result function is very naive, but you get the idea. And the :DOWN messages are not properly handled. We should have a large map of all known refs so we can just match the down messages that belong to our work and ignore them, while not consuming other down messages.

What do you think ?

Edit: well… if we have a large map with all the refs, then we can actually make it work without a custom data structure.

1 Like

So yeah I would just do this:

defmodule TaskFour do
  def run() do
    task_groups = Enum.map(["Web", "Image", "Video"], &start_tasks/1)
    Enum.map(task_groups, &await_first/1)
  end

  def start_tasks(type) do
    for i <- 1..2 do
      Task.async(fn ->
        Process.sleep(Enum.random(1000..5000))
        IO.puts("Delivering #{type} result from #{i}")
        "#{type} result from #{i}"
      end)
    end
  end

  def await_first(tasks, timeout \\ 5000) do
    map = Map.new(tasks, fn task -> {task.ref, task} end)

    receive do
      {ref, result} when is_map_key(map, ref) ->
        stop_all_tasks(tasks, ref)
        {:ok, result}
    after
      timeout ->
        stop_all_tasks(tasks, nil)
        {:error, :timeout}
    end
  end

  defp stop_all_tasks(tasks, ignore_ref) do
    tasks
    |> Enum.filter(fn %Task{ref: ref} ->
      Process.demonitor(ref, [:flush])
      ref != ignore_ref
    end)
    |> Enum.each(&Task.shutdown(&1, :brutal_kill))
  end
end

{time, result} = :timer.tc(fn -> TaskFour.run() end)

IO.inspect(result, label: "Results: ")
IO.puts("Execution time: #{time / 1_000_000} seconds")

I feel like it is better to use your own receive. It is generally hidden behind bahaviours bu it is totally fine to do so.

I’m still not handling the cleanup in case of timeout but you get the general idea.

2 Likes

This can leak an :await_timeout message if the timer fires after a Task has completed but while stop_all_tasks is still running; consider using an after clause on the receive to avoid that problem.

1 Like

Oh right there is no need to send that timeout anymore.

@al2o3cr I changed my code above, what do you think? I do not exit because we call that function in Enum.map with multiple groups of tasks, so I guess the function should be clean and handle all messages