Parallel Maps output order and the need for the pin operator - why is the ^ operator necessary?

The book, Programming Elixir, contains an example implementation of Parallel Maps (pmaps) in Chapter 15.

The example implementation is as follows:

defmodule Parallel do
  def pmap(collection, fun) do
    me = self()
    collection
    |> Enum.map(fn (elem) ->
          spawn_link fn -> (send me, { self(), fun.(elem) }) end
        end)
    |> Enum.map(fn (pid) ->
          receive do { ^pid, result } -> result end
        end)
  end
end
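A minimal usage sketch, assuming the module above is compiled as shown (it is repeated here, with comments added, so the snippet runs on its own). The slow_identity helper and the sleep durations are made up for illustration. The first element sleeps longest and therefore finishes last, yet the result should still come back in input order.

```elixir
defmodule Parallel do
  def pmap(collection, fun) do
    me = self()

    collection
    |> Enum.map(fn elem ->
      # One process per element; each sends {its own pid, result} back to `me`
      spawn_link(fn -> send(me, {self(), fun.(elem)}) end)
    end)
    |> Enum.map(fn pid ->
      # ^pid: wait for the reply from this particular process, in spawn order
      receive do
        {^pid, result} -> result
      end
    end)
  end
end

# Hypothetical helper: sleep for `ms` milliseconds, then return `ms`
slow_identity = fn ms ->
  Process.sleep(ms)
  ms
end

result = Parallel.pmap([300, 100, 200], slow_identity)
IO.inspect(result)
# prints [300, 100, 200] (input order, although 300 finishes last)
```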

The book states that the pin operator, ^, is required to prevent results from coming back in random order. However, the pid passed to the receive do clause comes from a list whose order was already established, correctly, by the first Enum.map.

Since this value must match any tuple received by receive do in order for the result to be evaluated, why is the ^ operator necessary?

spawn_link fn -> (send me, { self(), fun.(elem) }) end

The self() in this line is the pid of the spawned process, which then sends its result back to me. Depending on the data, each element can take a different amount of time, so the replies can arrive in me's mailbox in a different order. An explicit pattern match in receive do lets you pull a specific message out of the mailbox regardless of its position in the queue. I'm pretty sure you understand this last part, but for completeness: the pin operator makes the pattern match against the existing value of pid instead of rebinding pid to whatever arrives.
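To see selective receive in isolation, here is a small sketch. The slow/fast process names and the sleep times are arbitrary choices for the demo. The slow process's reply lands second in the mailbox, but pinning its pid pulls that reply out first.

```elixir
me = self()

# The "slow" process waits 200 ms before replying, so its message arrives second
slow =
  spawn(fn ->
    Process.sleep(200)
    send(me, {self(), :slow})
  end)

fast = spawn(fn -> send(me, {self(), :fast}) end)

# Give both replies time to land in the mailbox, then look at the queue
Process.sleep(300)
{:messages, queued} = Process.info(self(), :messages)
IO.inspect(queued, label: "mailbox (the :fast reply should be first)")

# Pinned patterns select by sender, not by position in the queue
first = receive do {^slow, value} -> value end
second = receive do {^fast, value} -> value end

IO.inspect({first, second})
# prints {:slow, :fast}
```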

Ah, I didn't think rebinding would be a concern here, but I see it now: since pid is a plain variable in the pattern, the match only enforces the shape of the tuple and binds pid to whatever value sits in that position.

Right, writing just pid will match any two-element tuple, so it simply grabs the first one available. A key bit of info I left out for anyone coming across this later is that receive is blocking. The second map has the pids in the order the processes were created, since spawn_link returns the new pid immediately, but of course that doesn't mean the process has finished its work. So each iteration blocks until it receives a message from that specific ^pid.
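For comparison, here is a sketch of what happens without the pin. UnpinnedParallel is a hypothetical module made for this demo. Because its receive pattern accepts any two-element tuple, results come back in completion order rather than input order. With sleeps spaced 100 ms apart, that should be [100, 200, 300].

```elixir
defmodule UnpinnedParallel do
  # Hypothetical pmap variant WITHOUT the pin operator, for comparison only
  def pmap(collection, fun) do
    me = self()

    collection
    |> Enum.map(fn elem ->
      spawn_link(fn -> send(me, {self(), fun.(elem)}) end)
    end)
    |> Enum.map(fn _pid ->
      receive do
        # No ^ here: the pattern binds a fresh variable to whatever pid is in
        # the message (writing `pid` would behave the same way), so it takes
        # the first {_, _} tuple in the mailbox
        {_any_pid, result} -> result
      end
    end)
  end
end

slow_identity = fn ms ->
  Process.sleep(ms)
  ms
end

result = UnpinnedParallel.pmap([300, 100, 200], slow_identity)
IO.inspect(result)
# completion order, e.g. [100, 200, 300]
```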

Oh, I didn't think of that. Should some sort of timeout be implemented here?

Only the parent process blocks; all the spawn_linked processes do their work concurrently in the meantime. The blocking is what ensures you end up with the results in the same order as the input.
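On the timeout question: receive accepts an after clause, so one way to bound the wait is sketched below. TimedParallel and its timeout_ms parameter are hypothetical. On timeout, the sketch unlinks and kills the straggler so the link does not take the parent down with it. Note that the timeout restarts for each element, so the total wait can exceed timeout_ms.

```elixir
defmodule TimedParallel do
  # Hypothetical pmap variant: each reply is awaited for at most `timeout_ms`
  # milliseconds, measured from when we start waiting for that element.
  def pmap(collection, fun, timeout_ms) do
    me = self()

    collection
    |> Enum.map(fn elem ->
      spawn_link(fn -> send(me, {self(), fun.(elem)}) end)
    end)
    |> Enum.map(fn pid ->
      receive do
        {^pid, result} ->
          {:ok, result}
      after
        timeout_ms ->
          # Unlink first so killing the straggler doesn't also kill the caller
          Process.unlink(pid)
          Process.exit(pid, :kill)
          {:error, :timeout}
      end
    end)
  end
end

results = TimedParallel.pmap([50, 5_000], fn ms -> Process.sleep(ms); ms end, 500)
IO.inspect(results)
# prints [{:ok, 50}, {:error, :timeout}]
```

A late reply that races the kill could still end up in the mailbox, so treat this as a sketch rather than a complete solution. Task.async_stream/3 covers the same ground with its :timeout and :on_timeout options, and it preserves input order by default.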

me = self()
fun = fn n -> n * 2 end

[1, 2, 3]
|> Enum.map(fn elem ->
  spawn_link(fn -> send(me, {self(), fun.(elem)}) end)
end)

This first pass spawns all the processes immediately, and you end up with a list of pids like:

[#PID<0.1.0>, #PID<0.2.0>, #PID<0.3.0>]
|> Enum.map(fn (pid) ->
  receive do { ^pid, result } -> result end
end)

Now let's say the #PID<0.2.0> process finishes its work first and its message lands in the parent process's inbox. It's done, but we don't want to collect it yet, because on the first iteration the receive line effectively expands to this:

receive do { #PID<0.1.0>, result } -> result end

It will keep blocking until a message matching that pattern arrives in the inbox; this is how the order is ensured. If the #PID<0.1.0> process finishes first, its message is picked up right away. The work still happens concurrently, so the total time is roughly that of the slowest process, regardless of the blocking. The blocking only costs you something if you need to act on each value as soon as it arrives, and in that case you can't guarantee order anyway.
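A rough way to check the claim about total time, assuming the same pmap as in the book (repeated in compact form so the snippet is self-contained). With sleeps of 300, 100 and 200 ms, the whole call should take about as long as the slowest element, not the 600 ms sum.

```elixir
defmodule Parallel do
  def pmap(collection, fun) do
    me = self()

    collection
    |> Enum.map(fn elem -> spawn_link(fn -> send(me, {self(), fun.(elem)}) end) end)
    |> Enum.map(fn pid -> receive do {^pid, result} -> result end end)
  end
end

sleep_ms = [300, 100, 200]

# :timer.tc/1 runs the function and returns {elapsed_microseconds, return_value}
{elapsed_us, result} =
  :timer.tc(fn ->
    Parallel.pmap(sleep_ms, fn ms ->
      Process.sleep(ms)
      ms
    end)
  end)

IO.inspect(result)
IO.puts("took about #{div(elapsed_us, 1000)} ms (sum of sleeps: #{Enum.sum(sleep_ms)} ms)")
```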

I found it helpful to do a simplified experiment in iex without iterations:

iex(1)> send(self(), :one)
:one
iex(2)> send(self(), :two)
:two
iex(3)> send(self(), :three)
:three
iex(4)> Process.info(self(), :messages)
{:messages, [:one, :two, :three]}
iex(5)> receive do :two -> nil end
nil
iex(6)> Process.info(self(), :messages)
{:messages, [:one, :three]}
iex(7)> me = self()
#PID<0.114.0>
iex(8)> spawn_link(fn -> Process.sleep(10_000); send(me, :four) end)
#PID<0.116.0>
iex(9)> receive do :four -> "Got :four!" end
# About 10 seconds later:
"Got :four!"

The first time we call receive, it grabs :two even though it's not the first message in the queue. Then we spawn another process that sleeps for 10 seconds and then sends the message :four. If we call receive do :four -> "Got :four!" end quickly enough, it hangs until the 10 seconds are up, at which point the message arrives and matches.