I’ve been wondering whether it’s possible to implement a worker pool using GenStage, and whether that’s a good idea. My initial idea was to create a producer that emits work items and a consumer that executes the work, with the pool controlled by a ConsumerSupervisor. This seems easy for a fire-and-forget approach, but I need to communicate the result back to the producer. Would sending a message from the worker to the producer be considered a clean design or a hack?
I’m also having an issue with error recovery. When I run the code below, processing of every 5th item crashes intentionally. The worker is restarted, but processing of the 6th item is somehow lost. What’s up with that?
defmodule ExGs.Application do
  use Application

  def start(_type, _args) do
    IO.puts("Application.start")

    children = [
      ExGs.Producer,
      ExGs.Consumer
    ]

    opts = [strategy: :one_for_one, name: ExGs.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
defmodule ExGs.Producer do
  use GenStage

  def start_link(arg) do
    IO.puts("Producer.start_link")
    GenStage.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    IO.puts("Producer.init")
    {:producer, 0}
  end

  def handle_demand(demand, state) when demand > 0 do
    IO.puts("Producer.handle_demand with #{demand}")
    events = Enum.map(0..(demand - 1), &"#{state + &1}")
    {:noreply, events, state + demand}
  end
end
defmodule ExGs.Consumer do
  use ConsumerSupervisor

  def start_link(arg) do
    IO.puts("Consumer.start_link")
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    IO.puts("Consumer.init")

    children = [
      %{
        id: ExGs.Worker,
        start: {ExGs.Worker, :start_link, []},
        restart: :transient
      }
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{ExGs.Producer, max_demand: 1}]]
    ConsumerSupervisor.init(children, opts)
  end
end
defmodule ExGs.Worker do
  def start_link(event) do
    IO.puts("Worker.start_link with #{event}")

    Task.start_link(fn ->
      IO.puts("Worker.Task handling event #{event}")

      if String.ends_with?(event, "5") do
        IO.puts("Worker.Task crashing #{event}...")
        # intentional crash: calling a function on nil
        z = nil
        z.crash
      else
        Process.sleep(1000)
        IO.puts("Worker.Task fetched #{event}")
      end
    end)
  end
end
To investigate, I instrumented the producer’s handle_demand to peek at its mailbox:

def handle_demand(demand, state) when demand > 0 do
  peek()
  new_state = demand + state
  last = new_state - 1
  IO.puts("Producer.handle_demand with #{demand},#{last}")
  events = Enum.map(state..last, &"#{&1}")
  {:noreply, events, new_state}
end

defp peek() do
  messages = inspect(Process.info(self(), [:messages]))
  IO.puts("Messages: #{messages}")
end
It results in the following output:
13:06:35.572 [error] Task #PID<0.180.0> started from #PID<0.169.0> terminating
** (UndefinedFunctionError) function nil.crash/0 is undefined (module nil is not available)
nil.crash()
(elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<0.85896955/0 in ExGs.Worker.start_link/1>
Args: []
Messages: [messages: [{:DOWN, #Reference<0.2379802615.4220518404.113316>, :process, #PID<0.169.0>, :shutdown}]]
Producer.handle_demand with 1,6
Messages: [messages: []]
Producer.handle_demand with 1,7
Worker.start_link with 7
That is, when the producer runs handle_demand, the :DOWN message announcing the demise of the consumer process is still unprocessed in its mailbox. So the dead pid is likely still in the %GenStage{} :consumers map and, more importantly, still registered as a subscription in the %GenStage{} :dispatcher_state, ultimately causing the event to be dispatched to a dead process.
I’d consider rescuing exceptions in your worker and signaling task failure/completion to the producer. The producer would have to track events that have been sent but not completed, and wait for confirmation or a timeout.
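A minimal sketch of that idea, applied to the worker from the question. The `{:done, event, result}` message shape and the corresponding `handle_cast` in the producer are assumptions for illustration, not anything GenStage provides:

```elixir
defmodule ExGs.Worker do
  # Hypothetical sketch: catch failures inside the task and report the
  # outcome to the producer instead of letting the task crash silently.
  def start_link(event) do
    Task.start_link(fn ->
      result =
        try do
          {:ok, do_work(event)}
        rescue
          e -> {:error, e}
        catch
          kind, reason -> {:error, {kind, reason}}
        end

      # Assumed producer API: handled by a handle_cast/2 clause in
      # ExGs.Producer that marks the event completed or re-queues it.
      GenStage.cast(ExGs.Producer, {:done, event, result})
    end)
  end

  defp do_work(event), do: event
end
```

Note that try/rescue/catch does not cover every failure mode (e.g. the task being killed, or an exit signal from a linked process), so the producer should still keep a timeout or monitor as a backstop.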
I’m starting to think supervision trees should be used for restarting long-lived processes, not for retrying short-lived tasks.
If a task fails quickly, it will take out the whole ConsumerSupervisor and all the in-progress tasks.
Would rescuing exceptions handle all possible failures of the worker? If not, then I’d still need to be prepared for the crashes of the worker.
Each process should live under a supervision tree so that we get clean shutdowns and restarts. This implies that the restart strategy for these tasks pretty much has to be :temporary, i.e. don’t have the supervisor retry the failed action. “Let it fail” needs to get past a failing action, so retrying a failing action in perpetuity is not an option. With :transient, the repeating failure escalates to terminating the supervisor, which is really only appropriate if failure on any single event is intolerable (i.e. all-or-nothing); but that would also imply that the next higher-level supervisor would likely restart everything (:one_for_all), which is not the case here.
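Applied to the ConsumerSupervisor from the question, that’s a one-word change to the child spec:

```elixir
defmodule ExGs.Consumer do
  use ConsumerSupervisor

  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    children = [
      %{
        id: ExGs.Worker,
        start: {ExGs.Worker, :start_link, []},
        # :temporary — a crashed task is never restarted by the supervisor,
        # so a repeatedly failing event cannot take the supervisor (and all
        # in-progress tasks) down; retry logic, if any, belongs elsewhere
        restart: :temporary
      }
    ]

    opts = [strategy: :one_for_one, subscribe_to: [{ExGs.Producer, max_demand: 1}]]
    ConsumerSupervisor.init(children, opts)
  end
end
```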
Would rescuing exceptions handle all possible failures of the worker?
Supervisors are primarily about resilience in the face of unexpected failure. If you expect failures, and handling them goes beyond what the supervisor strategies offer, then you need to implement your own manager/handler process that traps exits and deals with premature linked-process terminations accordingly. Standard supervisors deliberately avoid complex behaviour in order to stay extremely reliable.
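A sketch of such a manager, under stated assumptions: the module name `ExGs.Manager`, the `run/1` API, and the “log and move on” failure handling are all invented for illustration. The key mechanics are real: `Process.flag(:trap_exit, true)` converts exit signals from linked tasks into `{:EXIT, pid, reason}` messages, which arrive in `handle_info/2`:

```elixir
defmodule ExGs.Manager do
  use GenServer

  # Hypothetical manager: spawns workers linked to itself, traps exits,
  # and decides per event what to do when a worker dies.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  def run(event), do: GenServer.cast(__MODULE__, {:run, event})

  def init(_opts) do
    Process.flag(:trap_exit, true)
    {:ok, %{in_flight: %{}}}
  end

  def handle_cast({:run, event}, state) do
    {:ok, pid} = Task.start_link(fn -> do_work(event) end)
    {:noreply, put_in(state.in_flight[pid], event)}
  end

  # A linked task exited: :normal means it completed, anything else is a failure.
  def handle_info({:EXIT, pid, reason}, state) do
    {event, state} = pop_in(state.in_flight[pid])

    unless reason == :normal do
      # retry, re-queue, or give up here, per event
      IO.puts("event #{inspect(event)} failed: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  defp do_work(event), do: event
end
```

Because the manager holds the `in_flight` map itself, it can implement whatever confirmation/timeout scheme the producer needs, which is exactly the bookkeeping a standard supervisor refuses to do.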