Using GenStage to implement a worker pool

I’ve been wondering whether it’s possible to implement a worker pool using GenStage, and whether that’s a good idea. My initial idea was to create a producer that produces work items and a consumer that executes the work, with the pool controlled by a ConsumerSupervisor. This seems easy for a fire-and-forget approach, but I need to communicate the result back to the producer. Would sending a message from the worker to the producer be considered a clean design or a hack?
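Concretely, I imagine something like the following (a hypothetical, GenStage-free sketch: the “producer” is a plain GenServer here, and the worker casts its result back by name — GenStage processes accept the same GenServer-style handle_cast/2 callback, so I’d expect the same shape to carry over):

```elixir
# Hypothetical sketch of reporting results back to the producer.
defmodule ResultCollector do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, %{}, name: __MODULE__)

  # Called by a worker when it finishes an event.
  def report(event, result), do: GenServer.cast(__MODULE__, {:result, event, result})

  def results, do: GenServer.call(__MODULE__, :results)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:result, event, result}, state),
    do: {:noreply, Map.put(state, event, result)}

  @impl true
  def handle_call(:results, _from, state), do: {:reply, state, state}
end

{:ok, _} = ResultCollector.start_link([])

# A worker task reports its result when it is done.
{:ok, _task} = Task.start_link(fn -> ResultCollector.report("1", :done) end)
Process.sleep(50)
ResultCollector.results()
# => %{"1" => :done}
```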

I’m also having an issue with error recovery. When I run the code below, processing of every 5th item crashes intentionally. The worker is restarted, but somehow processing of the 6th item is lost. What’s up with that?

defmodule ExGs.Application do
  use Application

  def start(_type, _args) do
    IO.puts("Application.start")

    children = [
      ExGs.Producer,
      ExGs.Consumer
    ]

    opts = [strategy: :one_for_one, name: ExGs.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

defmodule ExGs.Producer do
  use GenStage

  def start_link(arg) do
    IO.puts("Producer.start_link")
    GenStage.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    IO.puts("Producer.init")
    {:producer, 0}
  end

  def handle_demand(demand, state) when demand > 0 do
    IO.puts("Producer.handle_demand with #{demand}")
    
    events = Enum.map(0..demand-1, &"#{state + &1}")

    {:noreply, events, state + demand}
  end
end

defmodule ExGs.Consumer do
  use ConsumerSupervisor

  def start_link(arg) do
    IO.puts("Consumer.start_link")
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    IO.puts("Consumer.init")
    children = [
      %{
        id: ExGs.Worker,
        start: {ExGs.Worker, :start_link, []},
        restart: :transient
      }
    ]
    opts = [strategy: :one_for_one, subscribe_to: [{ExGs.Producer, max_demand: 1}]]
    ConsumerSupervisor.init(children, opts)
  end
end

defmodule ExGs.Worker do
  def start_link(event) do
    IO.puts("Worker.start_link with #{event}")
    Task.start_link(fn ->
      IO.puts("Worker.Task handling event #{event}")
      if String.ends_with?(event, "5") do
        IO.puts("Worker.Task crashing #{event}...")
        z = nil
        z.crash
      else
        Process.sleep(1000)
        IO.puts("Worker.Task fetched #{event}")
      end
    end)
  end
end
1 Like

Consider the following modification:

  def handle_demand(demand, state) when demand > 0 do
    peek()
    new_state = demand + state
    last = new_state - 1
    IO.puts("Producer.handle_demand with #{demand},#{last}")
    events = Enum.map(state..last, &"#{&1}")
    {:noreply, events, new_state}
  end

  defp peek() do
    messages = inspect Process.info(self(),[:messages])
    IO.puts("Messages: #{messages}")
  end

It results in the following output:

13:06:35.572 [error] Task #PID<0.180.0> started from #PID<0.169.0> terminating
** (UndefinedFunctionError) function nil.crash/0 is undefined (module nil is not available)
    nil.crash()
    (elixir) lib/task/supervised.ex:88: Task.Supervised.do_apply/2
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<0.85896955/0 in ExGs.Worker.start_link/1>
    Args: []
Messages: [messages: [{:DOWN, #Reference<0.2379802615.4220518404.113316>, :process, #PID<0.169.0>, :shutdown}]]
Producer.handle_demand with 1,6
Messages: [messages: []]
Producer.handle_demand with 1,7
Worker.start_link with 7

That is, when the producer runs handle_demand, the news of the demise of the consumer process is still sitting unprocessed in the producer’s mailbox. So it is likely that the pid is still in the :consumers map of the %GenStage{} struct and, more importantly, still registered as a subscription in its :dispatcher_state, ultimately causing the event to be dispatched to a dead process.

4 Likes

I’d consider rescuing exceptions in your worker and signalling task failure/completion to the producer. The producer would have to track events that have been sent but not completed, and wait for confirmation or a timeout.
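A minimal sketch of that bookkeeping as a pure data structure (names made up, not wired into GenStage): the producer records a deadline when it dispatches an event, drops it on confirmation, and periodically collects the events whose deadline has passed for re-dispatch.

```elixir
# Hypothetical sketch: track in-flight events with a deadline so that
# unconfirmed events can be re-emitted after a timeout.
defmodule InFlight do
  @timeout_ms 5_000

  # Record an event as dispatched at `now` (in milliseconds).
  def dispatch(pending, event, now), do: Map.put(pending, event, now + @timeout_ms)

  # A worker confirmed completion; stop tracking the event.
  def ack(pending, event), do: Map.delete(pending, event)

  # Split off events whose deadline has passed so they can be retried.
  def expired(pending, now) do
    {late, rest} = Enum.split_with(pending, fn {_event, deadline} -> deadline <= now end)
    {Enum.map(late, fn {event, _deadline} -> event end), Map.new(rest)}
  end
end

pending = InFlight.dispatch(%{}, "5", 0)
{late, _rest} = InFlight.expired(pending, 10_000)
# "5" was never acked, so it comes back for a retry:
late
# => ["5"]
```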

I’m starting to think supervision trees should be used for restarting long-lived processes, not for retrying short-lived tasks.

If a task fails quickly, it will take out the whole ConsumerSupervisor and all the in-progress tasks.

3 Likes

Thanks for the analysis! So what would be the right way to handle such errors? I would expect that GenStage would handle this situation :thinking:

Would rescuing exceptions handle all possible failures of the worker? If not, then I’d still need to be prepared for the crashes of the worker.
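A quick check suggests not everything is covered: rescue only catches raised exceptions, while exits need catch (and a brutal Process.exit(pid, :kill) from outside can’t be trapped at all).

```elixir
# rescue only catches raised exceptions; exits (and throws) need `catch`.
result =
  try do
    exit(:boom)
  rescue
    e -> {:rescued, e}
  catch
    :exit, reason -> {:exit_caught, reason}
  end

result
# => {:exit_caught, :boom}
```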

Each process should live under a supervision tree so that we get clean shutdowns and restarts. This implies that the restart strategy for these tasks pretty much has to be :temporary.

1 Like

It does, but in this case we are dealing with a race condition. The simplest way to move past it is

    children = [
      %{
        id: ExGs.Worker,
        start: {ExGs.Worker, :start_link, []},
        restart: :temporary
      }
    ]

i.e. don’t have the supervisor retry the failed action. “Let it fail” needs a way to get past a failing action, so retrying a failing action in perpetuity is not an option. With :transient, the (repeating) failure escalates to terminating the supervisor, which is really only appropriate if failure on any single event is intolerable (i.e. all-or-nothing); but that would also imply that the next higher-level supervisor would likely restart everything (:one_for_all), which is not the case here.
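The difference is easy to see with a stdlib-only demo (Agent stands in for the worker; names are made up): a :temporary child that dies is simply removed from the supervisor, never restarted.

```elixir
# Hypothetical demo: a :temporary child is not restarted when it dies.
children = [
  %{
    id: :tmp_worker,
    start: {Agent, :start_link, [fn -> 0 end]},
    restart: :temporary
  }
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

[{:tmp_worker, child, _type, _mods}] = Supervisor.which_children(sup)
Process.exit(child, :kill)
Process.sleep(100)

# The supervisor drops the dead child instead of restarting it.
%{active: active} = Supervisor.count_children(sup)
# active is now 0: nothing was restarted
```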

Would rescuing exceptions handle all possible failures of the worker?

Supervisors are primarily about resilience in the face of unexpected failure. If you expect failures, and handling them goes beyond the supervisor strategies, then you need to implement your own manager/handler process that traps exits and deals with premature linked-process terminations accordingly. Standard supervisors don’t implement complex behaviours in order to keep them extremely reliable.
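For example, a hand-rolled manager (hypothetical sketch, names made up) can trap exits and turn a linked worker’s termination, whatever its cause, into an ordinary message instead of dying with it:

```elixir
# Hypothetical sketch: run one unit of work in a linked process and report
# its outcome, surviving the worker's crash by trapping exits.
defmodule WorkManager do
  def run(fun) do
    # Note: this flag persists on the calling process after run/1 returns.
    Process.flag(:trap_exit, true)

    pid = spawn_link(fun)

    receive do
      # Normal termination means the work completed.
      {:EXIT, ^pid, :normal} -> :ok
      # Any other exit reason (raise, exit/1, kill) arrives as a message.
      {:EXIT, ^pid, reason} -> {:failed, reason}
    after
      5_000 -> {:failed, :timeout}
    end
  end
end

WorkManager.run(fn -> :done end)
# => :ok
WorkManager.run(fn -> exit(:boom) end)
# => {:failed, :boom}
```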

4 Likes