PubSub - Broadcast on a module, Subscribe on another

Hi all!

I’m trying to subscribe to a PubSub topic from a different module than the one that runs the task which is going to broadcast.

First, I start an async task, which calls a function on a specific GenServer. That job can take 10 seconds.

I want to let the user keep navigating the site, subscribing to that topic on every page, while waiting for the PubSub reply message.

If I stay and wait, I receive the message in handle_info, but if I change the page, even though I subscribe again, the broadcast is lost in space…

To my surprise, it seems like “broadcast” only sends the message to a specific process instead of broadcasting to all the processes…

I’m thinking of handling this with WebSockets, but I thought PubSub might have something for these “persistent” cases… SQS or MQ could be another option, but I think that is too much…

Any suggestions?

You can broadcast on a channel, and receive on another one…

You can do it per user, or per session.

As long as users have their own custom channels, it’s possible to use them for replies. If You subscribe to that channel in each liveview, that should be fine.

That is what I’m trying to do… My channel / topic is called “blockchain”.

In my liveview mount():

if connected?(socket) do
  Phoenix.PubSub.subscribe(App.PubSub, @topic)
end

On my GenServer:

Phoenix.PubSub.broadcast(App.PubSub, @topic, %{
  status: :error,
  message: "Error detail"
})

On my liveview:

@impl true
def handle_info(message, socket) do
  IO.puts("Message received!!!")

  case message do
    %{message: error, status: :error} ->
      {:noreply,
       push_event(socket, "transaction_finished", %{status: message[:status], message: error})}

    %{tx: tx, status: :sent} ->
      {:noreply, push_event(socket, "transaction_finished", %{status: message[:status], tx: tx})}

    _ ->
      {:noreply,
       push_event(socket, "transaction_finished", %{status: :error, message: "Error undefined"})}
  end
end

If I keep waiting, I receive the message. If I change the view, I get nothing…

You also need another one…

Phoenix.PubSub.subscribe(App.PubSub, "session:#{session_id}")

You can generate a random session_id and use it as a personal channel, where You receive anything personal.

You might use the user instead, if your users need to be authenticated.
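
To keep the topic name identical on the subscribing and the broadcasting side, the name can be built in one place. This is only a sketch: `UserTopic` and its functions are hypothetical names, the PubSub server name is passed in by the caller, and it assumes the phoenix_pubsub dependency is available in the project.

```elixir
defmodule UserTopic do
  # Hypothetical helper, not part of Phoenix: builds the per-user topic name.
  def topic(user_id), do: "user:#{user_id}"

  # Meant to be called from mount/3 of every LiveView, once the socket is connected.
  def subscribe(pubsub, user_id) do
    Phoenix.PubSub.subscribe(pubsub, topic(user_id))
  end

  # Meant to be called from the GenServer once the job has finished.
  def broadcast(pubsub, user_id, message) do
    Phoenix.PubSub.broadcast(pubsub, topic(user_id), message)
  end
end
```

With something like this, the liveview and the GenServer cannot drift apart on the topic string, which is an easy mistake when it is interpolated by hand in several places.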

Ok… so now I have this:

def mount(_params, session, socket) do
  user = Accounts.get_user_by_session_token(session["user_token"])

  if connected?(socket) do
    IO.puts("Subscribing to user:#{user.id}")
    Phoenix.PubSub.subscribe(Coopchain.PubSub, @topic)
    Phoenix.PubSub.subscribe(Coopchain.PubSub, "user:#{user.id}")
  end
  ....
end

And on the GenServer side I should broadcast to…?

Phoenix.PubSub.broadcast(App.PubSub, "user:#{user_id}", %{
  status: :error,
  message: "Error on abi parser"
})

I’m not sure I follow you… Thanks for your patience…

This is the flow:

1) I subscribe in the liveview.
2) I send a job to the GenServer using Task.Supervisor.async.
3) I move to another view where I’m also subscribed.
4) I receive the PubSub message there.

The broadcast happens only in the GenServer… The first job is triggered by a Task:

Task.Supervisor.async(App.TaskSupervisor, fn ->
  params = {
    user_id,
    coop_id,
    data.title,
    data.file,
    data.hash
  }

  # This function calls the GenServer.
  subscribe_to_contract(params)
end)

Something like this… the main part is that You will subscribe to these custom channels in all your liveviews.

That is what I do… but the message does not arrive… I get: Client #PID<0.1240.0> is dead

Task.Supervisor.async links the task to the caller, and that is not good if the caller is the liveview process… because the task will die when the liveview process dies.
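
The effect of the link can be reproduced outside of Phoenix. In this minimal sketch a short-lived process stands in for the liveview: it starts a job and then exits, like a liveview does on navigation. `LinkDemo` and `job_survives?/1` are made-up names for illustration; only `Task.Supervisor` comes from Elixir itself.

```elixir
defmodule LinkDemo do
  # Returns true if the job finished even though the process that started it died.
  def job_survives?(start_fun) when start_fun in [:async, :async_nolink] do
    {:ok, sup} = Task.Supervisor.start_link()
    parent = self()

    # Stand-in for the liveview process.
    spawn(fn ->
      apply(Task.Supervisor, start_fun, [
        sup,
        fn ->
          # Stand-in for the slow GenServer job.
          Process.sleep(100)
          send(parent, :job_done)
        end
      ])

      # "Navigating away": the starter dies before the job is done.
      exit(:shutdown)
    end)

    receive do
      :job_done -> true
    after
      500 -> false
    end
  end
end
```

`LinkDemo.job_survives?(:async)` should return false, because the exit signal travels over the link and takes the task down with it. For comparison, `LinkDemo.job_survives?(:async_nolink)` should return true, since that variant only monitors the task instead of linking to it.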

Seems like RabbitMQ or SQS is my only alternative here… too bad…

There are more tools than just Task async… for example, having a manager gen_server being the middle-man.

When You call your job, You do it through a GenServer, that does the call to the worker. Then, the liveview can die, but the worker won’t.

You might look at OTP tools first, before RabbitMQ and SQS. There are many, and the Task module is one I use when I don’t need to control the process.
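
A rough sketch of such a middle-man, using only OTP tools, could look like this. `JobManager`, `submit/3` and the `notify` callback are hypothetical names; in the app from this thread, `notify` would presumably wrap the Phoenix.PubSub.broadcast call to the user topic.

```elixir
defmodule JobManager do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Returns :ok immediately; the job itself runs in a worker owned by the manager.
  def submit(manager, job, notify) when is_function(job, 0) and is_function(notify, 1) do
    GenServer.cast(manager, {:submit, job, notify})
  end

  @impl true
  def init(:ok) do
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{sup: sup}}
  end

  @impl true
  def handle_cast({:submit, job, notify}, state) do
    # start_child/2 ties the worker to the supervisor only, not to the client,
    # so the client (e.g. a liveview) can die without stopping the job.
    {:ok, _pid} = Task.Supervisor.start_child(state.sup, fn -> notify.(job.()) end)
    {:noreply, state}
  end
end
```

The client only ever talks to the manager, so nothing links the client to the worker. In a real application the manager and its Task.Supervisor would normally sit in the application supervision tree instead of being started inside init/1.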

Thanks for your reply, kokolegorille. I think what you describe is what I had before, but the GenServer call is blocking. That is the reason I settled on Task async, to avoid blocking code execution…

Before:
LiveView
GenServer call
Wait 10 seconds…
return the view {:noreply, etc}

The good: PubSub works when changing pages.
The bad: I can’t render the view until the GenServer ends.

Now:
LiveView
Task.Supervisor.async → GenServer call
return the view {:noreply, etc}

The good: The view renders immediately.
The bad: PubSub does not work when changing pages, as the liveview process dies.

What about GenServer.cast?
And using a GenServer between client and workers?

I am sure what You want is doable, but I am also sure I would not use the Task module.
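
To illustrate the difference between call and cast: a call blocks the client until the GenServer replies, while a cast returns :ok at once and the result has to come back some other way. A minimal sketch, with `SlowWorker` and the `notify` callback as made-up names (in this thread the callback would be the PubSub broadcast):

```elixir
defmodule SlowWorker do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Blocks the client until the job is done (15 s timeout).
  def run_sync(worker, job), do: GenServer.call(worker, {:run, job}, 15_000)

  # Returns :ok at once; the result is delivered later through `notify`.
  def run_async(worker, job, notify), do: GenServer.cast(worker, {:run, job, notify})

  @impl true
  def init(:ok), do: {:ok, nil}

  @impl true
  def handle_call({:run, job}, _from, state), do: {:reply, job.(), state}

  @impl true
  def handle_cast({:run, job, notify}, state) do
    notify.(job.())
    {:noreply, state}
  end
end
```

Note that with cast the worker itself is still busy for the whole job, so further requests wait in its mailbox. Only the client is freed.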

Well, I think I got this working with async_nolink/3

https://hexdocs.pm/elixir/1.13/Task.Supervisor.html#async_nolink/3
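
One thing to keep in mind with async_nolink: the process that started the task still gets messages from it while it is alive, either `{ref, result}` when the task finishes or a `:DOWN` message when it crashes. A catch-all handle_info clause would see those too. A minimal sketch of handling them, with a plain GenServer standing in for the liveview (`JobOwner` and its functions are hypothetical names):

```elixir
defmodule JobOwner do
  use GenServer

  def start_link(sup), do: GenServer.start_link(__MODULE__, sup)
  def start_job(owner, job), do: GenServer.cast(owner, {:start_job, job})
  def last_result(owner), do: GenServer.call(owner, :last_result)

  @impl true
  def init(sup), do: {:ok, %{sup: sup, ref: nil, last_result: nil}}

  @impl true
  def handle_cast({:start_job, job}, state) do
    # The task is monitored by this process, but not linked to it.
    task = Task.Supervisor.async_nolink(state.sup, job)
    {:noreply, %{state | ref: task.ref}}
  end

  @impl true
  def handle_call(:last_result, _from, state), do: {:reply, state.last_result, state}

  # The task finished: its reply arrives as {ref, result}.
  @impl true
  def handle_info({ref, result}, %{ref: ref} = state) when is_reference(ref) do
    # Drop the monitor and any :DOWN message that is already queued.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | ref: nil, last_result: {:ok, result}}}
  end

  # The task crashed: only the monitor's :DOWN message arrives.
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{ref: ref} = state) do
    {:noreply, %{state | ref: nil, last_result: {:error, reason}}}
  end
end
```

In a liveview these two clauses would go before any catch-all handle_info clause, so that task replies are not mistaken for PubSub messages.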

Next challenge is to place the code in some plug to avoid repeating it across the whole project :)