Registry returns stopped processes

This is my registry with some custom client code:

defmodule Auther.OAuth.Store.Registry do
  use Supervisor

  alias Auther.OAuth.Store.Entry

  @registry_name __MODULE__
  @dyn_supervisor_name __MODULE__.Supervisor

  def start_link(_arg) do
    Supervisor.start_link(__MODULE__, [])
  end

  @impl Supervisor
  def init(_) do
    children = [
      {DynamicSupervisor, strategy: :one_for_one, name: @dyn_supervisor_name},
      {Registry, keys: :unique, name: @registry_name}
    ]

    Supervisor.init(children, strategy: :one_for_all)
  end

  @spec insert(key :: String.t(), value :: any) :: any
  def insert(key, value) do
    name = name_for_key(key)

    DynamicSupervisor.start_child(@dyn_supervisor_name, {Entry, [name, value]})
  end

  defp name_for_key(key), do: {:via, Registry, {@registry_name, key}}

  @spec fetch(key :: String.t()) :: {:ok, any} | {:error, :entry_not_found}
  def fetch(key) do
    res =
      with [{pid, _val}] <- Registry.lookup(@registry_name, key),
           true <- Process.alive?(pid), # todo this OK?
           do: {:ok, Entry.get(pid)}

    case res do
      {:ok, _} = res -> res
      _ -> {:error, :entry_not_found}
    end
  end
end

Here are the entries (GenServers):

defmodule Auther.OAuth.Store.Entry do
  use GenServer, restart: :transient
  require Logger

  @expiry_time :timer.minutes(2)

  @impl GenServer
  def init(params) do
    schedule_expiry()
    {:ok, params}
  end

  defp schedule_expiry() do
    Process.send_after(self(), :expired, @expiry_time)
  end

  @impl GenServer
  def handle_info(:expired, state) do
    Logger.debug("Temporary credentials expired.")
    {:stop, :normal, state}
  end

  @impl GenServer
  def handle_call(:get, _from, state) do
    Logger.debug("Temporary credentials accessed, removing them now.")
    {:stop, :normal, state, state}
  end

  #### CLIENT

  def start_link([name, data]) do
    GenServer.start_link(__MODULE__, data, name: name)
  end

  def get(pid) do
    GenServer.call(pid, :get)
  end
end

The goal is to have every entry either expire after 2min or remove itself when accessed once. This works by having the GenServer entries stop themselfes.

The problem is that in Registry.fetch/1 the Registry.lookup/2 function sometimes returns processes, that are already shut down or in the process of shutting down. I can consistently reproduce the problem if I call Registry.fetch/1 twice with the same key argument. The first call returns the expected process and the message can be sent, the second call returns the same process but the message can’t be sent because it’s already stopped.

I have temporarily solved the problem by adding Process.alive? into my with expression, but this seems fragile to me. Is there another approach I can use to make this more resilient?

You can’t. Given the async nature you cannot prevent this race condition. Even Process.alive? won’t solve the problem as the process might shut down between checking if it is alive and sending a message. You essentially have to deal with the fact that you might try to retrieve data from a no longer alive process.

3 Likes

That could be made easier and cleaner by using your :via tuple in fetch and handle the exit message if your process is not alive, instead of doing it all by yourself.

1 Like

Take a look at how GenServer.call deals with this:

The catch clause here deals with situations where the PID obtained from whereis doesn’t survive to the end of the call.

Digging in further, we can see how :gen.call does it:

This is a common approach in OTP: before sending a message, attach a monitor to the target. This guarantees either a reply to the message or a {:DOWN, ...} message, even if the target PID exited before the monitor was created.

3 Likes

Combining previous answers: If you use Entry.get({:via, Registry, {@registry_name, key}}) you’ll get the automatic error handling of GenServer.call, as you’re not fetching the pid on your own.

2 Likes

That won’t change anything call-site though, right? I’ll still get the :exit being thrown at me.

I’ve changes it to this for now:

def fetch(key) do
    try do
      case Registry.lookup(@registry_name, key) do
        [{pid, _val}] -> {:ok, Entry.get(pid)}
        _ -> {:error, :entry_not_found}
      end
    catch
      :exit, _ -> {:error, :entry_not_found}
    end
  end

More something like that (not tested):

def fetch(key) do
  try do
      GenServer.call(name_for_key(key), :get)
  catch
    :exit, _ -> {:error, :entry_not_found}
  end
end

You may also want to check what is the exit reason to distinguish between processes that were not found and other reasons.