Pelemay Fast Parallel map has just been released!

Here is my end-of-year present for Elixir programmers: the new package “Pelemay Fast Parallel map” has been released! It uses multiple cores and, in the benchmark below, is about 3x faster than Enum. Check it out!

Pelemay Fast Parallel map (or pelemay_fp) - Fast parallel map function for Elixir

Pelemay Fast Parallel map provides a fast parallel map function similar to the one in the Enum module, except that the computation is executed in parallel using Process.spawn/4.

Here is a quick example on how to calculate the square of each element with PelemayFp:

list
|> PelemayFp.map(& &1 * &1)
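
For readers wondering how a parallel map can be built on plain processes, here is a minimal sketch. It is not PelemayFp's actual implementation: the module name ChunkedPmapSketch and the default chunk size are assumptions made for illustration, and it uses spawn_link/1 with message passing rather than Process.spawn/4.

```elixir
defmodule ChunkedPmapSketch do
  # Hypothetical module for illustration; not part of PelemayFp.
  def map(list, fun, chunk_size \\ 12_000) do
    parent = self()

    list
    |> Enum.chunk_every(chunk_size)
    |> Enum.map(fn chunk ->
      ref = make_ref()
      # One process per chunk; the unique ref tags that chunk's reply.
      spawn_link(fn -> send(parent, {ref, Enum.map(chunk, fun)}) end)
      ref
    end)
    |> Enum.flat_map(fn ref ->
      # Waiting on each ref in spawn order keeps the results ordered.
      receive do
        {^ref, result} -> result
      end
    end)
  end
end
```

Because Enum.map/2 is eager, every worker is started before the first result is awaited, so the chunks run concurrently.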

We evaluated the performance of PelemayFp, Pelemay, Flow, Enum and Pmap on an iMac Pro (2017):

## PelemayFpBench
benchmark name          iterations   average time 
PelemayFp                      200   10177.85 µs/op
Pelemay                        100   16762.24 µs/op
PelemayFp and Pelemay          100   18532.70 µs/op
Flow (without sorting)         100   18731.43 µs/op
Enum                            50   31283.36 µs/op
Flow (with sorting)             10   105091.00 µs/op
Pmap                             1   1213749.00 µs/op

Thanks! A happy new year!

Nice idea! Here goes something from me … :smiling_imp:

Thank you!
It’s quite a strange behavior…

How does this compare to Task.async_stream, which would be the core way of separating computations into different processes?
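
For reference, a map built on Task.async_stream/3 could look like the sketch below. The module name AsyncStreamSketch is an assumption made for illustration. Each element runs in its own task, and each result arrives wrapped in an {:ok, value} tuple.

```elixir
defmodule AsyncStreamSketch do
  # Hypothetical wrapper for illustration.
  def map(enumerable, fun) do
    enumerable
    # ordered: true (the default) yields results in input order.
    |> Task.async_stream(fun, ordered: true)
    # Unwrap the {:ok, value} tuples produced by the stream.
    |> Enum.map(fn {:ok, value} -> value end)
  end
end
```

Since this starts one task per element, cheap per-element work tends to be dominated by process overhead.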

Thank you. TIL Task.async_stream.

I may have misunderstood the specification of spawn_monitor. Once a child process has been launched by spawn_monitor, will it be started again even if it quits by calling exit(:normal)?
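
A small experiment can help check this. In the sketch below (the module name MonitorSketch is an assumption made for illustration), a process started with spawn_monitor/1 exits normally. The caller gets a single :DOWN message and the process is not started again, since restarting is something a supervisor does, not a monitor.

```elixir
defmodule MonitorSketch do
  # Hypothetical module for illustration.
  def run do
    {pid, ref} = spawn_monitor(fn -> exit(:normal) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} ->
        # The monitor only reports the exit; it does not restart the process.
        {reason, Process.alive?(pid)}
    after
      1_000 -> :timeout
    end
  end
end
```

Calling MonitorSketch.run() returns {:normal, false}.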

@LostKobrakai I tried Task.async_stream/2 in a few benchmarks, but the results are not as good as expected …

Edit: Updated benchmarks:

pelemay_fp: benchee

benchmark.exs
defmodule Benchmark do
  @list Enum.to_list(1..100)

  def run do
    Benchee.run(
      %{
        "enum" => fn -> Enum.map(@list, &logistic_map_10/1) end,
        "pelemay_fp" => fn -> PelemayFp.map(@list, &logistic_map_10/1) end,
        "stream" => fn -> @list |> Stream.map(&logistic_map_10/1) |> Stream.run() end,
        "task_async_stream" => fn -> @list |> Task.async_stream(&logistic_map_10/1) |> Enum.to_list() end
      },
      time: 10,
      memory_time: 2
    )    
  end

  defp logistic_map_10(v) do
    logistic_map(v)
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
    |> logistic_map()
  end

  defp logistic_map(v) do
    rem(22 * v * (v + 1), 6_700_417)
  end
end

Benchmark.run()

mix run benchmark.exs
Operating System: Linux
CPU Information: Intel(R) Core(TM) i7-3630QM CPU @ 2.40GHz
Number of Available Cores: 8
Available memory: 15.53 GB
Elixir 1.11.2
Erlang 23.2.1

Benchmark suite executing with the following configuration:
warmup: 2 s
time: 10 s
memory time: 2 s
parallel: 1
inputs: none specified
Estimated total run time: 56 s

Benchmarking enum...
Benchmarking pelemay_fp...
Benchmarking stream...
Benchmarking task_async_stream...

Name                        ips        average  deviation         median         99th %
pelemay_fp              37.47 K       26.69 μs  ±5358.81%        9.10 μs      295.71 μs
enum                    28.89 K       34.61 μs    ±41.28%       31.53 μs       57.92 μs
stream                  24.44 K       40.91 μs   ±251.74%       35.87 μs       65.61 μs
task_async_stream        0.43 K     2342.98 μs    ±33.40%     2127.96 μs     4539.60 μs

Comparison: 
pelemay_fp              37.47 K
enum                    28.89 K - 1.30x slower +7.92 μs
stream                  24.44 K - 1.53x slower +14.22 μs
task_async_stream        0.43 K - 87.78x slower +2316.29 μs

Memory usage statistics:

Name                      average  deviation         median         99th %
pelemay_fp                2.10 KB     ±0.00%        2.10 KB        2.10 KB
enum                      1.56 KB     ±0.00%        1.56 KB        1.56 KB
stream                    4.35 KB     ±0.00%        4.35 KB        4.35 KB
task_async_stream       120.59 KB     ±0.69%      120.73 KB      121.70 KB

Comparison: 
pelemay_fp                2.10 KB
enum                      1.56 KB - 0.74x memory usage -0.53906 KB
stream                    4.35 KB - 2.07x memory usage +2.25 KB
task_async_stream       120.59 KB - 57.38x memory usage +118.49 KB

pelemay_fp_benchmark: pelemay_fp_benchmark

mix bench
Settings:
  duration:      1.0 s

## PelemayFpBench
[13:29:41] 1/7: Async stream
[13:29:43] 2/7: Enum
[13:29:45] 3/7: Flow (with sorting)
[13:29:46] 4/7: Flow (without sorting)
[13:29:49] 5/7: PelemayFp
[13:29:50] 6/7: Pmap
[13:29:53] 7/7: Stream

Finished in 14.4 seconds

## PelemayFpBench
benchmark name          iterations   average time 
PelemayFp                      100   12119.25 µs/op
Flow (without sorting)         100   23138.32 µs/op
Enum                            50   29528.56 µs/op
Stream                          50   40619.40 µs/op
Flow (with sorting)             10   109108.60 µs/op
Pmap                             2   878048.00 µs/op
Async stream                     1   2257371.00 µs/op

PelemayFp splits the incoming list into chunks of 12,000 elements, so I’m not sure what this benchmark with its 100-element list demonstrates: it will only spawn one worker. FWIW, the code in pelemay_fp_benchmark/bench/pelemay_fp_bench.exs (on main in zeam-vm/pelemay_fp_benchmark on GitHub) uses a limit of 100k.
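
The arithmetic behind this point can be checked with a short sketch. It assumes the splitting behaves like Enum.chunk_every/2 with a chunk size of 12,000; the module name ChunkCountSketch is hypothetical.

```elixir
defmodule ChunkCountSketch do
  # Hypothetical helper: how many chunks a list of `size` elements yields.
  def chunk_count(size, chunk_size \\ 12_000) do
    1..size |> Enum.chunk_every(chunk_size) |> length()
  end
end

ChunkCountSketch.chunk_count(100)      # 1 chunk, so a single worker
ChunkCountSketch.chunk_count(100_000)  # 9 chunks (8 full ones plus 4,000 left over)
```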

Fixed this issue. Thanks!

I added benchmarks for Stream and Task.async_stream. Thanks!

## PelemayFpBench
benchmark name          iterations   average time 
PelemayFp                      100   14143.22 µs/op
Pelemay                        100   18627.19 µs/op
PelemayFp and Pelemay          100   20414.04 µs/op
Flow (without sorting)         100   20720.33 µs/op
Enum                            50   31957.12 µs/op
Stream                          50   38792.98 µs/op
Flow (with sorting)             10   116906.90 µs/op
Pmap                             2   830539.50 µs/op
Task.async_stream                1   1099493.00 µs/op

I tried out some changes to increase the per-element workload, because there’s a fair bit of overhead with starting communicating processes. See notes in https://github.com/zeam-vm/pelemay_fp_benchmark/pull/1

Thank you for your contribution.

As you mentioned, performance is very sensitive to the threshold parameter, so I guess practical usage will require some kind of parameter optimization technique. I plan to develop one.

A new version of PelemayFp, which fixes the issue where unreceived messages were left behind, has just been released!

Why is there such a big performance difference with the pmap here:

Is it because the particular benchmark you’re doing benefits from batching? If so, could you get the same benefit by calling Enum.chunk_every before the map, and Enum.concat after? (modifying the async task to call Enum.map as well)
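
The batching idea in this question could be sketched as follows. The module name BatchedPmapSketch and the default batch size are assumptions made for illustration, and this is not the Pmap code from the benchmark.

```elixir
defmodule BatchedPmapSketch do
  # Hypothetical module for illustration.
  def map(list, fun, batch_size \\ 12_000) do
    list
    # Split into batches before mapping.
    |> Enum.chunk_every(batch_size)
    # One task per batch; each task maps its whole batch with Enum.map/2.
    |> Enum.map(fn batch -> Task.async(fn -> Enum.map(batch, fun) end) end)
    # Await in order (default timeout of 5 seconds per task).
    |> Enum.map(&Task.await/1)
    # Flatten the per-batch results back into one list.
    |> Enum.concat()
  end
end
```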

This code is from Programming Elixir by Dave Thomas.
It is simple, but it has the performance bottleneck you mentioned.

I tried to use Enum.chunk_every before implementing PelemayFp, but it was slower than I expected. After the new year’s holidays, I’ll show it to you.

Oh you’re right, I played with it a bit and Enum.chunk_every is quite slow because the implementation is generic for all enumerables. If I write a specialized version just for lists instead, then I get similar timings for PelemayFp, Task.async_stream, and pmap (with batches of 12,000 items).
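
One possible shape for a list-only chunking function is sketched below. It is not the code used for the timings mentioned above, the module name ListChunkSketch is hypothetical, and it makes no claim about speed.

```elixir
defmodule ListChunkSketch do
  # Hypothetical list-only alternative to Enum.chunk_every/2.
  def chunk(list, size) when is_list(list) and is_integer(size) and size > 0 do
    do_chunk(list, size, [])
  end

  # All elements consumed: chunks were accumulated in reverse order.
  defp do_chunk([], _size, acc), do: Enum.reverse(acc)

  defp do_chunk(list, size, acc) do
    # Take up to `size` elements; the last chunk may be shorter.
    {chunk, rest} = Enum.split(list, size)
    do_chunk(rest, size, [chunk | acc])
  end
end
```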

I noticed a small detail. The benchmark code uses a module attribute (a compile-time constant) for the list:

@list Enum.to_list(1..100000)

This may affect the timing a bit since constants aren’t subject to garbage collection and don’t get copied when they’re sent from one process to another, unlike variable data.
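
To make the distinction concrete, the sketch below contrasts the two ways of obtaining the list. The module and function names are assumptions made for illustration.

```elixir
defmodule ListSourceSketch do
  # Evaluated at compile time; the resulting list is embedded in the
  # compiled module as a literal.
  @list Enum.to_list(1..100_000)

  # Returns the compile-time literal.
  def constant_list, do: @list

  # Builds a fresh list at run time on each call.
  def runtime_list, do: Enum.to_list(1..100_000)
end
```

Both functions return equal lists, so a benchmark can switch to the run-time variant without changing the work being measured.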

To solve this, I guess the benchmark should build the list using before_each_bench and after_each_bench.

I’ve implemented PelemayFp.ParallelSplitter.split/7 as a faster function that integrates chunking like Enum.chunk_every/2 with Process.spawn/4. See its HexDocs page: PelemayFp.ParallelSplitter (PelemayFp v0.1.2)