Phoenix pubsub -> AppWeb.Endpoint or Phoenix.PubSub ?

I am learning Phoenix PubSub by writing a tutorial. I kind-of-sort-of know it. I can kludge it together well enough to make an app - but I don’t understand it with confidence.

The code to develop my tutorial begins with a simple app where a user has two live views and these are set on two different routes.

/send and /receive

/send has a form and when submitted /receive renders the result in real time.

It’s that simple, that’s all it does with no extra code. Minimalist and understandable.
The code works, I understand it and it’s posted below.

A while ago I bought the Pragmatic Studio" video tutorial on Phoenix LiveView. The section on PubSub was too complicated for me to follow so I gave up on it. I revisited it tonight and here is my question.

In my code (below) I use AppWeb.Endpoint to access the broadcast and subscribe methods. This works fine.

In the Pragmatic Studio example they use Phoenix.PubSub

I tried changing my code to mimic their style and my app broke.

I am curious what style should be used, why it should be used and how to update my code to use their style. I’ve posted an image explaining what they do via an imgur link here:

Thank you.

My Code

Router

  scope "/", AppWeb do
    pipe_through :browser
    live "/send", SendLive, :home
    live "/receive", ReceiveLive, :home
    get "/", PageController, :home
  end

This is the code for /send



defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    AppWeb.Endpoint.broadcast(topic, "message", text) # Broadcast
    {:noreply, socket}
  end
  
  defp topic do #Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>

      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
  
end

This is the code for /receive



defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      AppWeb.Endpoint.subscribe(topic)     # PubSub Subscribe
    end

    {:ok, assign(socket, messages: "")}
  end
  
  def handle_info(%{event: "message", payload: message}, socket) do  # Handle Info is needed
    IO.inspect message
    {:noreply, assign(socket, messages: message)}
  end
  

  defp topic do  # Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @messages %>


    </div>

    """
  end
  
end

Hi @wktdev this is the code that worked yea? Can you show the error you got when you tried to use Phoenix.PubSub?

You can learn more about the differences between the two on this and later posts:

2 Likes

In this case the shape of the handle_info function changes. I could probably fiddle with it and fix it but I don’t know the direct way to troubleshoot it.

My code update is below and the error is here:

[error] GenServer #PID<0.1281.0> terminating
** (FunctionClauseError) no function clause matching in AppWeb.ReceiveLive.handle_info/2
(app 0.1.0) lib/app_web/live/receive_live.ex:17: AppWeb.ReceiveLive.handle_info(“sd”, phoenix.LiveView.Socket<id: “phx-F35BYK8vGyaDNgNE”, endpoint: AppWeb.Endpoint, view: AppWeb.ReceiveLive, parent_pid: nil, root_pid: #PID<0.1281.0>, router: AppWeb.Router, assigns: %{changed: %{}, flash: %{}, live_action: :home, messages: “”}, transport_pid: #PID<0.1274.0>, …>)
(phoenix_live_view 0.18.18) lib/phoenix_live_view/channel.ex:276: Phoenix.LiveView.Channel.handle_info/2
(stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
(stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
(stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: “sd”
State: %{components: {%{}, %{}, 1}, join_ref: “4”, serializer: Phoenix.Socket.V2.JSONSerializer, socket: phoenix.LiveView.Socket<id: “phx-F35BYK8vGyaDNgNE”, endpoint: AppWeb.Endpoint, view: AppWeb.ReceiveLive, parent_pid: nil, root_pid: #PID<0.1281.0>, router: AppWeb.Router, assigns: %{changed: %{}, flash: %{}, live_action: :home, messages: “”}, transport_pid: #PID<0.1274.0>, …>, topic: “lv:phx-F35BYK8vGyaDNgNE”, upload_names: %{}, upload_pids: %{}}

Here is my attempt to change my code using the PragProg tutorial style.

/send



defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    # AppWeb.Endpoint.broadcast(topic, "message", text) # Broadcast

    Phoenix.PubSub.broadcast(App.PubSub, "message", text) # Broadcast



    {:noreply, socket}
  end
  
  defp topic do #Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>

      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
  
end

/receive



defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      # AppWeb.Endpoint.subscribe(topic)     # PubSub Subscribe
      Phoenix.PubSub.subscribe(App.PubSub, "message")     # PubSub Subscribe
    end


    {:ok, assign(socket, messages: "")}
  end
  
  def handle_info(%{event: "message", payload: message},socket) do  # Handle Info is needed
    IO.inspect message
    {:noreply, assign(socket, messages: message)}
  end
  

  defp topic do  # Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @messages %>


    </div>

    """
  end
  
end

You can see what it is sending you right there: AppWeb.ReceiveLive.handle_info(“sd”

Basically when you use Phoenix.PubSub it doesn’t do any wrapping of the message, it just sends you it raw. That’s why in the tutorial you linked it has the broadcast call send {:volunteer_created, volunteer} and then the handle_info pattern match is {:volunteer_created, volunteer} exactly the same without any extra wrapping.

1 Like

As far as the error I just see jiberish with that error output. I don’t even know how to read that error.

I tried to look at the “raw output” of Phoenix.PubSub.broadcast so i did this:

I commented out handle_info so my code doesn’t immediately throw an error and then I attempted to use IO.inspect to view data as it is passed through the Phoenix.PubSub.broadcast definition so I could understand the method better.

I stuck IO.inspect in it:

  def broadcast(pubsub, topic, message, dispatcher \\ __MODULE__)

      IO.inspect "Why doen't this trigger when I submit form?"

      when is_atom(pubsub) and is_binary(topic) and is_atom(dispatcher) do
    {:ok, {adapter, name}} = Registry.meta(pubsub, :pubsub)

    with :ok <- adapter.broadcast(name, topic, message, dispatcher) do
      dispatch(pubsub, :none, topic, message, dispatcher)
    end
  end

My IO.inspect didn’t render when I submitted the form.

I did my best to mirror the PragPrag code further. In my example below I replaced {volunteer_created, volunteer} with:

{:text_stuff, text}

Anyway, I am asking what do I need to change to update it ? I tried to cobble an answer together based on your post and I can’t figure it out.

/send



defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    # AppWeb.Endpoint.broadcast(topic, "message", text) # Broadcast

    Phoenix.PubSub.broadcast(App.PubSub, "message", {:test_stuff, text}) # Broadcast

    {:noreply, socket}
  end
  
  defp topic do #Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>

      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
  
end



/receive



defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      # AppWeb.Endpoint.subscribe(topic)     # PubSub Subscribe
      Phoenix.PubSub.subscribe(App.PubSub, "message")     # PubSub Subscribe
    end


    {:ok, assign(socket, messages: "")}
  end
  
  def handle_info({:text_stuff, text}, socket) do  # Handle Info is needed
    IO.inspect message
    {:noreply, assign(socket, messages: message)}
  end
  

  defp topic do  # Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @messages %>


    </div>

    """
  end
  
end

What you have looks right to me, did you try it? What output did you get?

Go slowly bit by bit.

This is saying that it called a function and no clauses matched.

Now it’s going to tell you what function it called.

AppWeb.ReceiveLive which is a module, and then handle_info which is a function. After that you have "sd". After that you have a Phoenix.LiveView.Socket struct with a bunch of complicated internals. The forum has mangled it a tiny bit cause there’s like an embedded Phoenix tag in there but the first part is. Overall though this is saying that it called a function:

AppWeb.ReceiveLive.handle_info(“sd”, socket)

Your handle_info function looked like:

def handle_info(%{event: "message", payload: message}, socket) do 

Which can’t match.

1 Like

It’s the same problem.
The shape of the handle_info function doesn’t match Phoenix.PubSub.subscribe.

Phoenix.PubSub.subscribe has a string that creates an association between it and the handle_info function. In my original code this is “message”. When I tried to update my code to mirror the PragProg code it gets removed per the PragProg example.

At least that is the way I interpret the error.

== Compilation error in file lib/app_web/live/receive_live.ex ==
** (CompileError) lib/app_web/live/receive_live.ex:10: undefined function text/0 (expected AppWeb.ReceiveLive to define such a function or for it to be imported, but none are available)

That is a compilation error, which is not the same problem. It means you weren’t even able to run your code, it did not compile.

If I compile the code you gave us that doesn’t happens, so there is something different about what you have in your files vs what you copied and pasted. Please copy and paste exactly what you have.

Here is the exact code I am running in my text editor at the moment:

/send

defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    # AppWeb.Endpoint.broadcast(topic, "message", text) # Broadcast

    Phoenix.PubSub.broadcast(App.PubSub, "message", {:test_stuff, text}) # Broadcast

    {:noreply, socket}
  end
  
  defp topic do #Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>

      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
  
end

/receive

defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      # AppWeb.Endpoint.subscribe(topic)     # PubSub Subscribe
      Phoenix.PubSub.subscribe(App.PubSub, "message")     # PubSub Subscribe
    end


    {:ok, assign(socket, messages: "")}
  end
  
  def handle_info({:text_stuff, text}, socket) do  # Handle Info is needed
    IO.inspect message
    {:noreply, assign(socket, messages: message)}
  end
  

  defp topic do  # Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @messages %>


    </div>

    """
  end
  
end

This is the error it generates from the terminal:

= Compilation error in file lib/app_web/live/receive_live.ex ==
** (CompileError) lib/app_web/live/receive_live.ex:10: undefined function text/0 (expected AppWeb.ReceiveLive to define such a function or for it to be imported, but none are available)


Line 10 of receive_live.ex that you posted is an empty line, the error mesage you’re psting doesn’t match.

However,

This function starts talking about a message variable in your IO.inspect that comes from nowhere. I would make sure that your files are saved and that the terminal directory you are in matches where your editor is.

Sorry about that. Here is another version with the innards of handle_info changed. There are still errors and I haven’t learned how to make any sense of this. I don’t understand how to integrate handle_info.

/send

defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    # AppWeb.Endpoint.broadcast(topic, "message", text) # Broadcast

    Phoenix.PubSub.broadcast(App.PubSub, "message", {:test_stuff, text}) # Broadcast

    {:noreply, socket}
  end
  
  defp topic do #Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>

      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
  
end

/receive

defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      # AppWeb.Endpoint.subscribe(topic)     # PubSub Subscribe
      Phoenix.PubSub.subscribe(App.PubSub, "message")     # PubSub Subscribe
    end


    {:ok, assign(socket, message_item: "")}
  end
  
  def handle_info({:text_stuff, text}, socket) do  # Handle Info is needed
    IO.inspect text
    {:noreply, assign(socket, message_item: text)}
  end
  

  defp topic do  # Topic
    "chat"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @message_item %>


    </div>

    """
  end
  
end

When I load the pages it does not throw an error.

When I submit a form I get this:

[error] GenServer #PID<0.1120.0> terminating
** (FunctionClauseError) no function clause matching in AppWeb.ReceiveLive.handle_info/2
    (app 0.1.0) lib/app_web/live/receive_live.ex:15: AppWeb.ReceiveLive.handle_info({:test_stuff, "sa"}, #Phoenix.LiveView.Socket<id: "phx-F38EVxpODaCkogVG", endpoint: AppWeb.Endpoint, view: AppWeb.ReceiveLive, parent_pid: nil, root_pid: #PID<0.1120.0>, router: AppWeb.Router, assigns: %{__changed__: %{}, flash: %{}, live_action: :home, message_item: ""}, transport_pid: #PID<0.1114.0>, ...>)
    (phoenix_live_view 0.18.18) lib/phoenix_live_view/channel.ex:276: Phoenix.LiveView.Channel.handle_info/2
    (stdlib 4.2) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.2) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.2) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {:test_stuff, "sa"}
State: %{components: {%{}, %{}, 1}, join_ref: "4", serializer: Phoenix.Socket.V2.JSONSerializer, socket: #Phoenix.LiveView.Socket<id: "phx-F38EVxpODaCkogVG", endpoint: AppWeb.Endpoint, view: AppWeb.ReceiveLive, parent_pid: nil, root_pid: #PID<0.1120.0>, router: AppWeb.Router, assigns: %{__changed__: %{}, flash: %{}, live_action: :home, message_item: ""}, transport_pid: #PID<0.1114.0>, ...>, topic: "lv:phx-F38EVxpODaCkogVG", upload_names: %{}, upload_pids: %{}}

There‘s a typo for :text_stuff, but the message uses :test_stuff.

1 Like

Thanks!

It seems to work now:

/send

defmodule AppWeb.SendLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do
    {:ok, socket}
  end
  
  def handle_event("send", %{"text" => text}, socket) do
    IO.inspect text

    Phoenix.PubSub.broadcast(App.PubSub, "message", {:text_stuff, text}) 

    {:noreply, socket}
  end
  
  defp topic do #Topic
    "message"
  end

  def render(assigns)do   
   ~H"""
    <div>
      <h1>Send Message</h1>
      <form phx-submit="send">
        <input type="text" name="text" />
        <button type="submit">Send</button>
      </form>
    </div>

    """
  end
end


/receive

defmodule AppWeb.ReceiveLive do
  use AppWeb, :live_view
  
  def mount(_params, _session, socket) do

    if connected?(socket) do
      Phoenix.PubSub.subscribe(App.PubSub, topic)    
    end

    {:ok, assign(socket, message_item: "")}
  end
  
  def handle_info({:text_stuff, text}, socket) do 
    {:noreply, assign(socket, message_item: text)}
  end

  defp topic do  # Topic
    "message"
  end

  def render(assigns)do   
   ~H"""
     <div>
    <h1>ChatLive</h1>
    <%= @message_item %>
    </div>

    """
  end
  
end

I have a question about the relationship to handle_info. I just want to know if my mental model is going in the right direction.

Mental Models are helpful not because they are 100 percent accurate but because they are tangibly useful.
The mental model I use is as follows:

If the first two arguments of broadcast and subscribe match each other…

  • Phoenix.PubSub. broadcast (App.PubSub, topic, {:text_stuff, text})
  • Phoenix.PubSub. subscribe (App.PubSub, topic)

Then …

The module that mounts the subscribe “enables” its handle_info function to listen for and receive payload data from broadcast.

In your opinion is this a good way to look at it?

“Enable” might not be the right word, “points” might be better. The point is that the subscribe creates a link between broadcast and handle_info. I am not sure what that relationship is technically called.

“makes the callback respond to broadcast messages” doesn’t seem right either.

Hey @wktdev it’s better to think of PubSub in terms of processes rather than functions. When you have code that calls Phoenix.PubSub.subscribe(App.PubSub, topic) you’re establishing a link between the process that called subscribe the topic.

When another process calls broadcast there is basically code inside Phoenix.PubSub that looks for all processes linked to that topic and calls send(subscribed_process_pid, message).

So where handle_info comes in is because in your case the process calling subscribe is a GenServer and so when GenServer processes get a message, the GenServer code calls YourModule.handle_info(message, state). At that point it’s just regular pattern matching. The pattern you have in handle_info is not part of the link between the subscribed process and the topic. It’s just part of the code that runs when a message comes into that pid.

1 Like