Why parallel processing runs slower

I’m curious why parallelizing Enum.map/2 with Task doesn’t make it run faster, but actually makes it run slower.

When I compared the ‘Unuse Task’ and ‘Use Task’ cases with Benchee, the ‘Unuse Task’ case came out ahead in execution speed, which raised questions.

defmodule TaskTest do
  def run() do
    list = Enum.to_list(1..100_000)
    function = fn i -> i+1 end

    Benchee.run(
      %{
        "Unuse Task" => fn -> Enum.map(list, function) end,
        "Use Task" => fn ->
          list
          |> Enum.map(&(Task.async(fn ->
            function.(&1)
          end)))
          |> Task.await_many()
        end
      }
    )
  end
end

TaskTest.run()


I would expect there is overhead in both spawning and collecting the result of the functions.
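
A rough way to see that overhead is to time the same work both ways with :timer.tc/1. This is only a sketch: the SpawnOverhead module and its compare/1 function are made-up names for illustration, and the absolute numbers will vary by machine.

```elixir
defmodule SpawnOverhead do
  # Hypothetical helper module, not part of the original post.
  # Returns {direct_us, task_us}: microseconds spent applying `fun` to every
  # element directly versus through one Task per element.
  def compare(n \\ 10_000) do
    list = Enum.to_list(1..n)
    fun = fn i -> i + 1 end

    # Plain sequential map in the calling process.
    {direct_us, direct} = :timer.tc(fn -> Enum.map(list, fun) end)

    # One process per element: spawn all tasks, then collect every reply.
    {task_us, via_tasks} =
      :timer.tc(fn ->
        list
        |> Enum.map(fn i -> Task.async(fn -> fun.(i) end) end)
        |> Task.await_many()
      end)

    # Sanity check: both approaches must produce the same list.
    true = direct == via_tasks

    {direct_us, task_us}
  end
end
```

Calling SpawnOverhead.compare() returns the two timings side by side, so the cost of spawning and collecting can be read off directly.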


This seems to be caused by process creation overhead. Thank you.


You are sequentially (via Enum.map) spawning one task per item in the list!

Your second block is not parallel, it’s practically sequential even.


To expand a bit: the example is not a good fit for parallelism. Plain mapping with a very cheap function will practically always be faster serially than in parallel. See Amdahl’s Law.
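
For reference, Amdahl’s Law gives the best-case speedup as 1 / ((1 - p) + p / n), where p is the fraction of the work that can run in parallel and n is the number of workers. A minimal sketch follows; the Amdahl module name is made up for illustration. Note that the formula ignores spawn and messaging overhead entirely, so for a function as cheap as i + 1 the real result is worse than this bound.

```elixir
defmodule Amdahl do
  # Hypothetical helper, not part of the original post.
  # Theoretical speedup when a fraction `p` (0.0..1.0) of the work can be
  # spread across `n` workers. Per-task overhead is ignored.
  def speedup(p, n) when p >= 0 and p <= 1 and n >= 1 do
    1 / (1 - p + p / n)
  end
end

# Half of the work parallelizable, 4 workers: 1 / (0.5 + 0.125) = 1.6
Amdahl.speedup(0.5, 4)
```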

I modified your code out of curiosity and was not surprised by the result: under these conditions the parallel version has no chance of being faster:

defmodule TaskTest do
  def run() do
    range = 1..100_000
    list = Enum.to_list(range)
    function = fn i -> i + 1 end

    Benchee.run(%{
      "Serial list" => fn -> Enum.map(list, function) end,
      "Parallel list" => fn ->
        list
        |> Task.async_stream(function, timeout: :infinity, max_concurrency: 30)
        |> Enum.to_list()
      end,
      "Serial range" => fn -> Enum.map(range, function) end,
      "Parallel range" => fn ->
        range
        |> Task.async_stream(function, timeout: :infinity, max_concurrency: 30)
        |> Enum.to_list()
      end
    })
  end
end

Results being:

Name                     ips        average  deviation         median         99th %
Serial list           384.69        2.60 ms     ±7.09%        2.61 ms        3.00 ms
Serial range          327.14        3.06 ms     ±4.06%        3.04 ms        3.44 ms
Parallel list           1.47      680.32 ms     ±0.98%      680.24 ms      690.88 ms
Parallel range          1.42      705.07 ms     ±4.90%      697.70 ms      746.10 ms

I hope you don’t come away from this experiment biased: parallel execution usually pays off in real-world scenarios, where each unit of work is expensive. This toy example just does not demonstrate its benefits well.


It is parallel. Task.async/1 returns immediately without waiting for the work to be done.

The issue is that parallelization introduces more overhead than it can win back on some lightning-fast math.
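
A quick way to check the non-blocking behaviour is to time the Task.async/1 call separately from the Task.await/1 call. This is just a sketch: the 200 ms sleep is an arbitrary stand-in for real work, and the variable names are made up for illustration.

```elixir
# Time only the call that starts the task. The task sleeps for 200 ms,
# but Task.async/1 should return long before that.
{spawn_us, task} =
  :timer.tc(fn ->
    Task.async(fn ->
      Process.sleep(200)
      :done
    end)
  end)

# Time the wait for the result. This is where the caller actually blocks.
{await_us, :done} = :timer.tc(fn -> Task.await(task) end)

IO.puts("spawn: #{spawn_us} us, await: #{await_us} us")
```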


Yep, Task.async returns immediately, but you still go over each element one by one, whereas Task.async_stream caps how many tasks run at once (max_concurrency). So a loop that calls Task.async still bottlenecks somewhat on the speed of iterating through the list.

And spawning so many processes is much slower indeed.
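
One common way to cut down the number of processes is to hand each task a chunk of the list rather than a single element. A minimal sketch, assuming a made-up ChunkedMap.pmap/3 helper and an arbitrary default chunk size; for something as cheap as i + 1, copying the chunks between processes may still cost more than it saves.

```elixir
defmodule ChunkedMap do
  # Hypothetical helper, not part of the original post.
  # Maps `fun` over `list` using one task per chunk rather than one task
  # per element. The default `chunk_size` is an arbitrary choice.
  def pmap(list, fun, chunk_size \\ 10_000) do
    list
    |> Enum.chunk_every(chunk_size)
    # Task.async_stream/3 keeps results in input order by default.
    |> Task.async_stream(fn chunk -> Enum.map(chunk, fun) end, timeout: :infinity)
    # Each result arrives as {:ok, mapped_chunk}; flatten back into one list.
    |> Enum.flat_map(fn {:ok, mapped} -> mapped end)
  end
end
```

With 100_000 elements and the default chunk size this spawns 10 tasks instead of 100_000.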

The issue has already been described accurately while I was working up an illustrative example, so this is probably unnecessary, but here’s a comparison that I think is more fair.

Name               ips        average  deviation         median         99th %
seq           16378.54      0.00006 s   ±230.19%      0.00006 s      0.00009 s
par              70.85       0.0141 s    ±11.70%       0.0138 s       0.0221 s
par_long          0.40         2.53 s     ±0.00%         2.53 s         2.53 s
seq_long        0.0990        10.10 s     ±0.00%        10.10 s        10.10 s

Comparison:
seq           16378.54
par              70.85 - 231.19x slower +0.0141 s
par_long          0.40 - 41355.92x slower +2.52 s
seq_long        0.0990 - 165422.20x slower +10.10 s

defmodule SeqVsPar do
  def seq(n), do: 1..n |> Stream.map(fn i -> i + 1 end) |> Enum.to_list()
  def par(n), do: 1..n |> Task.async_stream(fn i -> i + 1 end) |> Enum.to_list()

  def seq_long(n),
    do:
      1..n
      |> Stream.map(fn i ->
        Process.sleep(100)
        i + 1
      end)
      |> Enum.to_list()

  def par_long(n),
    do:
      1..n
      |> Task.async_stream(fn i ->
        Process.sleep(100)
        i + 1
      end)
      |> Enum.to_list()
end

Benchee.run(
  %{
    "seq" => fn -> SeqVsPar.seq(1_000) end,
    "par" => fn -> SeqVsPar.par(1_000) end,
    "seq_long" => fn -> SeqVsPar.seq_long(100) end,
    "par_long" => fn -> SeqVsPar.par_long(100) end
  },
  parallel: 3
)

In this example seq uses Stream rather than Enum, so that the comparison with par reflects the parallelization step rather than the sequential list creation step. You can see that for extremely short calculations the parallel version takes longer, because the calculation is shorter than the time it takes to spawn and tear down a process. Once the sleep makes each calculation take longer, the cost of spawning and tearing down processes becomes smaller than the cost of the calculation itself.

Just for grins, below you can see the parallelization effect on my CPU as Benchee runs the tasks with :parallel set to the number of available cores. On the left is par running, then par_long, with the sequential variants trailing after. This shows that the parallelization does actually occur.


Which terminal plotting tool did you use? I like the result.

It’s just a screenshot of a terminal running btm that I have running all the time.

Ah, btm, yeah I like that one too. Also ytop and glances.
