How do I get the pids of the processes subscribed to a Phoenix PubSub topic?

tl;dr

Is there a way to get a list of pids subscribed to a phoenix pubsub topic?


My use case:

– setup

  • many many-to-many chat room channels where messages are posted
  • once a message is posted, some additional individual action (for example, a push notification) needs to be performed for each member of that chat room

– my current “solution”

  • use broadcast_from! with intercept and handle_out, but apparently this encodes each message once per member of the chat room, which is suboptimal

– my “ideal” solution

  • use broadcast_from! without intercept and handle_out to encode the broadcasted message only once, and trigger the additional actions in handle_info of the socket process or a channel process, whatever phoenix uses, but for that I’d need their pids

I’ve skimmed through the pubsub source code, but all the functions which list pids seem to be private …

https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/pubsub/pg2_server.ex#L56-L61

There is a way to get the local pids though

https://github.com/phoenixframework/phoenix_pubsub/blob/master/lib/phoenix/pubsub/local.ex#L121-L139

tl;dr

Also, is there a way to create a “backbone” process which would keep some state (like cache) for a particular pubsub topic, such as some metadata for a chat room (like the list of user ids who are members, including those who are offline)? Or is it impossible with the pubsub pattern or at least its current implementation in phoenix?


My use case:

I’d also need to know the members of the chat rooms who are not connected to the websocket endpoint and are thus not members of the channels, but are still members of the corresponding chat rooms.

So I’d need to push notifications to them. To avoid querying the database every time the message is posted (to get the list of user ids in a chat room), I’m thinking about starting a genserver for each chat room which would serve as a cache for chat room metadata like ids of the users who are members.

Then, to avoid “dangling” genservers, is there a way to “bind” or “link” the life-cycle of the chat room channels to that genserver such that the genserver is started once the first chat room channel for a particular chat room is started, and the genserver is terminated once the last chat room channel for that chat room is terminated?


The best I could come up with so far:

I can probably use a single ets table as a cache and check if there are any active (online) chat room channel members (“chats:” topic subscribers) in channel’s terminate callback to know when to clean up the table.
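A minimal sketch of that ets idea, using only the standard library. The table name, the room id and the member ids are made up for illustration, and the "last channel left" check is assumed to happen elsewhere (for example in the channel's terminate callback):

```elixir
# Hypothetical cache table: room id => list of member user ids.
table = :ets.new(:room_cache, [:set, :public])

# Fill the cache once, e.g. when the first channel for the room joins.
:ets.insert(table, {"room-1", [1, 2, 3]})

# Read the members on every posted message instead of querying the database.
members =
  case :ets.lookup(table, "room-1") do
    [{_room_id, ids}] -> ids
    [] -> []
  end

IO.inspect(members, label: "members")

# Clean up once the last channel for the room has terminated.
:ets.delete(table, "room-1")
```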

Been struggling with the same problem all day.
Finally figured it out.
Phoenix.PubSub.Local.subscribers(your_pubsub_name_defined_in_config.exs, topic, 0)

Important: The last argument is 0.

The last argument, according to documentation, is called shard.
Can anyone explain what a shard is?

:wave:

Finally figured it out.

Doesn’t seem to work in my case, unfortunately.

Can anyone explain what a shard is?

ETS tables holding process ↔ topic pairs (subscriptions) are “partitioned” into several shards (as many as the pool size) to distribute load; each shard also seems to have an associated “garbage collector” ets table. When a new process is registered for a topic (when you call “subscribe”), the subscription is saved in the shard given by :erlang.phash2(pid, pool_size), which is not necessarily 0, so if you always use 0 for the shard id, you might miss some existing subscriptions.
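To make the sharding concrete, here is a small sketch that only uses the standard library. The pool size of 4 and the spawned dummy processes are assumptions for illustration; the real pool size comes from your pubsub configuration:

```elixir
# Assumption: pool_size is 4 here; the real value comes from your pubsub config.
pool_size = 4

# Spawn a few idle processes to stand in for subscribers.
pids = for _ <- 1..8, do: spawn(fn -> Process.sleep(:infinity) end)

# Group the pids by the shard index that phash2 assigns them.
shards = Enum.group_by(pids, fn pid -> :erlang.phash2(pid, pool_size) end)

IO.inspect(Map.keys(shards), label: "shards in use")

# To see every local subscriber you would have to ask each shard in turn,
# e.g. (untested, based on the call from the post above):
# for shard <- 0..(pool_size - 1),
#     pid <- Phoenix.PubSub.Local.subscribers(pubsub_name, topic, shard), do: pid

# Stop the dummy processes again.
Enum.each(pids, &Process.exit(&1, :kill))
```

Each pid lands in exactly one shard between 0 and pool_size - 1, so querying only shard 0 returns just the subscribers that happen to hash there.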


I don’t really get what you are trying to achieve.
Are you trying to send messages to channels/sockets from another module?

  • To send a message to websockets (client side):
    YourAppWeb.Endpoint.broadcast!(topic, event, msg)
    it triggers channel.on(event, callback) on the client side.
  • To send a message to channel processes (server side):
    Phoenix.PubSub.broadcast!(pubsub_server, topic, msg)
    it triggers handle_info/2 of the channel module.

It feels like Agent is a better fit if all you need is simple storing and retrieving.

Yes, use start_link() instead of start() to bind parent and child processes together.

Does this help?


Thanks for trying to help me, but I think what I was trying to do was impossible due to some of the limitations of the current implementation of phoenix pubsub. Or maybe I was just expecting too much from it …

I don’t really get what you are trying to achieve.

I wanted to get pids of all processes across all nodes subscribed to a topic. And some other things … I think.

In short, I had a mobile app with chat rooms and I needed to know which of the users are online and which are not. To those who were not online, I wanted to send a push notification whenever a new message was posted in the chat room. To do that, I needed to keep a state of users who are members of a chat room and also the pids of active connections (users online).

It eventually was pretty trivial to implement with a genserver per chat room and linked cowboy websocket processes.

However, even with the pubsub abstraction, if I only had a single node, I could use a dynamic supervisor (thus a single node) which would spawn genservers for rooms on demand and terminate them when no more channel processes are linked. In each channel join I’d just link that channel process to the corresponding genserver after (possibly) starting it. So the room genservers would’ve worked similarly to grains in erleans, but with grains I could go multi-node, but that would’ve been yet another overkill, in my opinion.
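A rough sketch of such a room genserver, using only the standard library. RoomCache and its functions are hypothetical names, and it uses Process.monitor instead of links so that a crashing channel does not take the room down with it. On a single node it would presumably be started under a dynamic supervisor with a temporary restart strategy:

```elixir
defmodule RoomCache do
  # Hypothetical per-room process: tracks channel pids and stops
  # itself once the last tracked channel has gone down.
  use GenServer

  def start(room_id), do: GenServer.start(__MODULE__, room_id)

  # Called from a channel's join, passing the channel's own pid.
  def join(server, channel_pid), do: GenServer.call(server, {:join, channel_pid})

  @impl true
  def init(room_id), do: {:ok, %{room_id: room_id, channels: %{}}}

  @impl true
  def handle_call({:join, pid}, _from, state) do
    # Monitor the channel so we get a :DOWN message when it exits.
    ref = Process.monitor(pid)
    {:reply, :ok, %{state | channels: Map.put(state.channels, ref, pid)}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    channels = Map.delete(state.channels, ref)

    if map_size(channels) == 0 do
      # Last channel is gone, so the room process is no longer needed.
      {:stop, :normal, %{state | channels: channels}}
    else
      {:noreply, %{state | channels: channels}}
    end
  end
end
```

The pids of online users are then the values of the channels map, and anyone in the room's member list without a pid there would be a candidate for a push notification.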

Are you trying to send messages to channels/sockets from another module?

No.

It feels like Agent is a better fit if all you need is simple storing and retrieving.

No, agent would quickly become a bottleneck (or not, if you suggest an agent per chat room). The workaround with ets tables I’ve described in my second post could’ve worked though. But it didn’t seem clean to me since it was ultimately unnecessary (duplication of work).

Yes, use start_link() instead of start() to bind parent and child processes together.

What is a parent and what is a child in this case? I don’t think I mentioned anywhere that I was using start() (since I wasn’t).

What alternative did you choose after upgrading to the latest version of pubsub?

I think Phoenix.Tracker should do the job.