Here is an approach in which you manage the spawned processes yourself.
# File: calc.ex
defmodule Calc do
  @moduledoc """
  Stand-in for a slow computation used by the `MapReduce` demo.
  """

  @doc """
  Sleeps for `id` milliseconds, logging before and after, then returns `id`.

  Only positive integers are accepted; any other argument raises
  `FunctionClauseError`.
  """
  @spec run(pos_integer()) :: pos_integer()
  def run(id) when is_integer(id) and id > 0 do
    desc = "#{id} (#{inspect(self())})"
    IO.puts(" sleeping: " <> desc)
    Process.sleep(id)
    IO.puts(" waking from slumber: " <> desc)
    id
  end
end
# File: accumulator.ex
defmodule Accumulator do
  @moduledoc """
  Reduce step used by the `MapReduce` demo.

  `sum/2` deliberately sleeps before adding. This slow reducer is responsible
  for some tasks running beyond the timeout. The point is that only the
  messages queued ahead of the timeout message are actually processed.

  See what happens when the sleep is taken out, e.g. by passing
  `&Accumulator.sum(&1, &2, 0)` as the reducer.
  """

  @doc """
  Returns `addend + augend` after sleeping `delay` milliseconds (500 by default).

  The default keeps `sum/2` (and therefore `&Accumulator.sum/2`) available with
  the original 500 ms delay.
  """
  @spec sum(number(), number(), non_neg_integer()) :: number()
  def sum(addend, augend, delay \\ 500) do
    Process.sleep(delay)
    addend + augend
  end
end
# File: map_reduce.ex
defmodule MapReduce do
  @moduledoc """
  Demo map/reduce coordinator that manages its worker processes by hand.

  `start/5` launches a `GenServer` that spawns one monitored worker per element
  of `data`. Each worker applies `fun` to its element and casts the outcome back.
  The server folds successful values into the accumulator with `reduce` as they
  arrive.

  The server stops when every worker has reported or gone down, or when
  `timeout` milliseconds elapse. It then kills any workers still alive and sends
  one of these messages to the process that called `start/5`:

    * `{:ok, server_pid, acc}` - all workers accounted for before the timeout
    * `{:timeout, server_pid, acc}` - the timeout fired first; `acc` holds only
      the results processed up to that point
    * `{:error, server_pid, reason}` - the server terminated abnormally

  The caller also monitors the server, so a `:DOWN` message follows.
  """

  use GenServer

  # State once initialized:
  #   {tasks, acc, reduce, reply_to, timer_ref}
  # `tasks` maps worker pid => monitor ref for workers that have not reported yet.
  # `timer_ref` is the timeout timer, or `:none` once the timeout has fired.
  # Until the :init cast is handled, the state is still the raw start argument:
  #   {data, init, fun, reduce, reply_to, timeout}

  ## helpers

  # Builds a function that spawns a monitored worker for one element and returns
  # its {pid, monitor_ref}. The worker casts {worker_pid, {:ok, value}} or
  # {worker_pid, :error} to `reply_to`.
  defp map_spawn(fun, reply_to) do
    fn elem ->
      Kernel.spawn_monitor(fn ->
        reply =
          try do
            {self(), {:ok, fun.(elem)}}
          rescue
            # Only exceptions become :error. Throws and exits kill the worker
            # and surface as a :DOWN message instead.
            _ -> {self(), :error}
          end

        GenServer.cast(reply_to, reply)
      end)
    end
  end

  # Forgets a worker once it has reported or gone down.
  defp remove_task(tasks, pid), do: Map.delete(tasks, pid)

  # Drops the monitor, flushing any queued :DOWN, and kills the worker if it is
  # still running. Returns whether a kill signal was sent.
  defp purge_task({pid, ref}) do
    # No longer interested in the process or its :DOWN messages
    Process.demonitor(ref, [:flush])

    result =
      if Process.alive?(pid) do
        Process.exit(pid, :kill)
      else
        # already finished; no need to kill it
        false
      end

    IO.puts(" purge task #{inspect(pid)}: #{inspect(result)}")
    result
  end

  # Purges every worker that has not reported yet.
  defp cleanup_tasks(tasks), do: Enum.each(tasks, &purge_task/1)

  # Stops normally once no workers are outstanding; otherwise keeps waiting.
  defp assess_progress({tasks, _acc, _reduce, _reply_to, _timer_ref} = state) do
    if map_size(tasks) == 0 do
      {:stop, :normal, state}
    else
      {:noreply, state}
    end
  end

  # Folds one worker outcome into the accumulator and forgets that worker.
  defp handle_result({tasks, acc, reduce, reply_to, timer_ref}, {pid, result}) do
    new_acc =
      case result do
        {:ok, value} -> reduce.(acc, value)
        # TODO: account for errors in the reply to the creator
        :error -> acc
      end

    assess_progress({remove_task(tasks, pid), new_acc, reduce, reply_to, timer_ref})
  end

  @doc false
  # Handles a worker that went down without its result having been processed.
  def handle_down({tasks, acc, reduce, reply_to, timer_ref}, pid, reason) do
    IO.puts("Task #{inspect(pid)} terminated with reason: #{inspect(reason)}")
    # TODO: indicate in the reply to the creator that some tasks went DOWN
    assess_progress({remove_task(tasks, pid), acc, reduce, reply_to, timer_ref})
  end

  ## callbacks: message handlers

  @impl true
  # A worker outcome {worker_pid, outcome}; outcomes from unknown pids are ignored.
  def handle_cast({pid, _outcome} = reply, {tasks, _, _, _, _} = state) when is_pid(pid) do
    case Map.fetch(tasks, pid) do
      {:ok, ref} ->
        # The worker reported, so purge its pending :DOWN message.
        Process.demonitor(ref, [:flush])
        handle_result(state, reply)

      :error ->
        {:noreply, state}
    end
  end

  # Deferred initialization: spawn the workers and arm the timeout timer.
  def handle_cast(:init, {data, init, fun, reduce, reply_to, timeout}) do
    tasks = Map.new(data, map_spawn(fun, self()))
    timer_ref = Process.send_after(self(), :timeout, timeout)
    # With no workers (empty `data`) there is nothing to wait for. Finish right
    # away instead of idling until the timeout fires.
    assess_progress({tasks, init, reduce, reply_to, timer_ref})
  end

  @impl true
  def handle_info(:timeout, {tasks, acc, reduce, reply_to, _timer_ref}) do
    IO.puts(" !!! timeout !!!")
    # `:none` tells terminate/2 that the timer has already fired.
    {:stop, :normal, {tasks, acc, reduce, reply_to, :none}}
  end

  def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
    handle_down(state, pid, reason)
  end

  ## callbacks: lifecycle

  @impl true
  def init(arg) do
    # Return ASAP, i.e. delay initialization. The :init cast is queued before
    # start/5 has even returned the pid to anyone.
    GenServer.cast(self(), :init)
    {:ok, arg}
  end

  @impl true
  def terminate(reason, {tasks, acc, _reduce, reply_to, timer_ref}) do
    reply =
      case {reason, timer_ref} do
        {:normal, :none} ->
          {:timeout, self(), acc}

        {:normal, _} ->
          Process.cancel_timer(timer_ref)
          {:ok, self(), acc}

        _ ->
          {:error, self(), reason}
      end

    cleanup_tasks(tasks)
    Kernel.send(reply_to, reply)
    IO.puts(" unprocessed messages - #{inspect(Process.info(self(), :message_queue_len))}")
  end

  # Terminating while still handling the :init cast, for example because `data`
  # is not enumerable or `timeout` is invalid. The state is still the raw start
  # argument, and no workers were recorded, so only the creator is notified.
  def terminate(reason, {_data, _init, _fun, _reduce, reply_to, _timeout}) do
    Kernel.send(reply_to, {:error, self(), reason})
  end

  ## public interface

  @doc """
  Starts an unlinked map/reduce server over `data` and monitors it.

  `fun` is applied to each element in its own process. `reduce.(acc, value)`
  folds each successful result into the accumulator, starting from `init`.
  `timeout` is in milliseconds.

  Returns `{server_pid, monitor_ref}`, or whatever `GenServer.start/2` returned
  on failure. The outcome arrives later as a message (see the module docs).
  """
  @spec start(
          Enumerable.t(),
          term(),
          (term() -> term()),
          (term(), term() -> term()),
          non_neg_integer()
        ) :: {pid(), reference()} | :ignore | {:error, term()}
  def start(data, init, fun, reduce, timeout) do
    args = {data, init, fun, reduce, self(), timeout}
    # Note: [spawn_opt: :monitor] is not allowed
    # http://erlang.org/doc/man/gen_server.html#start_link-4
    # so the monitor is set up after the server has started. If the server has
    # already finished by then (e.g. empty `data`), the :DOWN reason is :noproc.
    case GenServer.start(__MODULE__, args) do
      {:ok, pid} ->
        ref = Process.monitor(pid)
        {pid, ref}

      other ->
        other
    end
  end

  @doc """
  Runs the demo with ten `Calc.run/1` jobs, summed by `Accumulator.sum/2`.

  Each job id is drawn from `1..1000` and then shifted down by `diff`. A positive
  `diff` therefore produces some non-positive ids, which `Calc.run/1` rejects.
  """
  @spec run_demo(integer(), non_neg_integer()) ::
          {pid(), reference()} | :ignore | {:error, term()}
  def run_demo(diff \\ 0, timeout \\ 900) do
    data = Enum.map(1..10, fn _ -> Enum.random(1..1000) - diff end)
    start(data, 0, &Calc.run/1, &Accumulator.sum/2, timeout)
  end
end
Example `iex` session:
$ iex -S mix
iex(1)> MapReduce.run_demo 700
sleeping: 6 (#PID<0.120.0>)
sleeping: 126 (#PID<0.122.0>)
sleeping: 130 (#PID<0.125.0>)
sleeping: 152 (#PID<0.127.0>)
sleeping: 289 (#PID<0.128.0>)
{#PID<0.118.0>, #Reference<0.0.4.573>}
waking from slumber: 6 (#PID<0.120.0>)
waking from slumber: 126 (#PID<0.122.0>)
waking from slumber: 130 (#PID<0.125.0>)
waking from slumber: 152 (#PID<0.127.0>)
waking from slumber: 289 (#PID<0.128.0>)
unprocessed messages - {:message_queue_len, 1}
iex(2)> flush()
{:ok, #PID<0.118.0>, 703}
{:DOWN, #Reference<0.0.4.573>, :process, #PID<0.118.0>, :normal}
:ok
iex(3)> MapReduce.run_demo 0, 500
{#PID<0.131.0>, #Reference<0.0.5.441>}
sleeping: 203 (#PID<0.132.0>)
sleeping: 315 (#PID<0.133.0>)
sleeping: 944 (#PID<0.134.0>)
sleeping: 959 (#PID<0.135.0>)
sleeping: 304 (#PID<0.136.0>)
sleeping: 783 (#PID<0.137.0>)
sleeping: 407 (#PID<0.138.0>)
sleeping: 117 (#PID<0.139.0>)
sleeping: 44 (#PID<0.140.0>)
sleeping: 564 (#PID<0.141.0>)
waking from slumber: 44 (#PID<0.140.0>)
waking from slumber: 117 (#PID<0.139.0>)
waking from slumber: 203 (#PID<0.132.0>)
waking from slumber: 304 (#PID<0.136.0>)
waking from slumber: 315 (#PID<0.133.0>)
waking from slumber: 407 (#PID<0.138.0>)
waking from slumber: 564 (#PID<0.141.0>)
waking from slumber: 783 (#PID<0.137.0>)
waking from slumber: 944 (#PID<0.134.0>)
waking from slumber: 959 (#PID<0.135.0>)
!!! timeout !!!
kill task #PID<0.134.0>: false
kill task #PID<0.135.0>: false
kill task #PID<0.137.0>: false
kill task #PID<0.141.0>: false
unprocessed messages - {:message_queue_len, 4}
iex(4)> flush()
{:timeout, #PID<0.131.0>, 1390}
{:DOWN, #Reference<0.0.5.441>, :process, #PID<0.131.0>, :normal}
:ok
iex(5)>
In the end, though, you shouldn't spin off processes just because you can. Make sure there is a "good enough reason" for each process to exist. Take a really good, long look at *The Erlangelist: To spawn, or not to spawn?*
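Edit: Added some more `IO.puts` calls to reveal information that helps explain behaviour that may seem strange at first blush. (The `kill task …` lines in the transcript above correspond to what the code now prints as `purge task …`.)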