Update Phoenix PubSub

I am trying to update Phoenix to its latest version. According to the guide… I have changed

-  pubsub: [name: EvercamMedia.PubSub,
-           adapter: Phoenix.PubSub.PG2]
+  pubsub_server: EvercamMedia.PubSub

I am having trouble with this call:

Phoenix.PubSub.Local.subscribers(EvercamMedia.PubSub, "cameras:#{camera_exid}", 0)

I had a few discussions on Slack, and it was suggested that I register the pids with the Registry. But I am not sure which pids, or where and when to register them.

After getting the subscribers, this is what the code does:

  def subscribers(camera_exid) do
    Phoenix.PubSub.Local.subscribers(EvercamMedia.PubSub, "cameras:#{camera_exid}", 0)
  end

  def parse_clients(camera_exid) do
    pids = subscribers(camera_exid)
    Enum.map(pids, fn(pid) ->
      socket = Phoenix.Channel.Server.socket(pid)
      case socket do
        %Phoenix.Socket{assigns: %{current_user: user, ip: ip, source: source}} ->
          desc =
            "#{user.username}"
            |> check_empty_nil(ip)
            |> check_empty_nil(source)
          "{#{desc}}"
        %Phoenix.Socket{assigns: %{ip: ip, source: source}} -> "{#{ip}, #{source}}"
        _ -> ""
      end
    end)
    |> Enum.filter(fn(v) -> v != "" end)
    |> Enum.join(", ")
  end

I am only looking for the place where I can find the subscribers, or where they are even getting subscribed after the update.

This is the place where I am starting the streamer:

defmodule EvercamMediaWeb.CameraChannel do
  use Phoenix.Channel
  alias EvercamMedia.Snapshot.StreamerSupervisor
  alias EvercamMedia.Util

  def join("cameras:" <> camera_exid, _auth_msg, socket) do
    camera = Camera.get_full(camera_exid)
    user = Util.deep_get(socket, [:assigns, :current_user], nil)

    case Permission.Camera.can_snapshot?(user, camera) do
      true ->
        send(self(), {:after_join, camera_exid})
        {:ok, socket}
      _ -> {:error, "Unauthorized."}
    end
  end

  def terminate(_msg, socket) do
    {:noreply, socket}
  end

  def handle_info({:after_join, camera_exid}, socket) do
    StreamerSupervisor.start_streamer(camera_exid)
    {:noreply, socket}
  end
end

Yo.

I had a few discussions on Slack, and it was suggested that I register the pids with the Registry. But I am not sure which pids, or where and when to register them.

Edit: See update below

Phoenix.PubSub is built on top of Elixir’s Registry module and starts one as part of its supervision tree.

You can read more about Registry, but you can think of one as basically a Map of names → pids. The entries are linked to their processes, so when a process terminates, the Registry removes the corresponding key.
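As a minimal sketch of that "Map of names → pids" idea, the snippet below starts a standalone Registry and registers the current process under a key. The registry name Demo.NameRegistry and the key "worker:1" are made up for illustration and have nothing to do with Phoenix.

```elixir
# Start a registry where each key maps to at most one process.
# Demo.NameRegistry is a hypothetical name chosen for this example.
{:ok, _registry} = Registry.start_link(keys: :unique, name: Demo.NameRegistry)

# Register the *calling* process under a key, with an arbitrary value attached.
{:ok, _owner} = Registry.register(Demo.NameRegistry, "worker:1", :some_value)

# lookup/2 returns a list of {pid, value} tuples for the key.
[{found_pid, found_value}] = Registry.lookup(Demo.NameRegistry, "worker:1")

# A key that nobody registered yields an empty list.
missing = Registry.lookup(Demo.NameRegistry, "worker:2")
```

Note that register/3 always registers the process that calls it, which is why the question of where the call is made matters.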

In the case of PubSub, the subscribe function (Phoenix.PubSub.subscribe/3) registers the calling pid into its own Registry for you; you can see that here.

So this means that in your code, you can just use the Registry module to look up which processes are registered against the correct topic.

For an example of that, you can check out the PubSub tests, but for ease:

Registry.lookup(NameOfPubSubModule, "name_of_topic")

See Registry.lookup/2 for more.

Hope that helps.

Edit: See Chris’s reply below. This usage of Registry is apparently an implementation detail, and you should be registering your own processes against your own Registry instance. TIL, thanks Chris.

Registry.lookup(EvercamMedia.PubSub, "cameras:trave-tvjeu")
[
  {#PID<0.3266.0>,
   {:fastlane, #PID<0.3246.0>, Phoenix.Socket.V1.JSONSerializer, []}},
  {#PID<0.3277.0>,
   {:fastlane, #PID<0.3274.0>, Phoenix.Socket.V1.JSONSerializer, []}}
]

lookup gave me this. Are the fastlane entries the ones that joined the topic?

Or more precisely, how can I get only those pids which have joined the topic and have socket information?

It’s the first pid. Got it, thanks.

PubSub’s use of Registry is an implementation detail, so instead of trading one private Phoenix.PubSub.Local access for another private Registry access, you should use your own registry to track your processes by registering them in your channel’s join/3.
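A rough sketch of what that could look like, assuming a dedicated duplicate-key Registry. The module ClientTracker, its registry name, and the track/2 and clients/1 functions are hypothetical names invented here, not part of Phoenix. The demo at the bottom uses a plain spawned process in place of a real channel so that it runs without Phoenix.

```elixir
defmodule ClientTracker do
  # Hypothetical helper around an application-owned Registry.
  @registry ClientTracker.Registry

  # Start this under your application's supervision tree.
  # keys: :duplicate lets many processes register under the same topic.
  def start_link(_opts \\ []) do
    Registry.start_link(keys: :duplicate, name: @registry)
  end

  # Must be called from the process being tracked, because
  # Registry.register/3 always registers the calling process.
  def track(camera_exid, meta) do
    {:ok, _owner} = Registry.register(@registry, "cameras:#{camera_exid}", meta)
    :ok
  end

  # Returns a list of {pid, meta} tuples for everyone tracked on the topic.
  def clients(camera_exid) do
    Registry.lookup(@registry, "cameras:#{camera_exid}")
  end
end

# Demo: a spawned process stands in for a channel process.
{:ok, _registry} = ClientTracker.start_link()
parent = self()

client =
  spawn(fn ->
    # In a channel this call would sit in join/3, e.g.
    # ClientTracker.track(camera_exid, %{ip: ..., source: ...})
    :ok = ClientTracker.track("demo-camera", %{ip: "127.0.0.1", source: "web"})
    send(parent, :tracked)

    receive do
      :stop -> :ok
    end
  end)

# Wait until the client has registered before looking it up.
receive do
  :tracked -> :ok
end

tracked = ClientTracker.clients("demo-camera")
```

Storing the ip and source as the registered value means a function like parse_clients/1 could read them straight from the lookup result instead of asking each channel process for its socket.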

Thanks. I am trying to read the Registry docs, but they are going over my head, and all I want is to understand and implement a very simple thing.

Register the process.

To Chris’s point, I started some projects by trying to use pubsub to get channel info, and it was painful over time. It’s one of those things that does work if you’re okay with the trade-offs, but you start a cat and mouse game for breaking changes over time. These changes are totally fair game because it’s not meant to be used that way, so you end up needing to reimplement your solution.

For an example of how to track a channel on join, you can check out this instrumentation.tracker module in a project of mine: https://github.com/pushex-project/pushex/blob/master/lib/push_ex_web/channels/push_channel.ex#L42 Now there are no breaking changes to worry about!

Edit: this code doesn’t use a Registry, just a normal process. You can probably write less code if you use a Registry.

One thing:

Registry.lookup(NameOfPubSubModule, "name_of_topic")

Doesn’t this track whether anyone has left the topic?

I’m not quite following the question, but I think you’re asking how it handles people that have disconnected.

Registry automatically links to each process in the registry, so that if the process dies, it is removed from the registry.

Each entry in the registry is associated to the process that has registered the key. If the process crashes, the keys associated to that process are automatically removed. All key comparisons in the registry are done using the match operation ( ===/2 ).

This is how removals are handled automatically for you.
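To see that removal in action, here is a small sketch using a standalone Registry. Demo.TopicRegistry and the topic "cameras:demo" are made-up names. The cleanup happens shortly after the process exits rather than at the exact same instant, so the example polls briefly instead of assuming the entry is gone immediately.

```elixir
{:ok, _registry} = Registry.start_link(keys: :duplicate, name: Demo.TopicRegistry)
parent = self()

# A short-lived process that registers itself and waits to be told to leave.
subscriber =
  spawn(fn ->
    {:ok, _owner} = Registry.register(Demo.TopicRegistry, "cameras:demo", %{source: "web"})
    send(parent, :registered)

    receive do
      :leave -> :ok
    end
  end)

receive do
  :registered -> :ok
end

# While the process is alive, it shows up in the lookup.
before_exit = Registry.lookup(Demo.TopicRegistry, "cameras:demo")

# Ask the process to exit and wait until it is really down.
ref = Process.monitor(subscriber)
send(subscriber, :leave)

receive do
  {:DOWN, ^ref, :process, ^subscriber, _reason} -> :ok
end

# Poll a few times until the registry has dropped the dead process's entry.
wait_for_cleanup = fn wait, attempts ->
  case Registry.lookup(Demo.TopicRegistry, "cameras:demo") do
    [] ->
      []

    _entries when attempts > 0 ->
      Process.sleep(10)
      wait.(wait, attempts - 1)

    entries ->
      entries
  end
end

after_exit = wait_for_cleanup.(wait_for_cleanup, 50)
```

No explicit unregister call is made anywhere; the entry goes away because the process that registered it exited.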