Sure Jose.
In fact the DynamicSupervisor seems not to detect the termination of the child as it reports the child / children (pid / pids) in its response to the which_children call but Process.alive? reports false. I will pair down the example here.
Producer
defmodule Datasets do
alias Experimental.GenStage
alias __MODULE__, as: Mod
use GenStage
def start_link(tenant_id) do
GenStage.start_link(Mod, tenant_id)
end
def init(tenant_id) do
{:producer, {tenant_id, 0}}
end
def handle_demand(demand, {tenant_id, served}) when demand > 0 do
events =
1..100
|> Enum.drop(served)
|> Enum.take(demand)
IO.inspect(events)
{:noreply, events, {tenant_id, served + Kernel.length(events)}}
end
end
DynamicSupervisor Consumer Module
defmodule DynamicDatasetPipeline do
alias Experimental.DynamicSupervisor
alias __MODULE__, as: Mod
use Experimental.DynamicSupervisor
def start_link() do
DynamicSupervisor.start_link(Mod, nil, name: DynamicDatasetPipeline)
end
def init(nil) do
children = [
worker(DynamicChild, [], restart: :temporary)
]
{:ok, children, strategy: :one_for_one, max_restarts: 0}
end
end
Dynamic Child
defmodule DynamicChild do
use GenServer
alias __MODULE__, as: Mod
def start_link(tenant_id) do
GenServer.start(Mod, tenant_id)
end
def init(state) do
Process.send_after(Kernel.self(), :finish, 5_000)
{:ok, state}
end
def handle_info(:finish, state) do
{:stop, :normal, state}
end
end
I tested it in iex
via
{:ok, producer} = Datasets.start_link(tenant_id)
{:ok, consumer} = DynamicDatasetPipeline.start_link()
GenStage.sync_subscribe(consumer, to: producer, min_demand: 1, max_demand: 5, cancel: :temporary)
and I only see [1, 2, 3, 4, 5]
output and I can see that the children are not alive after the 5 seconds but the DynamicSupervisor thinks they are via
iex(34)> DynamicSupervisor.which_children(consumer) |> Enum.map(fn {_, c, _, _} -> Process.alive? c end)
[false, false, false, false, false]
Hope this helps.