Proposal: New function `Task.await_many`

Hopefully the fact that the suggestion thread received no replies means that everyone loves it and has no revisions :sweat_smile:

I’ve added one code example to demonstrate usage, but otherwise left it unchanged.

Background

The Task module currently contains three functions that synchronously retrieve results from asynchronous tasks:

  • Task.await: Blocks waiting for a reply from a single task. Accepts a timeout value. If the task is successful, returns the reply. If the task dies or the timeout is reached, exits with the corresponding reason.
  • Task.yield: Blocks waiting for a reply from a single task. Accepts a timeout value. If the task is successful, returns {:ok, reply}. If the task dies, returns {:exit, reason}; if the timeout is reached, returns nil.
  • Task.yield_many: Blocks waiting for replies from a list of tasks. Accepts a timeout value. When all tasks are complete or the timeout is reached, returns a list of {task, result} tuples, where result is {:ok, reply} for successful tasks, {:exit, reason} for dead tasks, and nil for tasks that timed out.
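To make the three return shapes concrete, here is a small sketch that runs each function against trivial tasks. The module name TaskResultShapes is only for illustration; note that Task.yield_many leaves the slow task running, so the sketch shuts it down itself.

```elixir
defmodule TaskResultShapes do
  # Illustrative only: collects the value returned by each retrieval function.
  def run do
    # Task.await returns the bare reply.
    awaited = Task.await(Task.async(fn -> :a end))

    # Task.yield wraps the reply in {:ok, reply}.
    yielded = Task.yield(Task.async(fn -> :b end))

    # Task.yield_many returns {task, result} pairs in input order;
    # a task that has not finished within the timeout yields nil.
    fast = Task.async(fn -> :c end)
    slow = Task.async(fn -> Process.sleep(1_000) end)
    many = Task.yield_many([fast, slow], 100)

    # yield_many does not stop the slow task, so clean it up explicitly.
    Task.shutdown(slow, :brutal_kill)

    {awaited, yielded, Enum.map(many, fn {_task, result} -> result end)}
  end
end
```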

Additionally, the Task module contains one function that handles both creating asynchronous tasks and retrieving the results:

  • Task.async_stream: Asynchronously applies a given function to each element of a given enumerable. Accepts a timeout value that is applied to each task separately. Returns a stream that emits results, blocking as needed. When a task completes, emits {:ok, reply}. If a task dies, exits with the reason. When a task reaches the timeout, either exits or emits {:exit, :timeout}, depending on the :on_timeout option.
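As an illustration of the timeout behavior, the sketch below runs three jobs where the middle one sleeps past the per-task timeout. With on_timeout: :kill_task the stream kills that task and emits {:exit, :timeout} in its place instead of exiting the caller. The module name is hypothetical.

```elixir
defmodule AsyncStreamTimeoutDemo do
  # Illustrative only: each element is a sleep duration in milliseconds.
  def run do
    [10, 1_000, 20]
    |> Task.async_stream(
      fn ms ->
        Process.sleep(ms)
        ms
      end,
      timeout: 200,
      # Kill the slow task and emit {:exit, :timeout} rather than exiting.
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  end
end
```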

The discussion that eventually became Task.async_stream included an alternative suggestion of Task.async_many and Task.await_many. In the end, async_stream was chosen because it provides the ability to bound the maximum concurrency and stream results, making it the most robust way to handle intensive processing over an enumerable.

Proposal

I propose this addition to retrieve results from multiple asynchronous tasks while adhering to await behavior:

```elixir
Task.await_many(tasks, timeout \\ 5000)
```

Blocks waiting for replies from a list of tasks. If the tasks complete, returns a list of replies. If the timeout is reached, exits with :timeout. If a task dies, exits with the reason given by the task.
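The semantics above can be approximated with existing functions. The following is only a sketch built on Task.yield_many/2 with a single shared timeout; the module AwaitMany is hypothetical, and it exits with a bare :timeout as the proposal describes. If the caller is not trapping exits, a crashing task will already have taken the caller down through its link, so the {:exit, reason} branch is only reached when exits are trapped.

```elixir
defmodule AwaitMany do
  # Hypothetical helper sketching the proposed Task.await_many/2 semantics.
  def await_many(tasks, timeout \\ 5000) do
    # One shared timeout for the whole list, not one per task.
    results = Task.yield_many(tasks, timeout)

    if Enum.any?(results, &match?({_task, nil}, &1)) do
      # Kill every task that is still running, then exit like Task.await would.
      for {task, nil} <- results, do: Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    end

    Enum.map(results, fn
      {_task, {:ok, reply}} -> reply
      # Only reachable when the caller traps exits.
      {_task, {:exit, reason}} -> exit(reason)
    end)
  end
end
```

Because the stragglers are shut down (and therefore unlinked) before exiting, a caller that catches the :timeout exit is not subsequently killed by a still-running task.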

Task.await (together with Task.async) provides a simple solution that can be used as a drop-in replacement for synchronous code. The addition of Task.await_many will provide the building blocks for many common asynchronous flows, with unsurprising default behavior.

It fits well with the existing feature set and should be very easy for people to use and understand, provided they are familiar with the other Task functions.

As a toy example, consider baking a cake, which involves a collection of heterogeneous, sometimes-parallelizable tasks:

```elixir
oven_prep = Task.async(fn -> preheat_oven(350) end)

{pan, bowl} = wash_dishes()

frosting_prep = Task.async(fn -> make_frosting(bowl, :pink) end)

[_, greased_pan, batter] = Task.await_many([
  oven_prep,
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end)
], 600_000)

baking = Task.async(fn ->
  baked_cake = bake(batter, greased_pan, 30)
  cool(baked_cake, 10)
end)

eat_dinner()

[cooled_cake, frosting] = Task.await_many([baking, frosting_prep])

cooled_cake
|> frost(frosting)
|> eat()
```

Alternatives

Why not Task.await?

A common pattern suggested online [1][2][3] is to enumerate calls to Task.await:

```elixir
Enum.map(tasks, &Task.await(&1, timeout))
```

Because the await calls happen sequentially, each call starts its own timeout, so the timeout is effectively reset for each element of the list. In the worst case the whole expression can block for up to length(tasks) × timeout, which is likely unexpected and unwanted behavior.
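A small timing sketch shows the effect. Three tasks finish at roughly 100, 200 and 300 ms; each individual await only waits about 100 ms, under the 150 ms timeout, so nothing times out, yet the whole Enum.map blocks for roughly 300 ms. The module name and durations are arbitrary choices for illustration.

```elixir
defmodule SequentialAwaitDemo do
  # Illustrative only: returns {elapsed_ms, results}.
  def run do
    tasks =
      for ms <- [100, 200, 300] do
        Task.async(fn ->
          Process.sleep(ms)
          ms
        end)
      end

    # Each await gets a fresh 150 ms budget measured from when it is called.
    {elapsed_us, results} =
      :timer.tc(fn -> Enum.map(tasks, &Task.await(&1, 150)) end)

    {div(elapsed_us, 1000), results}
  end
end
```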

Why not Task.yield_many?

Task.yield_many works fine for this situation, but it adheres to the semantics of Task.yield rather than Task.await. It returns a tuple instead of the bare reply, and on failure it does not exit or kill still-running tasks. To achieve the behavior of await, you must write something like this to handle the various possible results:

```elixir
Task.yield_many(tasks)
|> Enum.map(fn {task, result} ->
  case result do
    nil ->
      # Maybe unnecessary since we are exiting?
      Task.shutdown(task, :brutal_kill)
      exit(:timeout)
    {:exit, reason} ->
      exit(reason)
    {:ok, reply} ->
      reply
  end
end)
```

Rather than expecting every developer to write this boilerplate (and not make any mistakes in doing so), I think it would be better to provide a construct in the standard library.

Why not Task.async_stream?

Task.async_stream is great for enumerating the same expensive operation across a list of inputs, and it absolutely should be used for that. However, it is not well-suited to situations where the collection of tasks is less uniform. Consider the cake example:

```elixir
prep = [
  Task.async(fn -> preheat_oven(350) end),
  Task.async(fn -> grease_pan(pan) end),
  Task.async(fn -> mix_batter() end)
]
```

This would be a very awkward fit for async_stream. It is a specialized tool that should not be applied in a generalized way. In addition, async_stream has its own return values and exit behavior, which do not match those of await.

One potential harm of adding Task.await_many is that people might be tempted to use it when they would be better off using Task.async_stream. I believe this can be mitigated with proper documentation.

Why not GenStage?

GenStage provides powerful and flexible abstractions for handling asynchronous flows. For applications that have complicated needs, this is a great tool. Often, though, we have much simpler needs and applying GenStage to the problem would be massive overkill.

Task.await is easy to use and easy to reason about. The goal of Task.await_many is the same. It’s okay if it doesn’t cover every possible use case, as long as it covers the ones we most commonly encounter in a way that doesn’t encourage us to make mistakes.


It is not that big a boilerplate. :slight_smile: Also, have you tried emulating your desired behaviour with Task.Supervisor?


Could you explain what you have in mind with Task.Supervisor? My understanding of that module is that it focuses on monitoring and restarting tasks, so I think of it as being for a different use case than the success-or-error behavior of the basic Task stuff.

I mostly mean that you can link all your tasks with a process that’s different than the current caller, which gives you more control over their lifetimes. With a little bit of boilerplate you can emulate your proposed Task.await_many function without the caller process ever being in danger of being killed (because tasks are by default linked to it).

I see what you mean. The way I think of it, Task.await serves a different mindset than Task.Supervisor. Supervised tasks are good when you want to easily add some fault tolerance for operations that may crash. Task.await is good when you want to reap benefits of parallelization but have your code behave exactly the same as if it were happening in a single process. I want Task.await_many to extend the Task.await mindset, thus the immediate exits, bare return values, etc.

Does that make sense? Should I add something to the proposal to better explain that distinction?


Also, as per jeg2’s description, supervisors are not just for restarting; they are for lifecycle management, even if that lifecycle is “die and then never come back”.

> Supervised tasks are good when you want to easily add some fault tolerance for operations that may crash.

You also want supervised tasks for graceful shutdown and notification on VM cleanup via the supervision tree, even if they are fire-and-forget. Maybe someone more experienced can explain why I’m wrong, but IMO, in prod you should almost never use Task.async and should almost always use Task.Supervisor.async instead.


Regardless of the advisability of Task.async vs. Task.Supervisor.async, we still need some way to wait for the spawned process to finish. Unless I’m missing something, I don’t think Task.Supervisor provides anything for that, right? The documentation gives an example of passing it along: Task.Supervisor.async(...) |> Task.await(). So we’re back where we started, needing a good way to await multiple tasks at once.


I think what @dimitarvp is saying is: build a Task.Supervisor and attach it to, say, a DynamicSupervisor. Then attempt to collect the awaits, but if they time out, kill the Task.Supervisor instead.
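A rough sketch of that idea, under some simplifying assumptions: the Task.Supervisor is started directly with start_link rather than under a DynamicSupervisor, tasks are spawned with Task.Supervisor.async_nolink so a crash or timeout never takes the caller down, and the supervisor is stopped at the end. The module SupervisedAwait and its {:ok, _}/{:error, _} return shape are hypothetical, and note that this deliberately does not follow the Task.await semantics the proposal asks for.

```elixir
defmodule SupervisedAwait do
  # Hypothetical helper: runs each function under its own Task.Supervisor
  # without linking the tasks to the caller.
  def run_all(funs, timeout) do
    {:ok, sup} = Task.Supervisor.start_link()

    tasks = Enum.map(funs, &Task.Supervisor.async_nolink(sup, &1))

    results =
      tasks
      |> Task.yield_many(timeout)
      |> Enum.map(fn
        {_task, {:ok, reply}} ->
          {:ok, reply}

        # Unlinked tasks report crashes as {:exit, reason} without trapping exits.
        {_task, {:exit, reason}} ->
          {:error, reason}

        # Still running after the timeout: kill it.
        {task, nil} ->
          Task.shutdown(task, :brutal_kill)
          {:error, :timeout}
      end)

    # Stop the supervisor along with anything still attached to it.
    Supervisor.stop(sup)
    results
  end
end
```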


Can you help me understand the downside to the following approach?

```elixir
[_, {:ok, greased_pan}, {:ok, batter}] =
  [
    fn -> preheat_oven(350) end,
    fn -> grease_pan(pan) end,
    fn -> mix_batter() end
  ]
  |> Task.async_stream(fn work -> work.() end, timeout: 600_000)
  |> Enum.to_list()
```

I don’t think there’s anything wrong technically with that solution, but in my opinion it’s a bit unintuitive. async_stream is designed to map a transformation but in this case we’re essentially mapping the identity function. I’d feel a little guilty leaving this for the next person to come along, knowing that they’d have to puzzle out why I was using async_stream but not really streaming anything.

For common use cases, I dislike making people perpetually reinvent these little bits of boilerplate (same goes for the solution I presented with yield_many). I see it as giving people unnecessary opportunities to make mistakes. For example, on my first try at writing your solution, I wouldn’t have realized that I needed the Enum.to_list, so I would have floundered around with confusing failures for a little while until I figured that out. And there’s the question of matching await behavior. It looks to me like this solution will exit with a descriptive reason if one of the tasks exits, which is good… unless we’ve started trapping exits earlier in the code, and then the behavior changes. And of course if we’re trapping exits, a process could crash early and we might not find out about it until much later because async_stream defaults to ordered: true… And I’ve been digging around the docs for 15 minutes now and I’m still only medium-confident that I understand how your solution behaves on edge cases.
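The Enum.to_list point is easy to trip over because Task.async_stream returns a lazy stream: building it starts no tasks, and only enumerating it spawns and awaits them. A minimal sketch (module name hypothetical) that checks for a message from the tasks before and after enumeration:

```elixir
defmodule LazyStreamDemo do
  # Illustrative only: returns {status_before_enumeration, results}.
  def run do
    parent = self()

    stream =
      Task.async_stream([1, 2, 3], fn i ->
        send(parent, {:ran, i})
        i * 10
      end)

    # No task has been spawned yet, so no :ran message arrives.
    before_enum =
      receive do
        {:ran, _} -> :started
      after
        100 -> :not_started
      end

    # Enumerating the stream is what actually runs the tasks.
    {before_enum, Enum.to_list(stream)}
  end
end
```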


Thanks. I think you make some excellent points!


Are you essentially proposing a Task.await equivalent to JavaScript’s Promise.all()?

Yes, that’s a reasonable comparison. The exact semantics of the two languages are different, of course, but Task.await_many is to Task.await as Promise.all is to Promise#then.

Promise.all doesn’t get used as much as it should in JavaScript, either :grin:

It may make sense to go the route that Bluebird did, and create a library. That way, in addition to “many” (JS’s “all”), you can have others:

  • any (e.g., returns the value of the first Task to complete)
  • reduce
  • map
  • etc.

They’re all really just syntax sugar around boilerplate; which has its pros and cons.
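For a sense of how small such sugar can be, here is a sketch of the “any” variant: return the reply of the first task to finish, then kill the rest. The module AwaitAny is hypothetical, not part of the Task module. It relies on the {ref, reply} message a task sends to its owner and on the :ref field of %Task{}, and it needs Elixir 1.10+ for is_map_key/2 in guards. Crashes are not handled specially: with linked tasks from Task.async, a crash takes the caller down as usual.

```elixir
defmodule AwaitAny do
  # Hypothetical helper: returns the first reply among `tasks`.
  def await_any(tasks, timeout \\ 5000) do
    by_ref = Map.new(tasks, fn %Task{ref: ref} = task -> {ref, task} end)

    receive do
      {ref, reply} when is_map_key(by_ref, ref) ->
        # Stop monitoring the winner and drop its pending :DOWN message.
        Process.demonitor(ref, [:flush])
        shutdown_others(tasks, ref)
        reply
    after
      timeout ->
        shutdown_others(tasks, nil)
        exit(:timeout)
    end
  end

  # Kill every task except the winner (or all of them on timeout).
  defp shutdown_others(tasks, winner_ref) do
    for %Task{ref: ref} = task <- tasks, ref != winner_ref do
      Task.shutdown(task, :brutal_kill)
    end

    :ok
  end
end
```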