Unknown registry GenServer

I am circling around with the following problem and no post helped me so far.
I have a GenServer module named MyApp.Repo here that runs GenServer.cast but GenServer.call returns an error and kills the process.

GenServer MyApp.Repo terminating
** (ArgumentError) unknown registry: MyApp.Repo

When I start the app, I check:

iex> Enum.member?(Process.registered(), MyApp.Repo) 
=> true

The GenServer module MyApp.Repo is supervised as a child by the Application supervisor.

children = [
      MyAppWeb.Telemetry,
      {Phoenix.PubSub, name: MyApp.PubSub, adapter: Phoenix.PubSub.PG2},
      MyApp.Repo,
     MyAppWeb.Endpoint
  ...

MyApp.Repo module is a basic GenServer that registers a :name:

def start_link(),
  do: GenServer.start_link(__MODULE__, [], name: __MODULE__)
 
def save_with_cast(user),
  do: GenServer.cast(__MODULE__, {:new, user})

def save_with_call(user) do
  IO.inspect(user, label: "CALL______")  
  GenServer.call(__MODULE__, {:new, user})
end
 
def handle_call({:new, user}, _from,_state) do
    IO.puts("CALL______")
    Phoenix.PubSub.broadcast_from(MyApp.PubSub, self(), @topic, {:perform_new, user})
  {:reply, user, [])
end

def handle_cast({:new, user}, _state) do
   Phoenix.PubSub.broadcast_from(MyApp.PubSub, self(), @topic, {:perform_new, user})
  {:noreply, []}
end

def handle_info({:perform_new, message}, _state) do
  ...

MyApp.save_with_cast(user) runs :ok but not MyApp.save_with_call(user).

Can you paste the implementations of handle_cast/2 and handle_call/3, and the stacktrace of the ArgumentError?

I added some code above, and the stacktrace is below:

<< this is sent by the client function >>
CALL____: {"aa@mail.co.uk",
 "SFMyNTY.g2gDbQAAAA1hYUBtYWlsLmNvLnVrbgYAJtMGyoEBYVo.W9C4G6ovv49k_Q-eL-mIUI84DLgDzKw9Le2nwavVuKQ",
 "3efb4325-d1f6-4002-840b-0a2e717c99d8", 1656951853878154000}
<< the server is never called >>
[error] GenServer MyApp.Repo terminating
** (ArgumentError) unknown registry: MyApp.Repo
    (elixir 1.13.4) lib/registry.ex:1338: Registry.info!/1
    (elixir 1.13.4) lib/registry.ex:834: Registry.unregister/2
    (my_app 0.1.0) lib/my_app/repo.ex:138: MyApp.Repo.terminate/2
    (stdlib 4.0.1) gen_server.erl:1158: :gen_server.try_terminate/3
    (stdlib 4.0.1) gen_server.erl:1348: :gen_server.terminate/10
    (stdlib 4.0.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.596.0>): {:new, {"aa@mail.co.uk", "SFMyNTY.g2gDbQAAAA1hYUBtYWlsLmNvLnVrbgYAJtMGyoEBYVo.W9C4G6ovv49k_Q-eL-mIUI84DLgDzKw9Le2nwavVuKQ", "3efb4325-d1f6-4002-840b-0a2e717c99d8", 1656951853878154000}}
State: []
Client #PID<0.596.0> is alive

Where does the message variable come from?

Is MyApp.Repo pubsub broadcasting to itself?

What is in your terminate/2 callback? It looks like you might be trying to unregister from a nonexistent registry with Registry.unregister(__MODULE__, key)

message was user in fact, I copied the wrong one. I changed the names to be more readable.

Then I added this because I saw this somewhere and wasn’t sure :slight_smile:

def terminate(reason, state) do
    Phoenix.PubSub.unsubscribe(MyApp.Repo, @topic)
    {:stop, reason, state}
  end

OK, I removed this. Now I have a different error:

[error] GenServer MyApp.Repo terminating
** (FunctionClauseError) no function clause matching in MyApp.Repo.handle_call/3
    (my_app 0.1.0) lib/my_app/repo.ex:83:MyApp.Repo.handle_call({:new, {"aa@mail.co.uk", "SFMyNTY.g2gDbQAAAA1hYUBtYWlsLmNvLnVrbgYAcMIZyoEBYVo.IFUl3rJgl7u3RpcHTjTC9VJzZJSQ-TXmBg4DT6BXBYU", "c29df058-f55e-4047-bf86-5b3ddb5fa17b", 1656953094779893000}}, {#PID<0.596.0>, [:alias | #Reference<0.603041372.4001431555.215745>]}, [])

This was my inspiration

It was a typo (:nw instead of :new in the handle_call). cast and call run normal. Thanks for the guidance on this terminatefunction. So the typo killed or stopped and the termination function destroyed things I believe.

There it is!

unsubscribe/2, like broadcast and subscribe and friends, needs a valid PubSub name as the first argument.

Phoenix.PubSub.unsubscribe(MyApp.PubSub, @topic) would be correct, it infers the pid to unsubscribe automatically. That is the cause of your ** (ArgumentError) unknown registry: MyApp.Repo. Unsubscribing in the terminate/2 callback is not necessary by the way, you are subscribed by pid which is automatically cleaned up if the process restarts. Also you do not need pubsub at all to send messages to yourself, just use send(self(), {:perform_new, user}) if MyApp.Repo is the only process interested in the message. And even then, you only need to do this if you require async execution, otherwise you can implement the perform_new directly in the original callback. Having separate call and cast handlers that implement the same message sending to self() like this is kind of an anti-pattern in my opinion, call implies that it’s a synchronous operation but you are forcing it to be async so there’s no reason to have a call implementation.

All that said, this error is raising because your process is already crashing somewhere else. If you remove this terminate/2 callback entirely you may get a more helpful error message about what the real problem is.

1 Like

Double check the handle_call/3 function head, this is indicating you made a mistake there. Or you might need to recompile() your project if you’ve added this function since starting your session

With the way you’re editing your posts as you solve your problem it makes the thread harder to follow for anyone else reading it, and makes it look like I’m replying to a problem you already solved lol. Might want to post new developments as new comments especially if previous posts have already been replied to.

1 Like

ok! Thanks!

1 Like

Yes, this needs an explanation. I am “pubsubing” to other nodes, not sending messages to self()

1 Like

Gotcha, that makes sense

A general observation: if you’re ignoring the GenServer’s state in every handler, you probably don’t need a GenServer. For instance, save_with_call could be simplified to:

def save_with_call(user) do
  Phoenix.PubSub.broadcast_from(MyApp.PubSub, self(), @topic, {:perform_new, user})
end

As a bonus, this approach doesn’t have a single process (the GenServer) forcing everything to happen one-at-a-time.

OK, good point, in fact, I really wondered, indeed, but how do I respond to :perform_new if not from a Genserver? As you say, I only use the messaging part.

You can have a GenServer (or any interested process) subscribe to and handle the messages, or a pool of them with a pool manager dispatching the messages, but you don’t need to send the messages from the GenServer if that’s all that save_with_* is doing. You can pubsub those messages from any process and your MyApp.Repo servers will subscribe to that topic. Then you’ll avoid routing new messages through the same process that wants to receive them, which significantly reduces the work load on each one and increases their throughput receiving the messages

Firstly apologize, just a beginner with Elixir. Then yes I believe I started with this, not using GenServer.call. This relieves a bit of pressure but I may still need the GenServer behaviour. Nodes subscribe to this pubusub topic. Nodes also listen to :net_kernel.monitor_nodes events, and broadcast on a :nodeup event. I need to capture the message the nodes broadcasts and do something, so I don’t know how to do this is not using a handle_info matcher offered by the server

Yes, I don’t need this handle_call indeed.

No need to apologize! OTP takes time to learn but it’s a rewarding process.

Yes exactly, you can have a process or pool of processes on each node subscribing to the pubsub messages, even though they can be broadcasted by any caller in the cluster.

You will also need a long running process to listen for these :nodeup events and rebroadcast them, and any interested process can subscribe to the topic. You should probably have a NodeListener with the sole responsibility to handle and rebroadcast these events. All of the messages we’ve discussed so far can be handled by the handle_info/2 callback of the GenServer behaviour.

You only need to call a GenServer if it holds some state (data, connection, etc.) or performs some computation and you need the results returned. It’s always synchronous. If the calling process can do the work itself then it probably should unless you need to offload it for async behavior or error isolation, in which case you might consider spawning a Task instead of sending it to a single GenServer.

A basic implementation of what I think you’re looking for

defmodule MyApp.NodeListener do
  use GenServer
  require Logger

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg, name: __MODULE__)

  def init(_arg) do
    :net_kernel.monitor_nodes(true)
    {:ok, []}
  end

  def handle_info({:nodeup, _} = payload, state) do
    Phoenix.PubSub.broadcast!(MyApp.PubSub, "cluster", payload)
    {:noreply, state}
  end

  def handle_info(_, state), do: {:noreply, state}
end

defmodule MyApp.Repo do
  use GenServer
  require Logger

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg, name: __MODULE__)

  def init(_arg) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "cluster")
    Phoenix.PubSub.subscribe(MyApp.PubSub, "users")
    {:ok, []}
  end

  def handle_info({:new, user}, state) do
    Logger.info("Repo (#{inspect(node())}) received user: #{inspect(user)}")
    {:noreply, state}
  end

  def handle_info({:nodeup, _} = msg, state) do
    Logger.info("Repo (#{inspect(node())}) received #{inspect(msg)}")
    {:noreply, state}
  end
end

defmodule MyApp.Users do
  def save(user) do
    Phoenix.PubSub.broadcast!(MyApp.PubSub, "users", {:new, user})
  end
end

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Phoenix.PubSub, name: MyApp.PubSub},
      MyApp.NodeListener,
      MyApp.Repo
    ]
    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

$ iex --name a@127.0.0.1 -S mix

# in another terminal
$ iex --name b@127.0.0.1 -S mix

iex(a@127.0.0.1)1> Node.connect(:"b@127.0.0.1")
true

19:43:49.050 [info]  Repo (:"a@127.0.0.1") received {:nodeup, :"b@127.0.0.1"}

19:43:49.050 [info]  Repo (:"b@127.0.0.1") received {:nodeup, :"a@127.0.0.1"}

iex(b@127.0.0.1)1> MyApp.Users.save(1)
:ok

19:44:10.905 [info]  Repo (:"b@127.0.0.1") received user: 1

19:44:10.905 [info]  Repo (:"a@127.0.0.1") received user: 1

Thanks, I refactored with your comments and challenged myself. Quite a few takeaways there. First about backpressure. Not sure to understand this term? Is this related to filling up the server’s mailbox? So you mentioned a pool of servers to push it a bit further. I will probably keep this for later as I imagine you may need libraries like Poolboy for this.
Then besides code readability or “testability”, does segregation bring better performance in terms of robustness on load? For example:

  • two PubSub topic is better than one? (I could not extract a PID to check).
  • designing two servers instead of one relieves partly the load on the mailbox I imagine.
  • also about error handling. Should there be a rescue?
    I ask these questions because concurrency between processes is a delicate topic, and reading about the usage guidelines of GenStage, again they speak about “backpressure”. Maybe some book you may recommend?

Finally, as a side comment, I noticed the catch-all clause you introduced: handle_info(_, state), do: {:noreply, state}.

Backpressure describes the act of not accepting more work before having completed previous/existing work. This allows you to prevent parts of your system from being overloaded, because at best you cannot sent them more work than they can handle. This will push work to queue up in known places where you can then deal with it.