
A Task.Supervisor is started as part of the application in MyApp.Application.start/2 and is later used by Task.Supervisor.async_stream_nolink/6 within Worker.stream_tasks/2.
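A minimal sketch of what that application module might look like, assuming a supervisor named MyApp.TaskSupervisor (the child name and strategy are assumptions; Worker's own child spec is omitted):

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # The supervisor that async_stream_nolink/6 is later called against.
      # The name MyApp.TaskSupervisor is an assumption for this sketch.
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```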

Worker is simply a GenServer that needs results from multiple tasks while staying “safe” from them should they misbehave. Task.async/1 links the task to the spawning process, so a crashing task would take the Worker down with it.

Worker.stream_fun is the function being run as a task. The input list [1000, -500, 2000] results in three separate tasks:

  1. Sleeps 1000 ms, then returns the integer 1000
  2. Sleeps 500 ms and then crashes
  3. Sleeps 2000 ms and is terminated by the 1500 ms timeout
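A hypothetical reconstruction of Worker.stream_fun matching the three behaviours above (treating a negative input as “sleep, then crash”; the exit reason :crash is an assumption):

```elixir
defmodule Worker do
  # Hypothetical sketch: a negative input encodes "sleep, then crash".
  def stream_fun(ms) when ms < 0 do
    Process.sleep(-ms)
    exit(:crash)
  end

  # Tasks 1 and 3: sleep for the given duration, then return the input.
  def stream_fun(ms) do
    Process.sleep(ms)
    ms
  end
end
```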

The Worker GenServer receives the collected results of all the tasks as a single task reply in

  def handle_info({task_ref, result}, {app, name, task_ref}) do
    # The task's normal result arrives here; demonitor with :flush to
    # discard the :normal :DOWN message that follows the reply.
    Process.demonitor(task_ref, [:flush])
    IO.puts("Result: #{inspect(result)}")
    {:stop, :normal, {app, name, nil}}
  end

which happens to be:

[ok: 1000, exit: :crash, exit: :timeout]

i.e. only the first task completed successfully, while the others exited: one due to a crash, the other terminated by the timeout.

Joe Armstrong’s stated goal for the BEAM is:

applications run 0.75 x N times faster on an N-core computer

How we program multicores - Joe Armstrong

Also, it’s not hard to imagine that the async_stream way of doing things is highly optimized, while there are lots of ways to foul up a batch of hand-coded asyncs.

There are a number of kernel parameters (like FD limits) that can make code that downloads 1000 URLs run fast or very slow.

My intent wasn’t to advocate boundless process spawning. I can merely see the attraction of simply firing up “a few” tasks via async and then using Enum.map/2 in combination with Task.await/2 as a kind of await_all - especially if you aren’t particularly concerned about what happens when one of the tasks crashes.
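The hand-rolled “await_all” pattern mentioned above might look like this (work is a stand-in function; remember that Task.async/1 links, so one crashing task here would take the caller down too):

```elixir
# Stand-in for the real per-task work (assumption for this sketch).
work = fn ms ->
  Process.sleep(ms)
  ms
end

results =
  [100, 200, 300]
  |> Enum.map(fn ms -> Task.async(fn -> work.(ms) end) end)
  |> Enum.map(&Task.await(&1, 1_000))

IO.inspect(results)
# => [100, 200, 300]
```

Since Elixir 1.11, Task.await_many/2 expresses the same intent directly.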

Task.Supervisor.async_stream_nolink/6 seems like a much more adept solution, if only for the built-in fault isolation and result capture; it’s easy enough to set :max_concurrency high enough for the job at hand (e.g. waiting on a bunch of blocked requests).
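For instance, raising :max_concurrency well above the scheduler count pays off for IO-bound work. In this sketch the sleep stands in for a blocked request, and the numbers are purely illustrative:

```elixir
{:ok, sup} = Task.Supervisor.start_link()

# Stand-in for an IO-bound call such as a blocked HTTP request (assumption).
fetch = fn i ->
  Process.sleep(50)
  i
end

results =
  sup
  |> Task.Supervisor.async_stream_nolink(1..100, fetch,
    max_concurrency: 100,
    timeout: 5_000,
    on_timeout: :kill_task
  )
  |> Enum.map(fn {:ok, i} -> i end)

# With 100 tasks in flight at once, the batch takes roughly one
# 50 ms "wave" rather than ~5 s of sequential sleeps.
```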
