`Task.Supervisor` is started as part of the application in `MyApp.Application.start/2`. It is then later used by `Task.Supervisor.async_stream_nolink/6` within `Worker.stream_tasks/2`.
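A minimal sketch of that setup, assuming the task supervisor is registered under the name `MyApp.TaskSupervisor` (all module and registration names here are illustrative, not from the original code):

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Supervisor that the tasks will run under
      {Task.Supervisor, name: MyApp.TaskSupervisor}
      # MyApp.Worker would typically be listed here as well
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

The named `Task.Supervisor` can then be passed (by name) to `Task.Supervisor.async_stream_nolink` from anywhere in the application.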
`Worker` is simply a `GenServer` that needs results from multiple tasks, while being “safe” from them should they misbehave. Plain `Task.async` links the task to the spawning process, so a crashing task will take down everything else.
`Worker.stream_fun` is the function being run as a task. The input list `[1000, -500, 2000]` results in three separate tasks:

- Sleeps 1000 ms and then returns the integer value 1000
- Sleeps 500 ms and then crashes
- Sleeps 2000 ms and is terminated by the 1500 ms timeout
The `Worker` `GenServer` receives the completed results from all the tasks in:

```elixir
def handle_info({task_ref, result}, {app, name, task_ref}) do
  # The normal task result arrives here; demonitor with :flush
  # discards the :DOWN message that follows a normal exit
  Process.demonitor(task_ref, [:flush])
  IO.puts("Result: #{inspect(result)}")
  {:stop, :normal, {app, name, nil}}
end
```
which happens to be `[ok: 1000, exit: :crash, exit: :timeout]`, i.e. only the first task completed successfully, while the others exited: one due to a crash and one because it was terminated via the timeout.
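Putting the pieces together as a self-contained sketch (the crash mechanism, the 1500 ms `:timeout`, and the `on_timeout: :kill_task` option are assumptions matching the behaviour described above; this also uses the anonymous-function variant rather than `async_stream_nolink/6`):

```elixir
defmodule Worker do
  # Positive input: sleep that many ms, then return it.
  def stream_fun(ms) when ms >= 0 do
    Process.sleep(ms)
    ms
  end

  # Negative input: sleep the absolute value, then crash the task.
  def stream_fun(ms) do
    Process.sleep(-ms)
    exit(:crash)
  end

  def stream_tasks(supervisor, inputs) do
    supervisor
    |> Task.Supervisor.async_stream_nolink(inputs, &stream_fun/1,
      timeout: 1500,
      # turn a killed task into {:exit, :timeout} instead of
      # exiting the caller
      on_timeout: :kill_task
    )
    |> Enum.to_list()
  end
end

{:ok, sup} = Task.Supervisor.start_link()
results = Worker.stream_tasks(sup, [1000, -500, 2000])
IO.inspect(results)
# => [ok: 1000, exit: :crash, exit: :timeout]
```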
Joe Armstrong’s stated goal for the BEAM is:

> applications run 0.75 x N times faster on an N-core computer

(*How we program multicores* - Joe Armstrong)
Also it’s not too hard to imagine that the `async_stream` way of doing things is highly optimized, while there are lots of ways to foul up a bunch of hand-coded `async`s.
There are a number of kernel parameters (like FD limits) that can make code that downloads 1000 URLs run fast or very slow.
My intent wasn’t to advocate boundless process spawning. I can merely see the attraction of simply firing up “a few” tasks via `Task.async` and then using `Enum.map/2` in combination with `Task.await/2` as a kind of `await_all` - especially if you aren’t particularly concerned about what happens when one of the tasks crashes.
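That pattern, sketched with plain tasks (the workload here is made up; note that since Elixir 1.11 `Task.await_many/2` provides essentially this `await_all` directly):

```elixir
tasks =
  Enum.map([1, 2, 3], fn n ->
    Task.async(fn ->
      Process.sleep(10 * n)
      n * n
    end)
  end)

# Wait for every task in order; a crash or timeout in any of them
# takes the caller down too, since async links the processes.
results = Enum.map(tasks, &Task.await(&1, 5000))
IO.inspect(results)
# => [1, 4, 9]
```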
`Task.Supervisor.async_stream_nolink/6` seems like a much more adept solution just for the built-in fault isolation and capture - it’s easy enough to set `:max_concurrency` to a high enough value to allow a reasonable degree of concurrency for the job at hand (e.g. waiting on a bunch of blocked requests).
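For example, raising `:max_concurrency` above its scheduler-count default when the work is I/O-bound (the numbers below are purely illustrative):

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Simulate 100 "blocked requests" of ~50 ms each; with
# max_concurrency: 100 they all overlap instead of running
# in scheduler-sized batches.
results =
  sup
  |> Task.Supervisor.async_stream_nolink(1..100, fn i ->
    Process.sleep(50)
    i
  end, max_concurrency: 100, timeout: 5_000)
  |> Enum.to_list()

IO.inspect(length(results))
# => 100
```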