Proper use of Task.async?

As many of you may have realized by now (sorry for all the posts here), I’ve been working on a db problem where I’m trying to aggregate a large list of similar but different tables.

One suggestion I’ve received for this task was to create a module that queries each table individually and then aggregates the results into a single list at the application level, rather than in the database via unions and views. The suggestion was to use Task.async.

Since my number of concurrent users will never exceed 1000 at any given time, I felt this could be a good solution for my use case. I should have enough pool workers to handle the load of querying the db concurrently. I realize this adds CPU overhead, but I think it is a worthwhile tradeoff: I have a very large list of tables to manage, and maintaining a very long union query does not seem like the best approach. I know little about Tasks and am just starting to learn.

So back to my question:

If I have a single function that delegates multiple concurrent queries to compose a single result set, is this a good use case for Task.async?

Also, I’ve seen Task.await but still don’t have a good understanding of it. If I want to block my function until every async task has completed and then compose a new list of results, what does that look like?

I.e., if I wanted to write a function that takes a list of numbers, squares each value, and returns a new list of all the squared values (regardless of order) using Task.async, what would that look like?

Thanks in advance for your help, this community is awesome!
Best - Josh Chernoff.

Maybe something like this:

queries
|> Enum.map(fn query -> Task.async(Repo, :all, [query]) end) # starts queries
|> Enum.flat_map(fn task -> Task.await(task) end) # waits for them, collects results into one list

Note that Task.await by default waits for up to 5 seconds; after that it times out and exits the calling process.
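Applied to the squaring example from the question, the same async/await pattern might look like the sketch below. It is only an illustration: squaring is far too cheap to justify a process per element, and the module name `Squares` is made up. Each `Task.async` task is linked to the caller, so a crash in any task also takes the caller down.

```elixir
defmodule Squares do
  # Hypothetical helper: one Task per element, then wait for all of them.
  def square_all(numbers) do
    numbers
    # Start every task first, so they all run concurrently...
    |> Enum.map(fn n -> Task.async(fn -> n * n end) end)
    # ...then block until each one replies (default timeout: 5000 ms per await).
    |> Enum.map(&Task.await/1)
  end
end

IO.inspect(Squares.square_all([1, 2, 3, 4]))
# prints [1, 4, 9, 16]
```

Because the tasks are awaited in the order they were started, the results come back in input order. On Elixir 1.11 or later, `Task.await_many/2` can replace the second `Enum.map`.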


So Enum.flat_map is blocked until each Task.await has completed? Can it really be this simple?

Edit: just read your edit about the 5 seconds.

I think so. Try running this in IEx to get a better idea:

1..10 |>
Enum.map(fn _ -> Task.async(:timer, :sleep, [:rand.uniform(5000)]) end)

and then

1..10 |>
Enum.map(fn _ -> Task.async(:timer, :sleep, [:rand.uniform(5000)]) end) |>
Enum.map(fn task -> Task.await(task) end)

The first snippet returns a list of %Task{} structs immediately, since it only spawns (and links) the tasks.

The second one enters a receive block in each call to Task.await(task) inside the anonymous function fn task -> Task.await(task) end, so the last Enum.map waits on each task in turn until it returns, exits, or times out.
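Because all tasks are started before the first `Task.await/1`, the waits overlap: the whole pipeline takes roughly as long as the slowest task, not the sum of all of them. A quick way to check this in IEx (the fixed 300 ms sleeps are arbitrary):

```elixir
# Five tasks that each sleep 300 ms; :timer.tc/1 returns {microseconds, result}.
{micros, results} =
  :timer.tc(fn ->
    1..5
    |> Enum.map(fn i ->
      Task.async(fn ->
        Process.sleep(300)
        i
      end)
    end)
    |> Enum.map(&Task.await/1)
  end)

# Expect roughly 300 ms in total rather than 5 * 300 = 1500 ms.
IO.puts("#{div(micros, 1000)} ms, results: #{inspect(results)}")
```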


Caveat:
Compatibility with OTP behaviours

It is not recommended to await a long-running task inside an OTP behaviour such as GenServer. Instead, you should match on the message coming from a task inside your GenServer.handle_info/2 callback.
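A minimal sketch of that pattern, using a made-up `SquareServer`: `handle_call/3` starts the task and returns `{:noreply, ...}` so the server stays responsive, and the reply is sent from `handle_info/2` when the task's `{ref, result}` message arrives. Because `Task.async` links the task to the server, a crashing task would still crash the server; the `Task.Supervisor.async_nolink` approach shown later in the thread avoids that.

```elixir
defmodule SquareServer do
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, %{})

  # Blocks the *caller* until the result arrives; the server itself never waits.
  def square(server, n), do: GenServer.call(server, {:square, n})

  @impl true
  def init(pending), do: {:ok, pending}

  @impl true
  def handle_call({:square, n}, from, pending) do
    task = Task.async(fn -> n * n end)
    # Remember who asked, keyed by the task's monitor ref, and reply later.
    {:noreply, Map.put(pending, task.ref, from)}
  end

  @impl true
  def handle_info({ref, result}, pending) when is_reference(ref) do
    # Task finished: drop its monitor and flush the :normal :DOWN message.
    Process.demonitor(ref, [:flush])
    {from, pending} = Map.pop(pending, ref)
    GenServer.reply(from, result)
    {:noreply, pending}
  end

  # Ignore any other message instead of crashing on it.
  def handle_info(_other, pending), do: {:noreply, pending}
end
```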

If you need to go that route, have a look at:


I personally like using Task.async_stream; I find I use it quite a bit.

Task.async_stream(enumerable, Mod, :fun, args, max_concurrency: 10)
|> Enum.to_list()

This keeps starting tasks, with at most max_concurrency of them (here 10) running at a time, and Enum.to_list/1 then gathers the results into a list, in input order by default, with each result wrapped as {:ok, value}. There's no need to call Task.await at all.
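For the original use case (one query per table, merged into a single list), a sketch might combine `Task.async_stream/3` with `Enum.flat_map/2`. `TableAggregator` and `fetch_rows/1` are hypothetical; `fetch_rows/1` stands in for something like `Repo.all/1` on one table. Keeping `max_concurrency` at or below the database pool size is probably wise, and `timeout` raises the default 5000 ms per-task limit.

```elixir
defmodule TableAggregator do
  # Hypothetical stand-in for a per-table query such as Repo.all/1.
  def fetch_rows(table), do: [{table, 1}, {table, 2}]

  def all_rows(tables) do
    tables
    # At most 10 queries in flight; each may run for up to 15 s.
    |> Task.async_stream(&fetch_rows/1, max_concurrency: 10, timeout: 15_000)
    # Each element is {:ok, rows}; unwrap and concatenate into one list.
    |> Enum.flat_map(fn {:ok, rows} -> rows end)
  end
end
```

If a task times out here, the default `on_timeout: :exit` exits the caller, just as `Task.await` would.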


So, using your example of squaring a list, here it is with Task.async_stream:

defmodule CalcSquares do
  def square(x), do: x * x

  def square_list(list) do
    list
    |> Task.async_stream(CalcSquares, :square, [], max_concurrency: 10)
    |> Enum.map(fn {:ok, val} -> val end)
  end
end

IO.inspect(CalcSquares.square_list(1..10))
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
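Since the question says "regardless of order", `ordered: false` lets the stream emit each result as soon as its task finishes instead of in input order. `Task.async_stream/3` also accepts an anonymous function, so no module/function/args triple is needed. A sketch (`SquaresUnordered` is a made-up name):

```elixir
defmodule SquaresUnordered do
  def square_list(list) do
    list
    # Results are emitted as tasks finish, so the order may differ from the input.
    |> Task.async_stream(fn x -> x * x end, max_concurrency: 10, ordered: false)
    |> Enum.map(fn {:ok, val} -> val end)
  end
end

1..10 |> SquaresUnordered.square_list() |> Enum.sort() |> IO.inspect()
# prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```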


A slightly more complicated example:

# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    app = Application.get_application(__MODULE__)
    name = MyApp.TaskSupervisor
    children = [
      {Task.Supervisor, name: name}, # use Task.Supervisor to protect
      {Worker, app: app, name: name} # Worker GenServer process from harm
    ]
    opts = [strategy: :rest_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end

  def prep_stop(state) do
    IO.puts("prep_stop: #{state}")
    state
  end

  def stop(state) do
    IO.puts("stop: #{state}")
    :ok
  end
end
# lib/worker.ex

defmodule Worker do
  use GenServer

  def start_link(args) do
    GenServer.start_link(__MODULE__, args)
  end

  def init(args) do
    with {:ok, app} <- Keyword.fetch(args, :app),
         {:ok, name} <- Keyword.fetch(args, :name) do
      ref = Process.send_after(self(), :init, 1000)
      {:ok, {app, name, ref}}
    else
      _ ->
        {:stop, :badarg}
    end
  end

  def handle_info(:init, {app, name, _}) do
    # launch the task
    IO.puts("Pre-launch: #{inspect app} #{inspect name}")
    task_ref = launch_task(name)
    {:noreply, {app, name, task_ref}}
  end
  def handle_info({task_ref, result}, {app, name, task_ref}) do
    # normal task result arrives here - demonitor/flush :normal :DOWN
    Process.demonitor(task_ref, [:flush])
    IO.puts("Result: #{inspect result}")
    {:stop, :normal, {app, name, nil}}
  end
  def handle_info({:DOWN, _down_ref, :process, pid, reason}, state) do
    # :DOWN message arrives when task exits without result
    # reason won't be :normal
    IO.puts("DOWN: #{inspect pid} #{inspect reason}")
    {:stop, reason, state}
  end
  def handle_info(msg, state) do
    IO.inspect msg
    {:noreply, state}
  end

  def terminate(reason, {app, _, _}) do
    IO.puts("terminate: #{inspect reason}")
    Application.stop(app)                   # experiment complete
  end

  defp launch_task(name) do
    # async_nolink: won't force GenServer process to exit in case of any task exit
    args = [name, [1000, -500, 2000]] # expected: [ok: 1000, exit: :crash, exit: :timeout]
    task = Task.Supervisor.async_nolink(name, __MODULE__, :stream_tasks, args)
    task.ref
  end

  # Single task being blocked (instead of GenServer process)
  # until all elements in the list have been processed
  def stream_tasks(name, list) do
    # uncomment next line to cause :DOWN message instead of result
    # exit(:crash_start)
    options = [timeout: 1500, on_timeout: :kill_task] # options to kill JUST "2000" task
    name
    |> Task.Supervisor.async_stream_nolink(list, __MODULE__, :stream_fun, [], options)
    |> Enum.to_list()
  end

  # Function being run as a task on each element in the list
  def stream_fun(delay) when is_integer(delay) do
    {timeout, crash} =
      cond do
        delay >= 0 ->
          {delay, false}
        true ->
          {-delay, true}
      end

    Process.sleep(timeout)

    cond do
      crash ->
        exit(:crash) # crash the task
      true ->
        timeout      # return result
    end
  end

end
$ iex -S mix
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:10] [hipe] [kernel-poll:false] [dtrace]

Compiling 2 files (.ex)
Interactive Elixir (1.6.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> 
Pre-launch: :my_app MyApp.TaskSupervisor

[error] Task #PID<0.133.0> started from #PID<0.130.0> terminating
** (stop) :crash
    (my_app) lib/worker.ex:81: Worker.stream_fun/1
    (elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
    (elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: &Worker.stream_fun/1
    Args: [-500]

Result: [ok: 1000, exit: :crash, exit: :timeout]
terminate: :normal
prep_stop: 
stop: 

[info]  Application my_app exited: :stopped
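The `exit: :timeout` entry in that result comes from `on_timeout: :kill_task`, which turns a slow element into `{:exit, :timeout}` instead of exiting the caller. The timeout part can be tried on its own in IEx with plain `Task.async_stream/3` (the 100/2000 ms delays and 500 ms timeout are arbitrary). Unlike the `_nolink` variant above, plain `async_stream` links its tasks to the caller, so a task that *crashes* (like the `-500` element) would take the caller down with it.

```elixir
results =
  [100, 2_000]
  |> Task.async_stream(
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    # Kill any task still running after 500 ms instead of exiting the caller.
    timeout: 500,
    on_timeout: :kill_task
  )
  |> Enum.to_list()

IO.inspect(results)
# prints [ok: 100, exit: :timeout]
```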

@peerreynders Thanks for your example! It really helped me to work through a similar scenario.
