How to properly stop Producer -> ConsumerSupervisor?

I am quite new to Elixir and trying to understand GenStage flow.

I created a producer that emits a finite list of numbers:

defmodule Producer do
  use GenStage

  def start_link(_) do
    GenStage.start_link(Producer, Enum.to_list(1..10), name: Producer)
  end

  def init(list) do
    {:producer, list}
  end

  def handle_demand(demand, state) when demand > 0 do
    case Enum.split(state, demand) do
      {[], []} ->
        GenStage.async_info(self(), :stop)
        {:noreply, [], []}

      {pulled, remaining} ->
        {:noreply, pulled, remaining}
    end
  end

  def handle_info(:stop, state) do
    {:stop, :normal, state}
  end
end

and a ConsumerSupervisor that starts a new child for each number:

defmodule ConSupervisor do
  use ConsumerSupervisor

  @opts [
    strategy: :one_for_one,
    subscribe_to: [{Producer, max_demand: 2, min_demand: 0}],
    max_restarts: 20
  ]

  def start_link(arg) do
    ConsumerSupervisor.start_link(__MODULE__, arg)
  end

  def init(_arg) do
    children = [%{id: Consumer, start: {Consumer, :start_link, []}, restart: :transient}]

    ConsumerSupervisor.init(children, @opts)
  end
end

The children are just tasks that fail roughly a third of the time:

defmodule Consumer do
  def start_link(event) do
    Task.start_link(fn ->
      Process.sleep(1000)

      if :rand.uniform(3) == 3 do
        IO.puts("task failed with #{event}")
        raise("ooops")
      end

      num = Kernel.inspect(event)
      IO.puts("Inserted #{num}")
    end)
  end
end

I want to terminate the Producer in a way that guarantees all Tasks have completed. Currently, when it terminates, it sends a cancel to the ConsumerSupervisor, which then discards any Tasks that are still retrying until they succeed.

[info] GenStage consumer #PID<0.433.0> is stopping after receiving cancel from producer #PID<0.431.0> with reason: :normal

I feel like it should be tricky and require some consecutive cast/call messaging, but maybe there is a more straightforward way to do this?

I’m not very familiar with GenStage. Maybe you can call ConsumerSupervisor.count_children/1 repeatedly to check whether all tasks have completed.
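
A minimal sketch of that polling idea, run from outside the GenStage processes (for example from a test or an IEx session). `ChildWaiter`, `await_idle/2`, and the 500 ms default interval are hypothetical names and values chosen for illustration; only `ConsumerSupervisor.count_children/1` comes from the library:

```elixir
# Assumes gen_stage is a dependency, e.g. {:gen_stage, "~> 1.0"} in mix.exs.
defmodule ChildWaiter do
  # Hypothetical helper: blocks the caller until the given ConsumerSupervisor
  # reports no active children, checking every interval_ms milliseconds.
  def await_idle(supervisor, interval_ms \\ 500) do
    case ConsumerSupervisor.count_children(supervisor) do
      %{active: 0} ->
        :ok

      %{active: _running} ->
        Process.sleep(interval_ms)
        await_idle(supervisor, interval_ms)
    end
  end
end
```

This blocks the calling process, so it suits a script or a test rather than a callback inside the producer itself, and it assumes the supervisor is still alive while it is being polled.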

As @Ljzn mentioned, you can check the active workers with count_children and stop once none are active.

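  def handle_info(:stop, state) do
    # consumer_pid_or_registered_name is a placeholder for the ConsumerSupervisor's pid or registered name
    %{active: active} = ConsumerSupervisor.count_children(consumer_pid_or_registered_name)

    if active > 0 do
      # Some tasks are still running or retrying: check again in 5 seconds
      Process.send_after(self(), :stop, 5000)
      {:noreply, [], state}
    else
      # Nothing is active any more, so it is safe to stop the producer
      {:stop, :normal, state}
    end
  end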
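
One detail the snippet leaves open is how the producer refers to the supervisor. Here is a minimal sketch of the two modules wired together, assuming the ConsumerSupervisor is registered under its module name through the `:name` option. That registration and the 500 ms polling interval are assumptions for illustration, not something stated in the posts above:

```elixir
# Assumes gen_stage is a dependency, e.g. {:gen_stage, "~> 1.0"} in mix.exs.
# Consumer is the Task-based module from the question, unchanged.

defmodule Producer do
  use GenStage

  # Assumption: poll every 500 ms instead of every 5 seconds.
  @poll_interval 500

  def start_link(_) do
    GenStage.start_link(__MODULE__, Enum.to_list(1..10), name: __MODULE__)
  end

  def init(list), do: {:producer, list}

  def handle_demand(demand, state) when demand > 0 do
    case Enum.split(state, demand) do
      {[], []} ->
        # Nothing left to emit: begin the shutdown check.
        GenStage.async_info(self(), :stop)
        {:noreply, [], []}

      {pulled, remaining} ->
        {:noreply, pulled, remaining}
    end
  end

  def handle_info(:stop, state) do
    # ConSupervisor is the name registered in ConSupervisor.start_link/1 below.
    case ConsumerSupervisor.count_children(ConSupervisor) do
      %{active: 0} ->
        {:stop, :normal, state}

      %{active: _still_running} ->
        Process.send_after(self(), :stop, @poll_interval)
        {:noreply, [], state}
    end
  end
end

defmodule ConSupervisor do
  use ConsumerSupervisor

  def start_link(arg) do
    # Registering a name lets the producer call count_children/1 without a pid.
    ConsumerSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    children = [%{id: Consumer, start: {Consumer, :start_link, []}, restart: :transient}]

    ConsumerSupervisor.init(children,
      strategy: :one_for_one,
      subscribe_to: [{Producer, max_demand: 2, min_demand: 0}],
      max_restarts: 20
    )
  end
end
```

Every demand that arrives after the list is empty queues another `:stop` message, so several checks may be pending at once. That should be harmless here, since each check only reads the child count before deciding whether to stop.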

@Ljzn and @NeutronStein thank you, it is exactly what I need!