How to Overwrite a Registry Mapping From a Different Process?

What I’m Trying to Do

I’m just starting out learning Elixir, coming from an imperative programming background. I’m working on a system that needs to maintain a mapping of user_id -> pid.

For each user_id, I want to ensure:

  • There is exactly one active process at any time.

  • When a new process claims a user_id:

      • Any existing process for that user_id is terminated.

      • The new process immediately takes over the user_id mapping.

  • This should happen without retries, reconnects, or race conditions.

What I’ve Tried

Registry

I’m using a :unique Registry. The approach is:

  1. Lookup the existing process with:

Registry.lookup(user_id)

  2. If found, call:

Process.exit(old_pid, :some_reason)

  3. Register the new process with:

Registry.register(user_id, pid)

Problem: There’s a race condition. The new process may attempt to register before the old process has unregistered, resulting in:


{:error, {:already_registered, pid}}

This forces retries or client reconnects, which I’d like to avoid.
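For reference, here is a minimal sketch of where that error comes from, using the full Registry.lookup/2 and Registry.register/3 signatures. The module name RegistryRaceDemo, the registry name DemoRegistry, and the "user-1" key are made up for illustration. It only reproduces the "key is still held" case deterministically; it does not reproduce the timing window itself.

```elixir
defmodule RegistryRaceDemo do
  # Hypothetical demo module; DemoRegistry and "user-1" are illustrative names.
  def run do
    {:ok, _} = Registry.start_link(keys: :unique, name: DemoRegistry)
    parent = self()

    # An "old" session that registers itself under the user id, then idles.
    old_pid =
      spawn(fn ->
        {:ok, _} = Registry.register(DemoRegistry, "user-1", nil)
        send(parent, :registered)
        Process.sleep(:infinity)
      end)

    receive do
      :registered -> :ok
    end

    # Step 1: look up the current owner of the key.
    [{^old_pid, nil}] = Registry.lookup(DemoRegistry, "user-1")

    # Step 3, attempted while the old entry is still present:
    # the calling process cannot take the key over.
    Registry.register(DemoRegistry, "user-1", nil)
  end
end
```

Calling RegistryRaceDemo.run() returns the {:error, {:already_registered, pid}} tuple shown above, because Registry.register/3 always registers the calling process and never displaces a live owner.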


ETS

ETS allows atomic key replacement with :ets.insert, but:

  • ❌ No automatic cleanup when processes die.

  • ❌ I’d need to manually call Process.monitor and handle :DOWN messages.
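To make the ETS option concrete, here is a rough sketch of what the manual bookkeeping could look like: one GenServer owns the table, serializes claims, monitors each owner, and deletes the row when the :DOWN message arrives. The module name UserTable, the table name :user_sessions, and the use of :kill are assumptions for illustration, not something taken from an existing library.

```elixir
defmodule UserTable do
  use GenServer

  # Hypothetical table name; rows are {user_id, pid, monitor_ref}.
  @table :user_sessions

  def start_link(_opts \\ []) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  # Claims go through the server so replacement is serialized.
  def claim(user_id, pid \\ self()) do
    GenServer.call(__MODULE__, {:claim, user_id, pid})
  end

  # Reads go straight to ETS without touching the server.
  def whereis(user_id) do
    case :ets.lookup(@table, user_id) do
      [{^user_id, pid, _ref}] -> pid
      [] -> nil
    end
  end

  @impl true
  def init(nil) do
    :ets.new(@table, [:named_table, :set, :protected, read_concurrency: true])
    # State maps monitor ref => user_id for cleanup on :DOWN.
    {:ok, %{}}
  end

  @impl true
  def handle_call({:claim, user_id, pid}, _from, refs) do
    refs =
      case :ets.lookup(@table, user_id) do
        [{^user_id, old_pid, old_ref}] ->
          # Stop watching the old owner and drop any queued :DOWN for it.
          Process.demonitor(old_ref, [:flush])
          if old_pid != pid, do: Process.exit(old_pid, :kill)
          Map.delete(refs, old_ref)

        [] ->
          refs
      end

    ref = Process.monitor(pid)
    # A single insert overwrites the previous row for this key.
    :ets.insert(@table, {user_id, pid, ref})
    {:reply, :ok, Map.put(refs, ref, user_id)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, refs) do
    case Map.pop(refs, ref) do
      {nil, refs} ->
        {:noreply, refs}

      {user_id, refs} ->
        :ets.delete(@table, user_id)
        {:noreply, refs}
    end
  end
end
```

One caveat with this shape: a reader calling UserTable.whereis/1 can briefly see the pid of a process that has just died, until the server handles the :DOWN message, so callers still need to tolerate a dead pid.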


My Questions

  1. Is there a way to atomically replace a Registry mapping from another process, without requiring reconnects or retries?

  2. If not, and ETS is the way forward, what are the best practices for managing cleanup and avoiding stale entries?

  3. Otherwise, how can I achieve what I’m trying to do?

Thanks for any insights! 🙏

You can simply unregister the old process first.

As an aside, this question looks to be completely synthesized by ChatGPT (or similar), and I’m curious how others here feel about that. I feel a bit weird responding to model outputs, and the question certainly meanders more than necessary (e.g. bringing up ets).

I’m also surprised the model was not able to come up with a solution here.

Thanks for the reply, garrison. Unfortunately this doesn’t work. As far as I know, Registry.unregister/2 can only be called by the registered process itself, which means that an external process (the new one) can’t directly unregister the old one’s mapping.

This means we still have to terminate the old user session process (e.g., with Process.exit). Since that’s an asynchronous operation, and the Registry’s automatic unregistration upon process death is also asynchronous, the new UserSession might try to Registry.register before the user_id is free, leading to a failed registration (error) and the race condition I’m trying to resolve cleanly.

(P.S. – Nice catch on the AI formatting 😆 Just trying to keep my question as clear as possible.)


You can monitor any still-alive process and register the new process under the user’s id only after receiving the previous process’s exit message.

Oh yeah, you’re right. That is actually rather annoying because…

This was my first thought, but there is no guarantee you receive the DOWN message after the Registry does. I guess the race would be unlikely in practice, but it would still be possible, no? Especially over distribution.

This is exactly the reason Erlang guarantees “directly visible resources” (e.g. real registered names) are cleaned up before links/monitors are fired. Given that Registry can’t make those guarantees I expected there to be an API to unregister another process.

I guess you could use a delay or retry loop, but that seems inelegant.
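For what it’s worth, here is a sketch of how the monitor-then-register suggestion and a bounded retry could be combined, under the assumption that the :DOWN message may arrive before the Registry entry is gone. The module name Takeover, the retry count, and the 10 ms delay are invented for illustration. It uses :kill so the old process cannot trap the exit and block the wait.

```elixir
defmodule Takeover do
  # Hypothetical helper. Must be called from the process that should
  # own the key, because Registry.register/3 registers the caller.
  def claim(registry, user_id, retries \\ 50) do
    me = self()

    case Registry.lookup(registry, user_id) do
      [{^me, _value}] ->
        # The caller already owns the key.
        :ok

      [{old_pid, _value}] ->
        evict(old_pid)
        register_with_retry(registry, user_id, retries)

      [] ->
        register_with_retry(registry, user_id, retries)
    end
  end

  # Kill the old owner and wait until this process has seen it die.
  defp evict(old_pid) do
    ref = Process.monitor(old_pid)
    Process.exit(old_pid, :kill)

    receive do
      {:DOWN, ^ref, :process, ^old_pid, _reason} -> :ok
    end
  end

  defp register_with_retry(_registry, _user_id, 0), do: {:error, :timeout}

  defp register_with_retry(registry, user_id, retries) do
    case Registry.register(registry, user_id, nil) do
      {:ok, _owner} ->
        :ok

      {:error, {:already_registered, _pid}} ->
        # The entry has not been cleaned up yet (or someone else won).
        Process.sleep(10)
        register_with_retry(registry, user_id, retries - 1)
    end
  end
end
```

This keeps the retry inside the server-side takeover so clients never see it, but it is still a retry loop: if two new processes claim the same user_id at once, the loser keeps retrying until it gives up with {:error, :timeout}.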

You know, this is really making me wonder. Why is Registry implemented this way, where processes can only register themselves? It makes things very annoying. The Erlang register/unregister (Process.register/unregister for us) don’t have this problem, they accept a pid.

There is no reason you couldn’t write your own registry which allows this, but I don’t understand why Registry was designed that way.
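To illustrate the difference, here is a small sketch of a third party moving a built-in registered name from one process to another with Process.unregister/1 and Process.register/2. The module name NameTakeover is made up. Note the limits: built-in names must be atoms, so this is not a practical replacement for per-user keys, and the unregister and register steps are two separate calls, not one atomic operation.

```elixir
defmodule NameTakeover do
  # Hypothetical helper: any process may call this on behalf of new_pid.
  def replace(name, new_pid) when is_atom(name) and is_pid(new_pid) do
    case Process.whereis(name) do
      ^new_pid ->
        # new_pid already holds the name.
        true

      nil ->
        Process.register(new_pid, name)

      old_pid ->
        # Free the name first, then stop the old owner and hand the
        # name to the new process. Both calls raise ArgumentError if
        # the name changed hands in between.
        Process.unregister(name)
        Process.exit(old_pid, :kill)
        Process.register(new_pid, name)
    end
  end
end
```

Here the caller does not have to be either the old or the new process, which is the property Registry lacks.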


How about using a GenServer as a broker and putting it under the application’s supervision tree?


defmodule UserProcessBroker do
  use GenServer

  # API

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def register(user_id, pid) do
    GenServer.call(__MODULE__, {:register, user_id, pid})
  end

  ## Server Callbacks

  def init(state) do
    {:ok, state}
  end

  def handle_call({:register, user_id, new_pid}, _from, state) do
    state = cleanup_dead_processes(state)

    case Map.get(state, user_id) do
      nil ->
        ref = Process.monitor(new_pid)
        {:reply, :ok, Map.put(state, user_id, {new_pid, ref})}

      {old_pid, old_ref} when old_pid != new_pid ->
        Process.demonitor(old_ref, [:flush])
        Process.exit(old_pid, :user_replaced)

        ref = Process.monitor(new_pid)
        {:reply, :ok, Map.put(state, user_id, {new_pid, ref})}

      _ ->
        # already registered, same PID
        {:reply, :ok, state}
    end
  end

  def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
    new_state =
      Enum.reduce(state, %{}, fn
        {_user_id, {^pid, _ref}}, acc -> acc
        {user_id, entry}, acc -> Map.put(acc, user_id, entry)
      end)

    {:noreply, new_state}
  end

  defp cleanup_dead_processes(state) do
    Enum.reduce(state, %{}, fn {user_id, {pid, ref}}, acc ->
      if Process.alive?(pid) do
        Map.put(acc, user_id, {pid, ref})
      else
        Process.demonitor(ref, [:flush])
        acc
      end
    end)
  end
end

Start the broker somewhere in your supervision tree:

children = [
  {UserProcessBroker, []}
]