Subscribe all clients/presences in one channel to an external topic

We have a use case where we want to route an incoming call to all online users within a channel (“lobby”). Each of these users (the “candidate pool”) can either accept or decline the incoming call. The first user to accept the call is connected and the selection process ends (first come, first served; at most one user).
Until someone accepts the call, each candidate still in the pool (i.e. who has not declined yet) should know who else is still in the pool.

Since channels and Presence are made for that, my intuition is to implement the system using them.
My idea is to create an external topic (new channel) for the incoming call, then add all candidates (from the “lobby” channel) to it, and broadcast Presence diffs whenever the pool’s state changes. When a user accepts the incoming call, the external channel is immediately closed.

However, I’d like to avoid broadcasting to all users in “lobby” to have them join the external channel (a client-side roundtrip), for reasons of performance, reliability and security. I’d like to keep the logic server-side, ideally preserving the multi-node attributes that channels and Presence give me out of the box.
If I understand correctly, this forbids approaches using the “lobby” subscribers’ PIDs directly.

So my approach would be to broadcast a message to “lobby” with the external channel’s topic, then intercept the broadcast, let handle_out have each subscriber join the new channel, and cancel the message going to the client. All server-side. Once all candidates have joined the external topic, standard channels+Presence broadcasting can handle the rest.

Is this the best approach for the given use case?
Am I overlooking something?

On a side note, the Phoenix JS client will not “know” it’s subscribed to the external channel, so I have to either use its onMessage callback, or manipulate the topic of each outgoing message server-side using intercept and handle_out, correct?

Thanks a lot in advance for any feedback! :slight_smile:

You can have more than one presence setup per channel as well, just have to give each a unique ID, but I use this quite often, just as a note. :slight_smile:

But say you have N users in a lobby/pool and a call is made to this pool: a message is ‘broadcast’ to the topic ID of the lobby/pool to pop up a message in the users’ interface or so. The first to accept it sends a message back to the call handler, which has that user join the call process while also sending a message to the other clients that the call was accepted elsewhere, with whatever other useful data. You may not even need presence for this unless you want to know who all is waiting rather than just broadcasting all willy-nilly (which is fine really ^.^). :slight_smile:

But yeah, I’d have a user connect to a call when it is accepted, not when it is offered. You could even have them all join a presence so you know who is waiting for a response, and as they deny it they leave the presence, until one accepts then they all drop off when they get the message that it was accepted by someone else. :slight_smile:
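
The bookkeeping described above (everyone starts in the pool, declining removes you, the first accept wins and empties the pool) can be sketched in plain Elixir without any Phoenix parts. This is only a rough illustration: the `CandidatePool` module and all of its function names are made up for this example, and in a real app this state would presumably live in whatever process handles the call.

```elixir
defmodule CandidatePool do
  # Hypothetical helper, not part of Phoenix: tracks who can still answer a call.
  defstruct waiting: MapSet.new(), accepted_by: nil

  # Everyone in the lobby starts out as a candidate.
  def new(users), do: %__MODULE__{waiting: MapSet.new(users)}

  # Declining just removes the user from the pool.
  def decline(%__MODULE__{} = pool, user) do
    %{pool | waiting: MapSet.delete(pool.waiting, user)}
  end

  # The first accept wins and empties the pool.
  def accept(%__MODULE__{accepted_by: nil} = pool, user) do
    if MapSet.member?(pool.waiting, user) do
      {:ok, %{pool | waiting: MapSet.new(), accepted_by: user}}
    else
      {:error, :not_in_pool}
    end
  end

  # Any accept after the first one is rejected.
  def accept(%__MODULE__{}, _user), do: {:error, :already_accepted}

  # Sorted list of users who have neither declined nor been dropped.
  def waiting(%__MODULE__{waiting: waiting}) do
    waiting |> MapSet.to_list() |> Enum.sort()
  end
end
```

The result of `waiting/1` is what you would show to the remaining candidates, and an `{:ok, pool}` from `accept/2` is the point where you would tell everyone else the call was taken.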

Thanks for your fast reply and also commenting on the use case :slight_smile:

You can have more than one presence setup per channel as well, just have to give each a unique ID, but I use this quite often, just as a note. :slight_smile:

Nice to know! I guess you pass in the original channel pid and another topic, similar to here?

You may not even need presence for this unless you want to know who all is waiting […] (which is fine really ^.^). :slight_smile:

You might be right that the info is not needed. However, I think it would be really useful for the users to know, and I’m also eager to wrap my head around this use case of Phoenix, testing whether it could really work like I’ve described in my question, keeping all the multi-node niceties in place :sunglasses:

You could even have them all join a presence so you know who is waiting for a response

This is exactly what I’m trying to achieve. Ideally with broadcasts to only those users in that second presence.
You’re talking about another Presence within the same channel, not a separate channel, right?

So I assume that

  1. I can just copy contents of one Presence to another, without having to deal with PIDs or anything internal, and it just works?
  2. All Presence diffs are sent to the whole channel, no matter if each subscriber has already declined, or even entered after the initial broadcast. Or can I broadcast to only those subscribers who are in the Presence?

Precisely! :slight_smile:

Also indeed. :slight_smile:

Presence is for presence information, not for broadcasting. If you want a user’s channel to listen for specific things, have it subscribe to another specially named topic and then broadcast to that. I do not mean joining a new topic from the JavaScript side, or even spawning a new process; literally just call MyServer.Endpoint.subscribe("new:topic:thing:o'whatever"), and when it should stop listening on that topic, just call MyServer.Endpoint.unsubscribe("new:topic:thing:o'whatever"). Anything broadcast to those new topics and not the ‘channel topic’ will not appear in handle_in/handle_out, but rather will appear in handle_info with a Phoenix.Socket.Broadcast structure, use it like this:

  # Broadcasts on externally subscribed topics arrive here, not in handle_out.
  def handle_info(%Phoenix.Socket.Broadcast{event: "notifications:" <> _ = event, payload: %{notifications: notifications}, topic: _topic}, socket) do
    offset = 0 # socket.assigns.offset
    page_size = length(notifications) # 10
    msg = %{
      offset: offset,
      pageSize: page_size,
      notifications: notifications
    }
    # Forward the event to this channel's own client.
    push(socket, event, msg)
    {:noreply, socket}
  end

For example I use this in my notifications channel topic, then the notification internally registers to certain other things depending on which notification groups they are assigned to. :slight_smile:

But yeah, presence is literally just for tracking presence in the group, not for communicating with it; for that you should subscribe to the new topic as well and broadcast to that. :slight_smile:

Thanks very much for the elaboration :thumbsup:

If I understand correctly, for each incoming call, we now have one additional Presence and one additional topic (not channel) that’s “attached” to the main lobby channel but with a different topic each (probably best to use the same topic for both). MyServer.Endpoint.subscribe subscribes the already existing lobby channel process to the new topic, in addition to the main topic it’s subscribed to by default.

For some reason I’ve assumed that “subscribing to external topics” in the docs subscribes an individual client to another topic. Having re-read your answers, a background article and some source code I’ve realized that it’s actually the channel process itself that’s subscribed to topics and that just forwards all broadcasts to PubSub. So it’s enough to subscribe the channel in order to “subscribe all clients in one channel to an external topic” (as I put it in the subject).

Anything broadcast to those new topics and not the ‘channel topic’ will not appear in handle_in/handle_out, but rather will appear in handle_info with a Phoenix.Socket.Broadcast structure, use it like this:

Judging from the source code, handle_info in your example is used just like handle_out. So I can easily look up whether the assigned user is in the Presence associated with the topic (as in Intercepting outgoing events) in order to broadcast to only those users/clients. That’s exactly what I wanted to achieve :slight_smile:

In the end, the decisive things to understand were that

  • in a channel callback (handle_in, handle_out etc.) I have a reference to both the individual client (socket) and the channel as a whole (self())
  • the channel itself is subscribed to topics, not the clients (sockets).

Thanks so much for the explanations and bearing with me. It really helped - and hopefully will help others when they read this :thumbsup: :smile:

Exactly on both. :slight_smile:

Yep, handle_info just handles messages sent to this process that it does not otherwise recognize, so that ‘you’ can handle them directly. :slight_smile:

You might want to have a catch-all handle_info clause below all the other handle_info clauses too, so the process does not crash if it happens to get a message it does not know (I log them).
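
A minimal sketch of such a catch-all, shown on a plain GenServer so it runs without Phoenix. The `CatchAllDemo` module, the `:tick` message and the counters are made up for illustration; in a channel the second argument would be the socket rather than a map, but the clause ordering works the same way.

```elixir
defmodule CatchAllDemo do
  use GenServer
  require Logger

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  @impl true
  def init(:ok), do: {:ok, %{known: 0, unknown: 0}}

  # A message this process knows how to handle.
  @impl true
  def handle_info(:tick, state) do
    {:noreply, %{state | known: state.known + 1}}
  end

  # Catch-all clause. It must come last, because clauses are tried top to bottom.
  # Without it, an unmatched message raises FunctionClauseError and the process crashes.
  def handle_info(msg, state) do
    Logger.warning("unexpected message: #{inspect(msg)}")
    {:noreply, %{state | unknown: state.unknown + 1}}
  end
end
```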

No problem, I delved into the code for them when I started using them a while back, so it is good to spread the knowledge. :slight_smile:

I’ve started implementing the approach we discussed, but have problems with the subscription.

The presence part works nicely: I create a presence on an external topic, I can access it in all the channel callbacks, and it even broadcasts presence_diff events to that topic whenever the presence changes.

However, the only client that ever receives anything broadcast to the external topic is the one that triggered the subscription (within a handle_in sent by her).

Here’s a test that shows what I’ve tried, reduced to the broadcasting aspects:

# test/channels/room_channel_test.exs

test "outgoing call:candidate is sent to all but initiating user", %{} do
  {:ok, _, socket} = socket("", %{name: "marla"})
                     |> subscribe_and_join(RoomChannel, "room:lobby")
  {:ok, _, _} = socket("", %{name: "tom"})
                |> subscribe_and_join(RoomChannel, "room:lobby")
  {:ok, _, _} = socket("", %{name: "jeff"})
                |> subscribe_and_join(RoomChannel, "room:lobby")

  # init call
  call = %Call{} |> Repo.insert!
  candidates_channel = "room:candidates:#{call.id}"
  candidates_socket = %Phoenix.Socket{socket | topic: candidates_channel}

  # subscribe channel to external topic
  :ok = Vira.Endpoint.subscribe(socket.channel_pid, candidates_channel)

  # copy over candidate presences (all except initiating user "marla")
  # ex. candidates: %{"tom" => %{metas: [%{phx_ref: "KsiSVirgp6A="}]}, ...}
  candidates = socket |> Presence.list |> Map.delete("marla")
  for {name, %{metas: presences}} <- candidates do
    for presence <- presences do
      Presence.track(candidates_socket, name, presence)
    end
  end

  # broadcast to "room:lobby"
  broadcast_from socket, "call:candidate", %{call: call}

  # broadcast to external topic
  broadcast_from candidates_socket, "call:candidate", %{call: call}
end


# web/channels/room_channel.ex

intercept ["call:candidate"]

def handle_info(%Phoenix.Socket.Broadcast{topic: "room:candidates:" <> _call_id_str, event: "presence_diff"} = broadcast, socket) do
  IO.inspect ["handle_info", "presence_diff", broadcast.topic, socket.assigns.name]
  {:noreply, socket}
end

def handle_info(%Phoenix.Socket.Broadcast{topic: "room:candidates:" <> _call_id_str, event: "call:candidate"} = broadcast, socket) do
  IO.inspect ["handle_info", "call:candidate", broadcast.topic, socket.assigns.name]
  {:noreply, socket}
end

def handle_out("call:candidate", _payload, socket) do
  IO.inspect ["handle_out", "call:candidate", socket.topic, socket.assigns.name]
  {:noreply, socket}
end

which yields the output:

["handle_info", "presence_diff", "room:candidates:60", "marla"]
["handle_info", "presence_diff", "room:candidates:60", "marla"]
["handle_out", "call:candidate", "room:lobby", "tom"]
["handle_out", "call:candidate", "room:lobby", "jeff"]
["handle_out", "call:candidate", "room:lobby", "marla"]
["handle_info", "call:candidate", "room:candidates:60", "marla"]

This shows that intercepted broadcasts to room:lobby do reach all clients, but broadcasts to the external topic reach only the first client.

Am I overlooking something?
Do I need to broadcast the message to the main topic within handle_info or something like that?

To broadcast to the external topic (the one the others joined but did not initialize within) I think you need to use Vira.Endpoint.broadcast. :slight_smile:

Search ‘broadcast’ at (because it has no anchor to link to ;-)): https://hexdocs.pm/phoenix/Phoenix.Endpoint.html

That’s exactly the same as what broadcast_from! does in tests. To make sure, I also tried Vira.Endpoint.broadcast!, but with the same result: broadcasts to the external topic are only sent to one client :expressionless:

In your notifications example, do you subscribe each client individually or subscribe the whole channel once and every client in the channel gets every message sent to the external topic? I’m just wondering, because you have no filter mechanism in there, and the same approach doesn’t work for me :confused:

Each client has to subscribe itself. :slight_smile:
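
To illustrate why, here is a small sketch that uses Elixir's built-in Registry as a stand-in for Phoenix.PubSub, so it runs without Phoenix. As far as I understand, every client that joins a topic gets its own channel process, and a subscription belongs to the process that makes the call, so subscribing one client's process does nothing for the others. `SubscribeDemo` and everything in it are made-up names, and the spawned processes only play the role of channel processes.

```elixir
defmodule SubscribeDemo do
  # Hypothetical stand-in for Phoenix.PubSub, built on the standard Registry.
  @registry SubscribeDemo.Registry

  def start, do: Registry.start_link(keys: :duplicate, name: @registry)

  # Registers the *calling* process under the topic, nobody else.
  def subscribe(topic) do
    {:ok, _owner} = Registry.register(@registry, topic, nil)
    :ok
  end

  # Sends the message to every process registered under the topic.
  def broadcast(topic, message) do
    Registry.dispatch(@registry, topic, fn entries ->
      for {pid, _value} <- entries, do: send(pid, {:broadcast, topic, message})
    end)
  end

  # Spawns a process playing the role of one client's channel process.
  # It subscribes itself to the given topics, then reports what it receives.
  def start_client(name, report_to, topics) do
    spawn_link(fn ->
      Enum.each(topics, &subscribe/1)
      send(report_to, {:ready, name})
      loop(name, report_to)
    end)
  end

  defp loop(name, report_to) do
    receive do
      {:broadcast, topic, message} ->
        send(report_to, {:received, name, topic, message})
        loop(name, report_to)
    end
  end
end
```

If "marla" is started with only "room:lobby" and "tom" with both "room:lobby" and "room:candidates:60", a broadcast to "room:candidates:60" reaches only tom's process. In a channel, the equivalent would be for each client's channel process to call the endpoint's subscribe itself, for example in join or in reaction to a lobby broadcast.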

Hi all, I’m trying to implement something similar to this: when a client (socket) sends a message that creates a record in storage, that client should later be notified about any update to that record.

So I do the following in the channel module → socket.endpoint.subscribe("records:#{record.id}"). The problem is that I’m not able to receive any notification on that topic.

  # Assumes `alias Phoenix.Socket.Broadcast` at the top of the channel module.
  def handle_info(%Broadcast{topic: "records:" <> _id, event: ev, payload: payload}, socket) do
    IO.puts "handle_info: #{inspect ev} #{inspect payload}"
    push socket, ev, payload
    {:noreply, socket}
  end

And I’m not sure what I’m missing. I’m just calling MyApp.Endpoint.broadcast("records:#{record_id}", "update", %{}) from another process after updates happened.

When you say

Each client has to subscribe itself.

that is the part I don’t understand, I guess. In my example the subscription is done through the current channel process (self()), which is what gets subscribed. Phoenix has deprecated passing a pid to subscribe and only allows the calling process to subscribe itself.

Is it possible that I need to create a new socket/channel for that topic pattern?

Correct, you’d use subscribe on your Endpoint to listen to messages from more topics to your socket.
