DynamicSupervisor Asking for more demand when child exits

I am having some difficulty in getting the GenStage DynamicSupervisor when operating as a Consumer to react to the child process terminating and to subsequently ask the Producer for more events (demand). Reading the code and the tests, I see that the tests handle this case by explicitly terminating the child process via terminate_child(sup, child) whereas I was expecting the DynamicSupervisor to detect the :DOWN message from the monitored child in order to ask for more demand.

Am I mis-interpreting this ability or is this not by design? I can make my dynamic child process call the terminate_child function of the DynamicSupervisor but that coupling feels a bit wrong.

Thanks
Mike

The DynamicSupervisor doesn’t use monitors but links + trap_exits and it automatically detects when a child terminates and automatically sends more demand. The example in examples/dynamic_supervisor.exs is a working example of how it behaves (and you can see it sends more demand as the child terminates).
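To illustrate the links + trap_exit mechanism in isolation, here is a minimal sketch in plain Elixir. It is not GenStage's actual implementation; `LinkDemo` is a hypothetical module made up for this example. A process that traps exits receives an `{:EXIT, pid, reason}` message when a linked child terminates, even if the child exits normally:

```elixir
defmodule LinkDemo do
  # Hypothetical helper, not part of GenStage. It shows how a process that
  # traps exits learns that a linked child has terminated.
  def run do
    # Convert exit signals from linked processes into ordinary messages.
    Process.flag(:trap_exit, true)

    # spawn_link/1 starts the child and links it to the caller atomically.
    child = spawn_link(fn -> :ok end)

    receive do
      {:EXIT, ^child, reason} -> {:child_exited, reason}
    after
      1_000 -> :no_exit_signal
    end
  end
end

IO.inspect(LinkDemo.run())
# prints {:child_exited, :normal}
```

Without the link, no `{:EXIT, ...}` message would arrive and the `after` clause would fire instead.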

Maybe the difference in behaviour you see is due to the producer? Can you tell us more?

Sure Jose.

In fact, the DynamicSupervisor does not seem to detect the termination of the child: it still reports the children (pids) in its response to the which_children call, but Process.alive? returns false for them. I will pare down the example here.

Producer

defmodule Datasets do
  alias Experimental.GenStage
  alias __MODULE__, as: Mod
  use GenStage

  def start_link(tenant_id) do
    GenStage.start_link(Mod, tenant_id)
  end

  def init(tenant_id) do
    {:producer, {tenant_id, 0}}
  end

  def handle_demand(demand, {tenant_id, served}) when demand > 0 do
    events =
      1..100
      |> Enum.drop(served)
      |> Enum.take(demand)

    IO.inspect(events)

    {:noreply, events, {tenant_id, served + Kernel.length(events)}}
  end
end

DynamicSupervisor Consumer Module

defmodule DynamicDatasetPipeline do
  alias Experimental.DynamicSupervisor

  alias __MODULE__, as: Mod

  use Experimental.DynamicSupervisor

  def start_link() do
    DynamicSupervisor.start_link(Mod, nil, name: DynamicDatasetPipeline)
  end

  def init(nil) do
    children = [
      worker(DynamicChild, [], restart: :temporary)
    ]

    {:ok, children, strategy: :one_for_one, max_restarts: 0}
  end
end

Dynamic Child

defmodule DynamicChild do
  use GenServer
  alias __MODULE__, as: Mod
  
  def start_link(tenant_id) do
    GenServer.start(Mod, tenant_id)
  end

  def init(state) do
    Process.send_after(Kernel.self(), :finish, 5_000)
    {:ok, state}
  end

  def handle_info(:finish, state) do
    {:stop, :normal, state}
  end
end

I tested it in iex via

{:ok, producer} = Datasets.start_link(tenant_id)
{:ok, consumer} = DynamicDatasetPipeline.start_link()

GenStage.sync_subscribe(consumer, to: producer, min_demand: 1, max_demand: 5, cancel: :temporary)

and I only see [1, 2, 3, 4, 5] printed. After the 5 seconds the children are no longer alive, yet the DynamicSupervisor still reports them:

iex(34)> DynamicSupervisor.which_children(consumer) |> Enum.map(fn {_, c, _, _} -> Process.alive? c end)
[false, false, false, false, false]

Hope this helps.

You must start your child processes with start_link. Otherwise the supervisor and child process won’t be linked and the supervisor won’t know when the child process terminates.
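As a sketch of that fix, the child module from the question could look like the following. The only functional change is `GenServer.start_link/2` in place of `GenServer.start/2`; the `@impl true` annotations are an addition that assumes a reasonably recent Elixir version.

```elixir
defmodule DynamicChild do
  use GenServer

  # start_link/1 links the new process to the caller (the supervisor),
  # so the supervisor is notified when this process exits.
  def start_link(tenant_id) do
    GenServer.start_link(__MODULE__, tenant_id)
  end

  @impl true
  def init(state) do
    # Simulate a unit of work that finishes after five seconds.
    Process.send_after(self(), :finish, 5_000)
    {:ok, state}
  end

  @impl true
  def handle_info(:finish, state) do
    {:stop, :normal, state}
  end
end
```

With the link in place, the supervisor receives the exit signal when the child stops, removes it from its list of children, and can ask the producer for more events.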

Facepalm slap!

So wrapped up in the depths of my code I missed that silly error.

Sorry to have bothered you with my mistake

Regards


No problem! We will investigate if we can make such mistakes more obvious.
