Function for Task.await_first/1 - Replication Concurrency Pattern

So yeah I would just do this:

defmodule TaskFour do
  @moduledoc """
  Replicated-request example: several tasks race to produce the same kind of result and
  only the first reply is kept.

  `await_first/2` uses a plain `receive` to pick up the first task reply and then stops
  every other task in the group.
  """

  @doc """
  Starts two replica tasks for each of `"Web"`, `"Image"` and `"Video"`, then collects the
  first reply of every group.

  All groups are started before any of them is awaited. The groups are awaited one after
  another, each with the default timeout of `await_first/2`.
  """
  @spec run() :: [{:ok, term()} | {:error, :timeout}]
  def run() do
    ["Web", "Image", "Video"]
    |> Enum.map(&start_tasks/1)
    |> Enum.map(&await_first/1)
  end

  @doc """
  Starts two tasks that each sleep for a random 1000..5000 ms, print a delivery line and
  return `"<type> result from <i>"`.

  Returns the list of started tasks.
  """
  @spec start_tasks(String.Chars.t()) :: [Task.t()]
  def start_tasks(type) do
    for i <- 1..2 do
      Task.async(fn ->
        Process.sleep(Enum.random(1000..5000))
        IO.puts("Delivering #{type} result from #{i}")
        "#{type} result from #{i}"
      end)
    end
  end

  @doc """
  Waits for the first reply from any of `tasks` and shuts the remaining tasks down.

  Must be called from the process that started the tasks, because the replies are read
  from the caller's mailbox.

  Returns `{:ok, result}` with the first reply received, or `{:error, :timeout}` when no
  task replied within `timeout` milliseconds. In the timeout case every task is shut down.
  """
  @spec await_first([Task.t()], timeout()) :: {:ok, term()} | {:error, :timeout}
  def await_first(tasks, timeout \\ 5000) do
    # Only the keys matter: the map exists so the guard below can test ref membership.
    pending = Map.new(tasks, fn %Task{ref: ref} -> {ref, true} end)

    receive do
      {ref, result} when is_map_key(pending, ref) ->
        # The winner has already replied, so only its monitor (and a possibly queued
        # :DOWN message) is left to clean up.
        Process.demonitor(ref, [:flush])

        tasks
        |> Enum.reject(fn %Task{ref: task_ref} -> task_ref == ref end)
        |> shutdown_all()

        {:ok, result}
    after
      timeout ->
        shutdown_all(tasks)
        {:error, :timeout}
    end
  end

  # Kills every given task. Task.shutdown/2 takes care of the task's monitor and of a
  # reply that may have arrived in the meantime, so no separate demonitor is needed here.
  defp shutdown_all(tasks) do
    Enum.each(tasks, &Task.shutdown(&1, :brutal_kill))
  end
end

# Time one complete run; :timer.tc/1 returns the elapsed microseconds with the result.
{elapsed_us, results} = :timer.tc(&TaskFour.run/0)

IO.inspect(results, label: "Results: ")
IO.puts("Execution time: #{elapsed_us / 1_000_000} seconds")

I feel like it is better to use your own receive. It is generally hidden behind behaviours, but it is totally fine to do so.

I’m still not handling the cleanup in case of timeout but you get the general idea.

2 Likes