Proper use of Task.async?

As many of you may have realized by now (sorry for all the posts here) I’ve been working on a db problem where I’m trying to aggregate a large list of similar but different tables.

One suggestion I’ve received for dealing with this was to create a module that queries each table individually and then aggregates the results into a single list at the app level, rather than in the db via unions and views. The suggestion was to use Task.async.

Since I will never have more than 1000 concurrent users at any given time, I felt this could be a good solution for my use case. I should have enough pool workers to handle the load of querying the db concurrently. I realize this has CPU overhead, but I think the trade-off is worth it: I have a very large list of tables to manage, and rewriting a very long union query does not seem like the best way to do that. I know little about Tasks and I’m just starting to learn.

So back to my question:

If I have a single function that delegates multiple concurrent queries to compose a single result set, is this a good use case for Task.async?

Also, I’ve seen Task.await but still don’t fully understand it. If I want to block my function until every async task has completed and then compose a new list of results, what does that look like?

E.g.: if I wanted to make a function that took a list of numbers, squared each value and returned a new list of all the squared values (“regardless of order”) using Task.async, what would that look like?

Thanks in advance for your help, this community is awesome!
Best - Josh Chernoff.

Maybe something like this

queries # a list of queries, one per table
|> Enum.map(fn query -> Task.async(Repo, :all, [query]) end) # starts queries
|> Enum.flat_map(fn task -> Task.await(task) end) # waits for them, collects results into one list

Note that Task.await by default waits for up to 5 seconds, then it times out (exits).
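
For the squaring example from the question, a minimal sketch along the same lines might look like the following. The module name SquareTasks and the 10-second default are made up for illustration; Task.await/2 takes the timeout in milliseconds as its second argument, which is one way around the 5-second default.

```elixir
defmodule SquareTasks do
  # Hypothetical module, for illustration only.
  # Starts one task per number, then blocks until every task has replied.
  def square_all(numbers, timeout \\ 10_000) do
    numbers
    |> Enum.map(fn n -> Task.async(fn -> n * n end) end)   # start all tasks
    |> Enum.map(fn task -> Task.await(task, timeout) end)  # wait for each in turn
  end
end

IO.inspect(SquareTasks.square_all([1, 2, 3, 4]))
# prints [1, 4, 9, 16]
```

Because the tasks are awaited in the order they were started, the results come back in input order even though the work runs concurrently.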


So Enum.flat_map is blocked until each Task.await has completed? Can it really be this simple?

Edit: just read your edit about the 5 seconds.

I think so. Try running this in your shell to maybe get a better idea

1..10 |> Enum.map(fn _ -> Task.async(:timer, :sleep, [:rand.uniform(5000)]) end)

and then

1..10 |> Enum.map(fn _ -> Task.async(:timer, :sleep, [:rand.uniform(5000)]) end) |> Enum.map(fn task -> Task.await(task) end)

The first snippet returns immediately, since it just spawn_links the tasks.

The second one enters a receive block inside each call to the anonymous function fn task -> Task.await(task) end, so the last Enum.map waits on each task in turn until it returns, exits or times out.
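
One way to convince yourself that the tasks really overlap is to time the whole pipeline. This is only a sketch: the delays are arbitrary values picked for illustration, and :timer.tc/1 reports the elapsed time in microseconds.

```elixir
# Arbitrary delays in milliseconds, chosen only for this illustration.
delays = [300, 200, 100]

{micros, results} =
  :timer.tc(fn ->
    delays
    |> Enum.map(fn ms ->
      Task.async(fn ->
        Process.sleep(ms) # stand-in for a slow query
        ms
      end)
    end)
    |> Enum.map(fn task -> Task.await(task) end)
  end)

IO.inspect(results)
IO.puts("elapsed: #{div(micros, 1000)} ms")
```

The elapsed time should land close to the longest delay rather than the sum of all three, since the awaits only block on work that is already running in parallel.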


Compatibility with OTP behaviours

It is not recommended to await a long-running task inside an OTP behaviour such as GenServer. Instead, you should match on the message coming from a task inside your GenServer.handle_info/2 callback.

If you need to go that route have a look at:


I personally like using async_stream, I have found I use this quite a bit

Task.async_stream(enumerable, Mod, :fun, args, max_concurrency: 10)
|> Enum.to_list()

This keeps kicking off the function for each element, with at most max_concurrency (in this case 10) tasks running at a time, and gathers all the results into a list. Each result arrives as an {:ok, result} tuple. No need to call Task.await at all.


So using your example of squaring a list, here it is using Task.async_stream:

defmodule CalcSquares do
  def square(x), do: x * x

  def square_list(list) do
    list
    |> Task.async_stream(CalcSquares, :square, [], max_concurrency: 10)
    |> Enum.map(fn {:ok, val} -> val end) # unwrap the {:ok, value} tuples
  end
end

IO.inspect CalcSquares.square_list(1..10)
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
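
Since the original question said the order of the results does not matter, a variant worth trying is the ordered: false option, which lets results be emitted as tasks finish. The sketch below uses the Task.async_stream/3 form that takes an anonymous function; the module name UnorderedSquares and the timeout value are assumptions for illustration, and on_timeout: :kill_task is what turns a timed-out task into an {:exit, :timeout} element instead of crashing the caller.

```elixir
defmodule UnorderedSquares do
  # Hypothetical module, for illustration only.
  def square_list(list) do
    opts = [ordered: false, max_concurrency: 10, timeout: 5_000, on_timeout: :kill_task]

    list
    |> Task.async_stream(fn x -> x * x end, opts)
    |> Enum.flat_map(fn
      {:ok, val} -> [val]        # keep successful results
      {:exit, _reason} -> []     # drop tasks that were killed on timeout
    end)
  end
end

# Results may arrive in any order, so sort before comparing or printing.
UnorderedSquares.square_list(1..10) |> Enum.sort() |> IO.inspect()
```

Dropping timed-out elements silently is a design choice that suits this toy example; for real queries it may be better to log or return the failures.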


A slightly more complicated example:

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    app = Application.get_application(__MODULE__)
    name = MyApp.TaskSupervisor

    children = [
      {Task.Supervisor, name: name}, # use Task.Supervisor to protect
      {Worker, app: app, name: name} # Worker GenServer process from harm
    ]

    opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

  def prep_stop(state) do
    IO.puts("prep_stop: #{state}")
    state # prep_stop/1 returns the state that is passed on to stop/1
  end

  def stop(state) do
    IO.puts("stop: #{state}")
  end
end

# lib/worker.ex
defmodule Worker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def init(args) do
    with {:ok, app} <- Keyword.fetch(args, :app),
         {:ok, name} <- Keyword.fetch(args, :name) do
      ref = Process.send_after(self(), :init, 1000)
      {:ok, {app, name, ref}}
    else
      _ ->
        {:stop, :badarg}
    end
  end

  def handle_info(:init, {app, name, _}) do
    # launch the task
    IO.puts("Pre-launch: #{inspect app} #{inspect name}")
    task_ref = launch_task(name)
    {:noreply, {app, name, task_ref}}
  end

  def handle_info({task_ref, result}, {app, name, task_ref}) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    Process.demonitor(task_ref, [:flush])
    IO.puts("Result: #{inspect result}")
    {:stop, :normal, {app, name, nil}}
  end

  def handle_info({:DOWN, _down_ref, :process, pid, reason}, state) do
    # :DOWN message arrives when task exits without result
    # reason won't be :normal
    IO.puts("DOWN: #{inspect pid} #{inspect reason}")
    {:stop, reason, state}
  end

  def handle_info(msg, state) do
    IO.inspect msg
    {:noreply, state}
  end

  def terminate(reason, {app, _, _}) do
    IO.puts("terminate: #{inspect reason}")
    Application.stop(app)                   # experiment complete
  end

  defp launch_task(name) do
    # async_nolink: won't force GenServer process to exit in case of any task exit
    args = [name, [1000, -500, 2000]] # expected: [ok: 1000, exit: :crash, exit: :timeout]
    task = Task.Supervisor.async_nolink(name, __MODULE__, :stream_tasks, args)
    task.ref # the monitor reference that tags the reply and :DOWN messages
  end

  # Single task being blocked (instead of GenServer process)
  # until all elements in the list have been processed
  def stream_tasks(name, list) do
    # uncomment next line to cause :DOWN message instead of result
    # exit(:crash_start)
    options = [timeout: 1500, on_timeout: :kill_task] # options to kill JUST "2000" task

    name
    |> Task.Supervisor.async_stream_nolink(list, __MODULE__, :stream_fun, [], options)
    |> Enum.to_list()
  end

  # Function being run as a task on each element in the list
  def stream_fun(delay) when is_integer(delay) do
    {timeout, crash} =
      cond do
        delay >= 0 ->
          {delay, false}
        true ->
          {-delay, true}
      end

    Process.sleep(timeout) # simulate work taking `timeout` ms

    cond do
      crash ->
        exit(:crash) # crash the task
      true ->
        timeout      # return result
    end
  end
end

$ iex -S mix
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Compiling 2 files (.ex)
Interactive Elixir (1.6.3) - press Ctrl+C to exit (type h() ENTER for help)
Pre-launch: :my_app MyApp.TaskSupervisor

[error] Task #PID<0.133.0> started from #PID<0.130.0> terminating
** (stop) :crash
    (my_app) lib/worker.ex:81: Worker.stream_fun/1
    (elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &Worker.stream_fun/1
    Args: [-500]

Result: [ok: 1000, exit: :crash, exit: :timeout]
terminate: :normal

[info]  Application my_app exited: :stopped

@peerreynders Thanks for your example! It really helped me to work through a similar scenario.