Tips for troubleshooting PubSub messages not received?

I’m trying to follow LiveBeat’s example of using PubSub to send messages to LiveViews. I’ve also read through a ton of topics here, so I’m not sure what I’m doing wrong…

I’m broadcasting on a PubSub topic that a LiveView is subscribed to. I think the topic and everything else is wired up correctly, but the LiveView never sees the message (and it doesn’t raise an error for failing to handle it, either).

Here’s my PubSub setup:

defmodule Ido.Trips.PubSub do
  @moduledoc """
  PubSub for trips
  """

  # defined in application.ex
  @pubsub Ido.PubSub

  def broadcast_email_processed(element) do
    msg = %Ido.Trips.Events.EmailProcessed{element: element}
    broadcast!(element.trip_id, msg)
  end

  defp broadcast!(trip_id, msg) do
    Phoenix.PubSub.broadcast!(@pubsub, topic(trip_id), {__MODULE__, msg})
  end

  def subscribe_to_trip_id(trip_id) do
    Phoenix.PubSub.subscribe(@pubsub, topic(trip_id))
  end

  def unsubscribe_to_trip_id(trip_id) do
    Phoenix.PubSub.unsubscribe(@pubsub, topic(trip_id))
  end

  defp topic(trip_id) do
    "trip:#{trip_id}"
  end
end

I’m subscribing to it in handle_params in my LiveView rather than in mount, because I need the id of the Trip:

@impl true
def handle_params(%{"id" => id} = params, _, socket) do
  if connected?(socket) do
    # make sure no duplicates
    Ido.Trips.PubSub.unsubscribe_to_trip_id(id)

    Ido.Trips.PubSub.subscribe_to_trip_id(id)
  end
...

@impl true
def handle_info(
      {Ido.Trips.PubSub, %Ido.Trips.Events.EmailProcessed{element: element}},
      socket
    ) do
  {:noreply, put_flash(socket, :info, "An element was processed from email")}
end

What did I mess up? I don’t have any warnings/errors in the console during startup or when I broadcast the message.

Thanks!

The first thing I would check is whether a generic handle_info(msg, socket) clause catches anything at all.
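
A minimal sketch of such a catch-all clause, assuming a Phoenix project with phoenix_live_view installed. The module name is hypothetical; the point is only that the generic clause sits after every specific handle_info/2 clause and logs whatever reaches it.

```elixir
defmodule IdoWeb.TripLive.DebugSketch do
  # Hypothetical module name, used only for illustration.
  use Phoenix.LiveView
  require Logger

  @impl true
  def mount(_params, _session, socket), do: {:ok, socket}

  @impl true
  def render(assigns) do
    ~H"""
    <p>Waiting for messages</p>
    """
  end

  # Keep this as the LAST handle_info/2 clause so that specific clauses
  # defined above it still match first. It logs any message that no
  # other clause handled.
  @impl true
  def handle_info(msg, socket) do
    Logger.warning("unhandled message: #{inspect(msg)}")
    {:noreply, socket}
  end
end
```

If nothing is logged when you broadcast, the message is not reaching this process at all, which points at the subscription or the broadcaster rather than at the pattern match.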

Subscribing in handle_params could prove problematic. It’s hard to say without more context: is your LiveView handling both “index” and “show” pages? If not, you can still get the id in mount via def mount(%{"id" => id}, _session, socket). If they share a LiveView, then as written you aren’t unsubscribing from the previous trip before subscribing to the new one, which means the subscriptions would pile up. handle_params gets called any time there is a patch event, including changes to query params or subroutes that point to the same LiveView. That means any patch to the same trip will unsubscribe and resubscribe. I’m not actually sure what effect that would have, but I wouldn’t say it’s the best place to put it. There may be a sane way to do it, but it’s not something I’ve ever seen before and I feel like it could lead to confusing bugs.

Yeah, I have a catch-all at the end that will alert if it ever gets hit. But it’s not being triggered either.

I wondered about that since every example seems to use mount. I tried listening in mount by hard-coding the id since I didn’t see a way to get at the params there, but no change.

Also agreed, unsubscribing and immediately subscribing feels really ugly. I wasn’t sure how to both access the id and prevent having multiple subscriptions.

This is a show LiveView so I don’t have to worry about an index and the id changing.

Ah, so is your problem that you are patching when switching between trips and that’s why you need handle_params? Otherwise, params is the first parameter given to mount so you should definitely be able to get the id from there.
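
For reference, a rough sketch of subscribing in mount/3 with the id taken from the params. It assumes the Ido.Trips.PubSub and Ido.Trips.Events.EmailProcessed modules from the original post and a route with an :id segment; the LiveView module name and the render/1 body are made up for illustration.

```elixir
defmodule IdoWeb.TripLive.ShowSketch do
  # Hypothetical module name, used only for illustration.
  use Phoenix.LiveView

  @impl true
  def mount(%{"id" => id}, _session, socket) do
    # mount/3 runs once for the static render and again once the
    # websocket connects, so subscribe only in the connected process.
    if connected?(socket), do: Ido.Trips.PubSub.subscribe_to_trip_id(id)

    {:ok, assign(socket, :trip_id, id)}
  end

  @impl true
  def render(assigns) do
    ~H"""
    <p>Trip <%= @trip_id %></p>
    """
  end

  @impl true
  def handle_info({Ido.Trips.PubSub, %Ido.Trips.Events.EmailProcessed{}}, socket) do
    {:noreply, put_flash(socket, :info, "An element was processed from email")}
  end
end
```

Because the subscription belongs to the LiveView process, it goes away when that process exits, so a show-only LiveView should not need an explicit unsubscribe.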

You could also subscribe to a general trips channel and see if it’s the current one or not:

def handle_info({:trip_updated, trip}, socket)
    when trip.id == socket.assigns.trip.id do
  # ...
  {:noreply, socket}
end

As a quick sanity check that your Ido.PubSub is working, could you try the following in an interactive Elixir session with a running Phoenix server, i.e. iex -S mix phx.server?

iex> Phoenix.PubSub.subscribe(Ido.PubSub, "trip:foo")
:ok
iex> Process.info(self(), :messages)
{:messages, []}
iex> Phoenix.PubSub.broadcast(Ido.PubSub, "trip:foo", "bar")
:ok
iex> Process.info(self(), :messages)
{:messages, ["bar"]}

Ok, I just tried that and have the exact same output as you.

🤦 Ha, yeah, great point. My eyes completely glazed right over that fact. I’ve moved my subscribe into mount/3.

Hmm, how’s broadcast_email_processed being called?

Any chance the trip_id is mismatched somehow, so the PubSub message gets broadcast to a different topic than the one the LiveView is subscribed to? It might be worth throwing an IO.inspect(msg) in there and then verifying that msg.element.trip_id is set as expected.
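
A small sketch of that check, using only the standard library. TopicSketch is a hypothetical stand-in that copies the topic/1 helper from the original post and prints the topic a given trip_id would produce.

```elixir
defmodule TopicSketch do
  # Hypothetical stand-in for the private topic/1 helper in Ido.Trips.PubSub.
  def topic(trip_id), do: "trip:#{trip_id}"

  # Prints the computed topic and returns it unchanged, so the call can be
  # dropped into a pipeline while debugging.
  def inspect_topic(trip_id) do
    trip_id
    |> topic()
    |> IO.inspect(label: "broadcast topic")
  end
end

# Integer and string ids interpolate to the same topic.
TopicSketch.inspect_topic(1)
TopicSketch.inspect_topic("1")

# A nil trip_id does not raise; it interpolates to an empty string, so the
# broadcast would quietly go to a topic nobody is subscribed to.
TopicSketch.inspect_topic(nil)
```

Printing the topic on both the subscribe side and the broadcast side makes a mismatch like the nil case easy to spot.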

Ooof, I’m an idiot. I was testing by running mix phx.server in one shell and iex -S mix in another, and I somehow thought those would be the same running app.

I just ran an iex -S mix phx.server, and then in that same REPL triggered some broadcasts and it worked perfectly.
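
For anyone hitting the same thing: each of those shell commands boots its own BEAM VM with its own copy of Ido.PubSub, and as far as I understand a broadcast only reaches subscribers on the same node unless the nodes are explicitly connected. A quick way to see what a given shell is running, sketched with standard-library calls only (NodeCheck is a hypothetical helper name):

```elixir
defmodule NodeCheck do
  # Hypothetical helper; uses only functions from Elixir's Node module.
  def summary do
    %{
      # Name of the VM this code runs in.
      node: Node.self(),
      # false when the VM was started without distribution enabled.
      distributed?: Node.alive?(),
      # Other nodes this VM is currently connected to.
      connected_nodes: Node.list()
    }
  end
end

IO.inspect(NodeCheck.summary(), label: "this VM")
```

Run in both shells, a VM started without --sname or --name reports :nonode@nohost and an empty connected_nodes list, meaning the two shells cannot see each other’s subscribers.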

Thank you all so much!
