Ok so what you can do is:
- each process has a
queue
in their state
- whenever you send work to them, for instance in
handle_call/3
, they do not do the work but just add it to their respective queue
- then they return
0
as a timeout from handle_call
- when they receive the
:timeout
in handle_info/2
, they pull one task from their queue if it is not empty, and handle that task, and then return with a zero timeout from there too.
- in their state, they also keep a flag to tell if they should work or not.
As soon as you send them a :stop message (called :pause in the demo below), they set their flag to false, ignore the timeout message, and return :infinity as a timeout.
I put up a quick and dirty demo:
defmodule Serv do
  @moduledoc """
  Demo worker that queues incoming tasks and processes them one at a time,
  driven by GenServer timeouts.

  Each server keeps in its state:

    * `:queue`    - an Erlang `:queue` of pending values
    * `:enabled`  - whether the server is currently allowed to work
    * `:coworker` - the registered name the result of each task is sent to
    * `:me`       - this server's own name (used only for logging)

  Callbacks never do the work directly. They enqueue it and return a timeout
  of `0`, so the server immediately receives `:timeout` and handles exactly one
  task per wake-up. When the server is paused, or has nothing queued, the
  timeout is `:infinity` so the process sleeps instead of spinning.
  """

  use GenServer

  @doc """
  Starts a server registered under `opts[:name]`.

  `opts[:coworker]` is the registered name that receives `{:your_turn, val}`
  after each completed task.
  """
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: opts[:name])
  end

  @impl true
  def init(opts) do
    {:ok, %{queue: :queue.new(), enabled: true, coworker: opts[:coworker], me: opts[:name]}}
  end

  @impl true
  def handle_call({:task, val}, _from, state) do
    state = enqueue(state, val)
    {:reply, :ok, state, timeout(state)}
  end

  def handle_call(:pause, _from, state) do
    {:reply, :ok, %{state | enabled: false}, :infinity}
  end

  def handle_call(:resume, _from, state) do
    # Only wake up immediately if something is actually waiting in the queue.
    state = %{state | enabled: true}
    {:reply, :ok, state, timeout(state)}
  end

  @impl true
  def handle_info(:timeout, %{enabled: true} = state) do
    state = do_work(state)
    {:noreply, state, timeout(state)}
  end

  def handle_info(:timeout, %{enabled: false} = state) do
    # Paused: ignore the wake-up and sleep until the next message arrives.
    {:noreply, state, :infinity}
  end

  def handle_info({:your_turn, val}, state) do
    state = enqueue(state, val)
    {:noreply, state, timeout(state)}
  end

  @doc """
  Pops one value from the queue, runs `work/1` on it, prints the result and
  sends `{:your_turn, result}` to the coworker.

  Returns the state unchanged when the queue is empty.
  """
  def do_work(%{queue: q, coworker: buddy} = state) do
    case :queue.out(q) do
      {:empty, _} ->
        state

      {{:value, val}, new_q} ->
        val = work(val)
        IO.puts("result from #{inspect(state.me)}: #{inspect(val)}")
        send(buddy, {:your_turn, val})
        %{state | queue: new_q}
    end
  end

  @doc """
  Simulates a slow unit of work: sleeps 100 ms and returns `val + 1`.
  """
  def work(val) do
    Process.sleep(100)
    val + 1
  end

  # Appends `val` at the rear of the pending queue.
  defp enqueue(state, val) do
    %{state | queue: :queue.in(val, state.queue)}
  end

  # Timeout to return from a callback: `0` only when the server is enabled AND
  # has pending work; otherwise `:infinity`, so an idle or paused server does
  # not loop on `:timeout` for nothing.
  defp timeout(%{enabled: false}), do: :infinity

  defp timeout(%{enabled: true, queue: queue}) do
    if :queue.is_empty(queue), do: :infinity, else: 0
  end
end
# Start two servers that hand each result over to one another.
{:ok, server_a} = Serv.start_link(name: A, coworker: B)
{:ok, server_b} = Serv.start_link(name: B, coworker: A)

# Seed each server with one initial value; from here on they ping-pong.
GenServer.call(server_a, {:task, 0})
GenServer.call(server_b, {:task, 1000})

# Let them run for a second, then pause A for a second.
Process.sleep(1000)
IO.puts("pause A")
GenServer.call(server_a, :pause)
Process.sleep(1000)

# Resume A and watch for another second.
IO.puts("resume A")
GenServer.call(server_a, :resume)
Process.sleep(1000)
edit: the timeout/1
function should check if the queue is empty. If it is empty it should return :infinity
even if enabled
is true. Otherwise the process will loop again and again on a timeout for nothing.