Replace pg_notify with PubSub.broadcast

I’m trying to rework a project that set up insert/update triggers that call pg_notify and then has Postgrex.Notifications.listen listeners around the code base to receive those events.

This has caused a proliferation of db connections that is causing performance issues. There are 40 triggers across 26 tables. For listeners there are a few dozen places in the code, but since these are in GenServers and we have a multi node deployment we end up using 100’s of db connections at a time.

I’d like to stop using postgres as an event bus and replace all the database notify/listens with PubSub events. Updating the consumers to do PubSub.subscribe instead of Postgrex.Notifications.listen is pretty straight forward but I’m trying to find a clean way to replace the event producing triggers with Elixir code.

I suspect the “right” way to do it is adding a Broadcast in the app at every point that I insert / update any of these 26 schemas, but I’m not sure of a way to prove that I’ve found every place in the code base that each model has been inserted/updated. Is there any way in the Ecto schema definition to write something that will serve as sort of a application side trigger? That way whenever MyRepo.insert or MyRepo.update is passed one of the watched schemas I could add in a Broadcast?

1 Like

What about a hybrid approach?

  • Keep the sourcing as-is
  • Use a centralized collection of processes that do Postgrex.Notifications.listen and then push notifications to PubSub
  • Listeners subscribe to PubSub

This could also make a smooth transition easier; once the pool of listeners is in place, individual listeners can be switched over to PubSub one at a time.

6 Likes

I think @al2o3cr is dead on with this recommendation. The other critical benefit is that it preserves the same transactional semantics of your current solution, which should help you minimize bugs or corner cases introduced by the migration.

Specifically what I mean is that if pg_notify() occurs inside of a transaction that is later rolled back, no actual notification occurs. If you however change your pg_notify to a broadcast! call then not only does this happen whether or not the transaction rollsback, it also happens immediately. This means if you broadcast!(topic, %{record_id: record_inserted_in_this_transaction.id}) and the broadcast occurs before the transaction has committed, and other processes may try to fetch that record and it won’t be there yet.

2 Likes

The hybrid approach is brilliant. Thank you so much for that insight. I was too focused on ripping out code.

Would this mean one single listener DB connection per replica instead of per listener across all the GenServers? I’m pretty sure the answer is “Yes” but I want to risk asking an obvious question to make sure that the pg_notify()s aren’t proliferating DB connections.

TBH I’m not sure, which I why I handwaved at “a collection of processes” without specifying how many or how they are divided :slight_smile:

1 Like

It looks like Postgres notification channels won’t work in quite the way I think I need to implement @al2o3cr 's idea.

I was hoping to create one single listener for the repo and then listen to all notifications with something like a wildcard match on the channel name Postgrex.Notifications.listen(listener, "*") but wild card matches in Postgres notification channel appears to only do exact matches on an identifier. It’s not like topics in RabbitMQ.

I think this idea might still work but not without updating a couple dozen stored procedures.

Reading through the source of Postgrex.Notifications, I’m not sure notifications are the source of this connection usage - it’s already using a single connection no matter how many processes say listen and no matter how many topics are listened to.

Yes in re-reading the original post I think there needs to be some extra investigation into what the connections are actually doing. My guess is that there is a proliferation of processes that are straight up creating new Postgex.Noitfications processes, instead of using listen on an existing one.

After doing some more digging into our architecture my understanding is it’s a single db connection for all listeners, per process. So our app has a status page for each of the commercial sites our users are at. When a user has the status page is open we start up a GenServer to as a cache for every time that page is open. No reason to run these expensive calculations again to serve the same status info to multiple connections. The cache starts a listener and then sets up multiple listens for all the pg_notifys whose updates could could affect the caches. In short: each site sets up a Postgrex listener and updates the cache and does regular Phoenix PubSub out to the LiveViews displaying the data. We have this pattern in a few more places and the result seems to be that everything works fine until we hit some threshold of user activity where our connection pool gets emptied out (probably exacerbated by some poorly performing queries). Then our web users start refreshing the page and we end up doing a DoS attack on ourselves.

So it looks like both @al2o3cr and @benwilson512 are correct that it’s not the number of notifications we are listening to that’s causing our problems, but maybe we are setting up too many listeners.

Here’s our setup:

  # MyApp.Repo
  def start_notification_listener() do
    Postgrex.Notifications.start_link(__MODULE__.config())
  end

  def listen(listener, channel_name) do
    Postgrex.Notifications.listen(listener, channel_name)
  end

# In our caches
def setup_listeners(site_id) do
{:ok, listener} = MyApp.Repo.start_notification_listener()
{:ok, _ref} = MyApp.Repo.listen(listener, "site_users:#{site_id}")
{:ok, _ref} = MyApp.Repo.listen(listener, "site_equipment:#{site_id}")
end

def handle_info({:notification, _pid, _ref, "site_users:" <> _, payload}, state) do
   ...
end

I guess I need to replace the listeners in each cache with one single listener for that everyone in MyApp.Repo can reuse?

Yes, instead of having each cache call start_notification_listener you should start a single one in your supervision tree somewhere with a name, and have everyone use that.

If you are concerned about that being overloaded you could use a PartitionSupervisor — Elixir v1.14.1 to spawn a fixed number of listener processes and then have each client pid use one of those.

1 Like