Help with sample code about processes

Here is a code snippet from the book Programming Elixir:

defmodule FibSolver do
  def fib(scheduler) do
    send scheduler, { :ready, self() }
    receive do
      { :fib, n, client } ->
        send client, { :answer, n, fib_calc(n), self() }
        fib(scheduler)
      { :shutdown } ->
        exit(:normal)
    end
  end

  # very inefficient, deliberately
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n-1) + fib_calc(n-2)
end

defmodule Scheduler do
  def run(num_processes, module, func, to_calculate) do
    (1..num_processes)
    |> Enum.map(fn(_) -> spawn(module, func, [self()]) end)
    |> schedule_processes(to_calculate, [])
  end

  defp schedule_processes(processes, queue, results) do
    receive do
      {:ready, pid} when length(queue) > 0 ->
        [ next | tail ] = queue
        send pid, {:fib, next, self()}
        schedule_processes(processes, tail, results)

      {:ready, pid} ->
        send pid, {:shutdown}
        if length(processes) > 1 do
          schedule_processes(List.delete(processes, pid), queue, results)
        else
          Enum.sort(results, fn {n1,_}, {n2,_} -> n1 <= n2 end)
        end

      {:answer, number, result, _pid} ->
        schedule_processes(processes, queue, [ {number, result} | results ])
      end
  end
end

I don’t understand the following lines:

  1. In FibSolver:
    send client, { :answer, n, fib_calc(n), self() }
  2. In Scheduler:
    send pid, {:fib, next, self()}

Why does the Scheduler includes in the message its process’ PID ? The FibSolver has it already (variable “scheduler”).
And in the FibSolver, why send the message to “client”, he can just send the message to “scheduler” as it seems to me that client and scheduler are the same PID.

Thank you for any help…

1 Like

As written it isn’t necessary.

However it supports the interaction pattern where the scheduler receives the request from a client process and forwards it to one of its workers, i.e.:

defmodule FibSolver do
  def fib(scheduler) do
    send(scheduler, {:ready, self()})

    receive do
      {:fib, n, client} ->
        send(client, {:answer, n, fib_calc(n), self()})
        fib(scheduler)

      :shutdown ->
        :ok
    end
  end

  # very inefficient, deliberately
  defp fib_calc(0), do: 0
  defp fib_calc(1), do: 1
  defp fib_calc(n), do: fib_calc(n - 1) + fib_calc(n - 2)
end

defmodule Scheduler do
  def run(num_processes, module, func) do
    1..num_processes
    |> Enum.map(fn _ -> spawn(module, func, [self()]) end)
    |> schedule_processes([], [])
  end

  defp schedule_processes(processes, ready, queue) do
    receive do
      {:fib, _n, _client} = next when ready != [] ->
        [pid | tail] = ready
        send(pid, next)
        schedule_processes(processes, tail, queue)

      {:fib, _n, _client} = next ->
        schedule_processes(processes, ready, [next | queue])

      {:ready, pid} when queue != [] ->
        [next | tail] = queue
        send(pid, next)
        schedule_processes(processes, ready, tail)

      {:ready, pid} ->
        schedule_processes(processes, [pid | ready], queue)

      :shutdown ->
        Enum.map(processes, &send(&1, :shutdown))
        :ok
    end
  end
end

defmodule Demo do
  def client(scheduler, n) do
    send(scheduler, {:fib, n, self()})

    receive do
      {:answer, m, result, pid} ->
        IO.puts("#{inspect(self())}: #{inspect(pid)} sent result for fib(#{m}) = #{result}")
    after
      30000 ->
        IO.puts("#{inspect(self())}: timed out")
    end
  end

  def run(num_processes) do
    scheduler = spawn(Scheduler, :run, [num_processes, FibSolver, :fib])

    pids = start_clients([scheduler, 37], 20, [])
    wait(pids)

    # stop scheduler
    Process.monitor(scheduler)
    send(scheduler, :shutdown)
    wait([scheduler])
  end

  defp start_clients(_args, i, pids) when i < 1 do
    pids
  end

  defp start_clients(args, i, pids) do
    pid = spawn(__MODULE__, :client, args)
    Process.monitor(pid)
    start_clients(args, i - 1, [pid | pids])
  end

  defp wait([]) do
    :ok
  end

  defp wait(pids) do
    new_pids =
      receive do
        {:DOWN, _ref, :process, pid, _reason} ->
          List.delete(pids, pid)
      end

    wait(new_pids)
  end
end

Demo.run(10)
3 Likes