Alternative to using trap_exit in GenServer

Hello,

I have created a process registry based on GenServer. Each process mapped to a name is itself a GenServer which holds a function to run as part of its state.

In the registry I am using trap_exit to allow the registry to create a new process with the same function to run if a process dies.

The basic problem being solved is that I need to be able to look up a dynamically started process by a string. That process needs to be restarted when it dies, with the same args it was given when first created, and it must still be reachable under the same string as its predecessor.

My question is - is this the best way to do this?

Here is the code using the current strategy -

defmodule Comet.HandlerRegistry do
  use GenServer, restart: :permanent

  def start_link(_) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def lookup(server, topic) do
    GenServer.call(server, {:lookup, topic})
  end

  def add_handler(server, topic, func) do
    GenServer.cast(server, {:create, topic, func})
  end

  def create_handler(topic, func, funs, handlers, refs) do
    if Map.has_key?(handlers, topic) do
      {funs, handlers, refs}
    else
      {:ok, handler} = Comet.Handler.start_link(func)
      ref = Process.monitor(handler)
      refs = Map.put(refs, ref, topic)
      handlers = Map.put(handlers, topic, handler)
      funs = Map.put(funs, topic, func)
      {funs, handlers, refs}
    end
  end

  @impl true
  def init(:ok) do
    Process.flag(:trap_exit, true)
    funs = %{}
    handlers = %{}
    refs = %{}
    {:ok, {funs, handlers, refs}}
  end

  @impl true
  def handle_call({:lookup, topic}, _from, state) do
    {_funs, handlers, _refs} = state
    {:reply, Map.fetch(handlers, topic), state}
  end

  @impl true
  def handle_cast({:create, topic, func}, {funs, handlers, refs}) do
    {funs, handlers, refs} = create_handler(topic, func, funs, handlers, refs)
    {:noreply, {funs, handlers, refs}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, {funs, handlers, refs}) do
    {topic, refs} = Map.pop(refs, ref)
    handlers = Map.delete(handlers, topic)
    {:noreply, create_handler(topic, Map.get(funs, topic), funs, handlers, refs)}
  end

  @impl true
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

Thanks, Sean

It looks to me like a good use of DynamicSupervisor. Adapting from your example, the basic dynamic supervision logic could look more or less like this:

defmodule MySupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

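  # The topic is not used yet; name registration is a separate concern.
  def start_child(_topic, fun) do
    DynamicSupervisor.start_child(__MODULE__, {Comet.Handler, fun})
  end

  @impl true
  def init(_init_arg) do
    # No :extra_arguments, so each child is started as Comet.Handler.start_link(fun),
    # matching the one-argument start_link used in the original registry.
    DynamicSupervisor.init(strategy: :one_for_one)
  end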
end

On top of it, you would have to add the topic registration and lookup logic, which you could build with Registry, more or less like in the KV.Bucket example in the Elixir guides.
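As a rough sketch of how the topic could be wired in, the handler can name itself with a :via tuple so the Registry entry is created as part of start_link. Everything named Sketch.* here is hypothetical, including the handler itself, since the internals of Comet.Handler are not shown in this thread; it only assumes the handler holds a one-argument function as its state.

```elixir
defmodule Sketch.Handler do
  # Hypothetical stand-in for Comet.Handler: holds a function as its state.
  use GenServer, restart: :permanent

  # The supervisor keeps this argument in the child spec, so a restarted
  # handler gets the same topic and function as the original one.
  def start_link({topic, fun}) do
    GenServer.start_link(__MODULE__, fun, name: via(topic))
  end

  # Calls the handler registered under `topic`.
  def run(topic, arg), do: GenServer.call(via(topic), {:run, arg})

  # Sketch.Registry is an assumed registry name, started separately.
  def via(topic), do: {:via, Registry, {Sketch.Registry, topic}}

  @impl true
  def init(fun), do: {:ok, fun}

  @impl true
  def handle_call({:run, arg}, _from, fun), do: {:reply, fun.(arg), fun}
end

defmodule Sketch.HandlerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_child(topic, fun) do
    DynamicSupervisor.start_child(__MODULE__, {Sketch.Handler, {topic, fun}})
  end

  @impl true
  def init(_init_arg), do: DynamicSupervisor.init(strategy: :one_for_one)
end

# Usage: start the registry first, then the supervisor, then add handlers.
{:ok, _} = Registry.start_link(keys: :unique, name: Sketch.Registry)
{:ok, _} = Sketch.HandlerSupervisor.start_link([])
{:ok, _pid} = Sketch.HandlerSupervisor.start_child("greetings", fn name -> "hello " <> name end)

result = Sketch.Handler.run("greetings", "Sean")
```

With this shape, callers never need a pid: they address the handler by topic, and the lookup goes through the Registry on every call.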

… and Registry already exists.

You can register a process under a key with a :via tuple, and look it up by that key.

  def start_link(name, medium \\ %{}) do
    GenServer.start_link(__MODULE__, medium, name: via_tuple(name))
  end
...
  def whereis_name(name) do
    case Registry.lookup(RegSchedulers, name) do
      [{pid, _ref}] -> pid
      [] -> nil
    end
  end
...
  defp via_tuple(name), do: {:via, Registry, {RegSchedulers, name}}

Thanks for that. I was aware of DynamicSupervisor and Registry, but I could not find any documentation on making the Registry aware that a new process had been started for a given key so that a new entry could be added.

Going through the guide, I am still no closer! From the guide:

When a bucket terminates, the supervisor will start a new bucket in its place. After all, that’s the role of the supervisor!

However, when the supervisor restarts the new bucket, the registry does not know about it. So we will have an empty bucket in the supervisor that nobody can access! To solve this, we want to say that buckets are actually temporary. If they crash, regardless of the reason, they should not be restarted.

So thinking about this - if a process crashes I can

1 - kill the registry and the dynamic supervisor and restart it all - this is not really an option. The other processes in the registry could be processing something, and killing them off could cause data inconsistency.

2 - when a process starts via the supervisor - either initially or as a replacement, that process would send a message to the registry to register itself.

Am I missing something here? Is there another way to do this?

Don’t do this. There is no need, and a failing worker should not take down its supervisor. That’s why supervisors trap exits.

It’s all done automatically: if a worker dies, it is removed from the registry, and when it restarts, it registers again under the name you gave it with the :via tuple.

BTW, dynamic workers are expected to terminate normally at some point. They are usually transient, meaning they are restarted only on failure.
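For illustration, the restart policy can be set where the worker is defined. The module name below is hypothetical; the snippet only shows where the :transient option goes and how it ends up in the child spec that the supervisor uses.

```elixir
defmodule Sketch.TransientWorker do
  # :transient means restart after an abnormal exit, but not after
  # :normal, :shutdown or {:shutdown, term}.
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# `use GenServer` generates child_spec/1, which carries the restart option.
spec = Sketch.TransientWorker.child_spec(:some_arg)
restart = spec.restart

# The policy can also be overridden per child without touching the module.
permanent_spec = Supervisor.child_spec({Sketch.TransientWorker, :some_arg}, restart: :permanent)
```

For the use case in the original question, where a handler should always come back, :permanent is probably the closer fit.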

Maybe you are looking for a pool, like poolboy.

Ahh, thanks, I think I now see how the parts fit together to manage this -

  • Start the registry with a name
  • Start the supervisor
  • Start a child of the supervisor (a GenServer) which sets its name with the :via tuple and gets registered with the registry
  • Child crashes
  • Supervisor restarts the process, which again registers itself with the Registry through the :via tuple
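
The steps above can be tried out in a small script. This is only a sketch: Demo.Registry, Demo.Supervisor and the "orders" key are made-up names, and an Agent stands in for the real GenServer child to keep it short.

```elixir
# 1. Start the registry with a name, then the supervisor.
{:ok, _} = Registry.start_link(keys: :unique, name: Demo.Registry)
{:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: Demo.Supervisor)

# 2. Start a child that names itself through the :via tuple.
via = {:via, Registry, {Demo.Registry, "orders"}}

child_spec = %{
  id: :orders,
  start: {Agent, :start_link, [fn -> 0 end, [name: via]]},
  restart: :permanent
}

{:ok, first_pid} = DynamicSupervisor.start_child(Demo.Supervisor, child_spec)
[{^first_pid, _value}] = Registry.lookup(Demo.Registry, "orders")

# 3. Simulate a crash and wait until the old process is really gone.
ref = Process.monitor(first_pid)
Process.exit(first_pid, :kill)

receive do
  {:DOWN, ^ref, :process, _pid, _reason} -> :ok
end

# 4. The supervisor restarts the child from the same spec. Poll the registry
#    until the replacement has registered itself under the same key.
wait_for_restart = fn retry, attempts_left ->
  case Registry.lookup(Demo.Registry, "orders") do
    [{pid, _value}] when pid != first_pid ->
      pid

    _ when attempts_left > 0 ->
      Process.sleep(10)
      retry.(retry, attempts_left - 1)

    _ ->
      nil
  end
end

second_pid = wait_for_restart.(wait_for_restart, 100)

# The same name now reaches the new process, started with the original function.
state = Agent.get(via, fn value -> value end)
```

Note that the registry itself is never told about the restart. The new entry appears because the restarted child calls start_link with the same :via name.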

Thank you both @lucaong and @kokolegorille. The best way to do this is something I have been wrestling with for quite a while!!!