How to restart a GenServer under a DynamicSupervisor that is started under a PartitionSupervisor

Hello friends, I think I have a bad configuration in my project.

I have a GenServer that is started under a DynamicSupervisor, and the DynamicSupervisor itself is started under a PartitionSupervisor.

my Application config:

  def start(_type, _args) do
    children = [
      {Registry, keys: :unique, name: MishkaJobWorkerRegistry},
      {PartitionSupervisor, child_spec: DynamicSupervisor, name: MishkaJobWorkerSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MishkaInstaller.Supervisor]
    Supervisor.start_link(children, opts)
  end

This is my simple GenServer code:

defmodule Queue.Job do
  use GenServer
  require Logger
  
  def start_link(args) do
    id = Map.get(args, :worker)
    type = Map.get(args, :type, :normal)

    GenServer.start_link(__MODULE__, [], name: via(id, type))
  end

  @impl true
  def init(state) do
    {:ok, state}
  end
  
  @doc false
  def child_spec(process_name) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [process_name]},
      # :max_restarts is a supervisor option, not a child spec key, so it is omitted here.
      restart: :transient
    }
  end

  defp via(id, value) do
    {:via, Registry, {MishkaJobWorkerRegistry, id, value}}
  end
end

and I start it like this:

  def start_job_worker(args) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, self()}},
      {Queue.Job, args}
    )
  end

But if my GenServer crashes unexpectedly, it is not restarted, and when I check whether its process is alive, it returns false.

Another problem: when I use this to terminate one of the GenServers by pid, it returns not found:

DynamicSupervisor.terminate_child(MishkaJobWorkerSupervisor, pid)

Where am I introducing the bug?

Thank you in advance

The DynamicSupervisor.terminate_child line is the problem: MishkaJobWorkerSupervisor is the PartitionSupervisor, so the PID isn’t found because it’s a child of one of the supervisor’s children.

The call doesn’t crash because DynamicSupervisor.terminate_child sends a {:terminate_child, pid} message, which is handled by the machinery for :supervisor.terminate_child.

To accurately target the message, you need to use a :via tuple pointing to the PartitionSupervisor:

{:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, self()}}

HOWEVER

If the process that’s calling terminate_child isn’t the same one that started the child, I’m not sure that’s going to find the correct DynamicSupervisor. :thinking:
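One way around the self() concern is to use a partition key that any process can recompute. The following is only a sketch under that assumption: it uses the worker id as the key, and StableKeyDemo, its registry name, and its helper functions are hypothetical names, not part of the original project.

```elixir
defmodule StableKeyDemo do
  use GenServer

  @registry StableKeyDemo.Registry
  @partitions StableKeyDemo.Partitions

  def start_link(id), do: GenServer.start_link(__MODULE__, id, name: via(id))

  def child_spec(id) do
    %{id: __MODULE__, start: {__MODULE__, :start_link, [id]}, restart: :transient}
  end

  @impl true
  def init(id), do: {:ok, id}

  # Starts the registry and the partitioned DynamicSupervisors
  # (normally this would live in Application.start/2).
  def start_tree do
    children = [
      {Registry, keys: :unique, name: @registry},
      {PartitionSupervisor, child_spec: DynamicSupervisor, name: @partitions}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end

  # The worker id is the partition key, so start and terminate hit the same partition.
  def start_worker(id), do: DynamicSupervisor.start_child(partition(id), {__MODULE__, id})

  def stop_worker(id) do
    case Registry.lookup(@registry, id) do
      [{pid, _value}] -> DynamicSupervisor.terminate_child(partition(id), pid)
      [] -> {:error, :not_found}
    end
  end

  defp partition(id), do: {:via, PartitionSupervisor, {@partitions, id}}
  defp via(id), do: {:via, Registry, {@registry, id}}
end

{:ok, _sup} = StableKeyDemo.start_tree()
{:ok, pid} = StableKeyDemo.start_worker(:job_a)

# A different process can stop the worker because the key no longer depends on self().
result = Task.await(Task.async(fn -> StableKeyDemo.stop_worker(:job_a) end))
```

The trade-off is that workers are spread across partitions by id rather than by caller, which may or may not match the load pattern you want.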


I do not know :thinking:. For example, PartitionSupervisor does not have a terminate_child function, so I am forced to use DynamicSupervisor instead.


More info.

iex(63)> Queue.Job.find_worker_pid Example.WoerkerTest
[{#PID<0.436.0>, :normal}]

I am using this pid with this command:

DynamicSupervisor.which_children pid

It returns this error:

iex(65)> DynamicSupervisor.which_children pid
** (exit) exited in: GenServer.call(#PID<0.436.0>, :which_children, :infinity)
    ** (EXIT) an exception was raised:
        ** (RuntimeError) attempted to call GenServer #PID<0.436.0> but no handle_call/3 clause was provided
            (mishka_installer 0.1.0) /home/runner/work/elixir/elixir/lib/elixir/lib/gen_server.ex:863: MishkaInstaller.ProcessingPipelines.Queue.Job.handle_call/3
            (stdlib 5.2.1) gen_server.erl:1131: :gen_server.try_handle_call/4
            (stdlib 5.2.1) gen_server.erl:1160: :gen_server.handle_msg/6
            (stdlib 5.2.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
    (elixir 1.16.2) lib/gen_server.ex:1114: GenServer.call/3
    iex:65: (file)

18:34:53.102 [error] GenServer {MishkaJobWorkerRegistry, Example.WoerkerTest, :normal} terminating
** (RuntimeError) attempted to call GenServer #PID<0.436.0> but no handle_call/3 clause was provided
    (mishka_installer 0.1.0) /home/runner/work/elixir/elixir/lib/elixir/lib/gen_server.ex:863: MishkaInstaller.ProcessingPipelines.Queue.Job.handle_call/3
    (stdlib 5.2.1) gen_server.erl:1131: :gen_server.try_handle_call/4
    (stdlib 5.2.1) gen_server.erl:1160: :gen_server.handle_msg/6
    (stdlib 5.2.1) proc_lib.erl:241: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.235.0>): :which_children
State: []
Client #PID<0.235.0> is alive

    (stdlib 5.2.1) gen.erl:240: :gen.do_call/4
    (elixir 1.16.2) lib/gen_server.ex:1111: GenServer.call/3
    (elixir 1.16.2) src/elixir.erl:405: :elixir.eval_external_handler/3
    (stdlib 5.2.1) erl_eval.erl:750: :erl_eval.do_apply/7
    (elixir 1.16.2) src/elixir.erl:378: :elixir.eval_forms/4
    (elixir 1.16.2) lib/module/parallel_checker.ex:112: Module.ParallelChecker.verify/1
    (iex 1.16.2) lib/iex/evaluator.ex:332: IEx.Evaluator.eval_and_inspect/3
    (iex 1.16.2) lib/iex/evaluator.ex:306: IEx.Evaluator.eval_and_inspect_parsed/3
iex(65)>

I know I can add catch-all handle_call and handle_info clauses to prevent this crash, but after the crash a new pid is created for me:

iex(65)> Queue.Job.find_worker_pid Example.WoerkerTest
[{#PID<0.440.0>, :normal}]

New = 0.440.0
Prv = 0.436.0

Do you have any suggestions?

I am really confused about what I am doing wrong.

You’re still sending messages to the wrong processes.

iex(63)> Queue.Job.find_worker_pid Example.WoerkerTest
[{#PID<0.436.0>, :normal}]

This is the PID of a process running callbacks defined in MishkaInstaller.ProcessingPipelines.Queue.Job.

DynamicSupervisor.which_children pid

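DynamicSupervisor.which_children is a very thin wrapper over GenServer.call, so it sends a :which_children call to whatever process you pass it, which here is the worker itself.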

The Queue.Job.handle_call implementation does not define a handler for :which_children, so the worker exits abnormally and is then restarted by its supervisor.
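To send supervisor calls to the right process, one option is to ask each partition for its children and pick the one that lists the worker. This is a rough sketch, not a built-in API: PartitionLookup and LookupDemo.Partitions are hypothetical names, and an Agent stands in for the real worker.

```elixir
defmodule PartitionLookup do
  # Returns the pid of the DynamicSupervisor partition that supervises
  # worker_pid, or nil when no partition lists it.
  def owner(partition_sup, worker_pid) do
    partition_sup
    |> PartitionSupervisor.which_children()
    |> Enum.find_value(fn
      {_partition, sup_pid, :supervisor, _modules} when is_pid(sup_pid) ->
        children = DynamicSupervisor.which_children(sup_pid)
        if Enum.any?(children, fn {_, pid, _, _} -> pid == worker_pid end), do: sup_pid

      # A partition that is currently restarting has no pid to query.
      _other ->
        nil
    end)
  end

  # Terminates the worker through the partition that actually owns it.
  def terminate(partition_sup, worker_pid) do
    case owner(partition_sup, worker_pid) do
      nil -> {:error, :not_found}
      sup_pid -> DynamicSupervisor.terminate_child(sup_pid, worker_pid)
    end
  end
end

{:ok, _} =
  PartitionSupervisor.start_link(child_spec: DynamicSupervisor, name: LookupDemo.Partitions)

{:ok, worker} =
  DynamicSupervisor.start_child(
    {:via, PartitionSupervisor, {LookupDemo.Partitions, self()}},
    {Agent, fn -> 0 end}
  )

owner = PartitionLookup.owner(LookupDemo.Partitions, worker)
first = PartitionLookup.terminate(LookupDemo.Partitions, worker)
second = PartitionLookup.terminate(LookupDemo.Partitions, worker)
```

This scans every partition, so it costs one call per partition; it is meant for occasional administrative use rather than a hot path.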


Bigger question: since you already have a mechanism for finding a worker process’s PID, what about signaling that PID directly?
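A minimal sketch of signaling the worker directly, assuming the worker keeps restart: :transient as in the original child spec. With that setting, a stop with reason :normal should not trigger a restart. DirectStopDemo is a hypothetical stand-in for the real worker module.

```elixir
defmodule DirectStopDemo do
  # :transient means the supervisor restarts the worker only after an abnormal exit.
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  # Stops the worker with reason :normal, so a :transient child is not restarted.
  def stop(pid), do: GenServer.stop(pid, :normal)

  @impl true
  def init(arg), do: {:ok, arg}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, pid} = DynamicSupervisor.start_child(sup, {DirectStopDemo, []})
result = DirectStopDemo.stop(pid)
```

GenServer.stop/2 also accepts a registered name or a :via tuple, so the Registry tuple from the question could be passed instead of a raw pid.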

I am using this for finding:

def find_worker_pid(worker) do
    Registry.lookup(MishkaJobWorkerRegistry, worker)
end

For example, sending a stop or a call:

  GenServer.call pid, :pop

  @impl true
  def handle_call(:pop, _from, state) do
    {:reply, state, state}
  end

# OR
iex(25)> pid_via = {:via, Registry, {MishkaJobWorkerRegistry, Example.WoerkerTest, :normal}}
iex(26)> GenServer.call pid_via, :pop

And it returns what I saved in my state

Yes, I know. I just wanted to show the error (by forcing the GenServer to crash): it knows its parent. But why can I not terminate it as a child? How can I find a child pid and terminate it with DynamicSupervisor?

Did I answer your question?

Unfortunately, I have no idea how to kill a child through the DynamicSupervisor

More info from the genserver pid

Process.info pid
[
  current_function: {:gen_server, :loop, 7},
  initial_call: {:proc_lib, :init_p, 5},
  status: :waiting,
  message_queue_len: 0,
  links: [#PID<0.231.0>, #PID<0.224.0>],
  dictionary: [
    "$initial_call": {MishkaInstaller.ProcessingPipelines.Queue.Job, :init, 1},
    "$ancestors": [#PID<0.231.0>, MishkaJobWorkerSupervisor,
     MishkaInstaller.Supervisor, #PID<0.221.0>]
  ],
  trap_exit: false,
  error_handler: :error_handler,
  priority: :normal,
  group_leader: #PID<0.220.0>,
  total_heap_size: 986,
  heap_size: 376,
  stack_size: 12,
  reductions: 1345,
  garbage_collection: [
    max_heap_size: %{
      error_logger: true,
      include_shared_binaries: false,
      kill: true,
      size: 0
    },
    min_bin_vheap_size: 46422,
    min_heap_size: 233,
    fullsweep_after: 65535,
    minor_gcs: 5
  ],
  suspending: []
]

By the way, when I run this

iex(30)> PartitionSupervisor.which_children MishkaJobWorkerSupervisor
[
  {7, #PID<0.233.0>, :supervisor, [DynamicSupervisor]},
  {6, #PID<0.232.0>, :supervisor, [DynamicSupervisor]},
  {5, #PID<0.231.0>, :supervisor, [DynamicSupervisor]},
  {4, #PID<0.230.0>, :supervisor, [DynamicSupervisor]},
  {3, #PID<0.229.0>, :supervisor, [DynamicSupervisor]},
  {2, #PID<0.228.0>, :supervisor, [DynamicSupervisor]},
  {1, #PID<0.227.0>, :supervisor, [DynamicSupervisor]},
  {0, #PID<0.226.0>, :supervisor, [DynamicSupervisor]}
]

The worker is linked to partition number 5 (#PID<0.231.0>).
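The "$ancestors" entry in the Process.info output above suggests another way to locate the owning DynamicSupervisor. The sketch below reads it from the process dictionary. It relies on an OTP-internal convention rather than a documented API, so treat it as a debugging aid; ParentLookup is a hypothetical name, and an Agent stands in for the real worker.

```elixir
defmodule ParentLookup do
  # Returns the first entry of "$ancestors" (a pid or a registered name),
  # or nil if the process is dead or has no ancestors recorded.
  def parent(pid) do
    with {:dictionary, dict} <- Process.info(pid, :dictionary),
         {:"$ancestors", [parent | _]} <- List.keyfind(dict, :"$ancestors", 0) do
      parent
    else
      _ -> nil
    end
  end
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, worker} = DynamicSupervisor.start_child(sup, {Agent, fn -> 0 end})

parent = ParentLookup.parent(worker)
result = DynamicSupervisor.terminate_child(parent, worker)
```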

I am very sorry for pinging you, sir. @josevalim, maybe PartitionSupervisor needs a terminate_child function?

I think terminate_child does not work once the DynamicSupervisor is placed under a PartitionSupervisor.

Even when I use this:

{:ok, pid} = create_worker.()
{:ok, pid1} = create_worker1.()

iex(4)> sup_pid = GenServer.whereis({:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, pid}})
#PID<0.213.0>
iex(5)> GenServer.whereis({:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, pid1}})
#PID<0.211.0>

DynamicSupervisor.terminate_child(MishkaJobWorkerSupervisor, sup_pid)
{:error, :not_found}

iex(6)> PartitionSupervisor.which_children MishkaJobWorkerSupervisor
[
  {7, #PID<0.213.0>, :supervisor, [DynamicSupervisor]},
  {6, #PID<0.212.0>, :supervisor, [DynamicSupervisor]},
  {5, #PID<0.211.0>, :supervisor, [DynamicSupervisor]},
  {4, #PID<0.210.0>, :supervisor, [DynamicSupervisor]},
  {3, #PID<0.209.0>, :supervisor, [DynamicSupervisor]},
  {2, #PID<0.208.0>, :supervisor, [DynamicSupervisor]},
  {1, #PID<0.207.0>, :supervisor, [DynamicSupervisor]},
  {0, #PID<0.206.0>, :supervisor, [DynamicSupervisor]}
]

create_worker is

  def start_job_worker(args) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, self()}},
      {Queue.Job, args}
    )
  end

When I use this to terminate one of the GenServers by pid, it returns not found:

DynamicSupervisor.terminate_child(MishkaJobWorkerSupervisor, pid)

Where am I introducing the bug?

The problem is that you are referencing the wrong name here. MishkaJobWorkerSupervisor is not the name that you should use. If you pay close attention to how the process is started here you can see that the :via tuple has been used. To terminate the child, you need to use the same via tuple.

  def start_job_worker(args) do
    DynamicSupervisor.start_child(
      {:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, self()}},
      {Queue.Job, args}
    )
  end

So to terminate you can do something like

# Because self() was used as the partition key, call this from the same
# process that started the child, or replace self() with that process's pid.
DynamicSupervisor.terminate_child(
  {:via, PartitionSupervisor, {MishkaJobWorkerSupervisor, self()}},
  pid
)