Subscribe to Phoenix.PubSub from another application

I’m new to distributed Elixir.

I have two clean installations of Phoenix 1.4.2, with apps named A and B. I’m running them with the --sname option, and I can run a command in the other application using :rpc.call. Can I subscribe to Phoenix.PubSub from another app?

I have tried:

iex(a@MacBook-Pro)1> Node.connect :"b@MacBook-Pro"
iex(a@MacBook-Pro)2> Phoenix.PubSub.subscribe(B.PubSub, "topic")
# throws argument error - :ets.lookup(B.PubSub, :subscribe)

Should it somehow be wrapped in a GenServer on the B side and used with Node.spawn_link? Is there an easier solution? At first I wanted to broadcast messages between the two apps, which are installed on the same server, using websockets, but then I thought there should be an easier method.


I’m not super familiar with the implementation details, but I’ll try to describe it conceptually and hopefully that’ll help.

Phoenix.PubSub works by creating a cluster of different nodes. You need to start a Phoenix.PubSub process on every node in the cluster. You can specify an adapter, which determines how Phoenix.PubSub discovers and reaches the other nodes. With the Redis adapter, each node connects to Redis and messages are sent through it. With PG2 (the default), Phoenix.PubSub connects with the other Phoenix.PubSub processes on any nodes that have already been clustered by connecting them.

So, you’ve correctly connected the nodes. But it looks like you haven’t started a Phoenix.PubSub process on the node where you call subscribe. When you call Phoenix.PubSub.subscribe/2, the process that would have set things up locally on the current node hasn’t done so, and you get an error about a lookup on an ETS table that doesn’t exist.

I think in the simple iex session you’re just missing something like:

Phoenix.PubSub.PG2.start_link(name: B.PubSub)
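The ETS error described above can be reproduced without Phoenix at all, which may help confirm the diagnosis. This is only a minimal sketch: the table name :demo_missing_pubsub is made up for illustration and stands in for a PubSub name whose server was never started on the local node.

```elixir
# Hypothetical table name; nothing on this node has created it.
table = :demo_missing_pubsub

# Looking up a named ETS table that does not exist raises ArgumentError,
# the same class of error the subscribe call runs into.
result =
  try do
    :ets.lookup(table, :subscribe)
  rescue
    e in ArgumentError -> {:error, e.__struct__}
  end

IO.inspect(result)
```

Named ETS tables are local to a node, so a table created by the PubSub server on node B is not visible from node A even after Node.connect.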


My apps are clean installations of Phoenix, so they have PubSub enabled by default in config.exs:

config :b, BWeb.Endpoint,
  # ...
  pubsub: [name: B.PubSub, adapter: Phoenix.PubSub.PG2]

I also tried to manually set up another PubSub in application.ex:

defmodule B.Application do
  use Application

  def start(_type, _args) do
    children = [
      BWeb.Endpoint,
      # A second PubSub server, started alongside the endpoint's own
      {Phoenix.PubSub.PG2, name: :another_pubsub}
    ]
    # ...
  end
end

I can access this PubSub from the B app:

iex(b@MacBook-Pro)1> Phoenix.PubSub.subscribe :another_pubsub, "asd"
:ok

but I still cannot connect from the A app.

I did some brief digging. The server_name needs to be the same for two PubSubs to connect via PG2.

So the name argument on node B needs to match the name used on node A. You also still need to connect the nodes, if you’re not doing that already.
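A rough sketch of what that could look like, assuming phoenix_pubsub 1.x (the version that ships with Phoenix 1.4). The name :shared_pubsub is hypothetical; the only point is that it is identical in both apps. The fragments run in different places (both supervision trees, then one iex shell per node), so this is not a single script.

```elixir
# In the supervision tree of BOTH applications (A and B).
# :shared_pubsub is a made-up name; it must be the same on every node.
children = [
  {Phoenix.PubSub.PG2, name: :shared_pubsub}
]

# In iex on b@MacBook-Pro: subscribe the shell process locally.
Phoenix.PubSub.subscribe(:shared_pubsub, "topic")

# In iex on a@MacBook-Pro: connect the nodes, then broadcast.
Node.connect(:"b@MacBook-Pro")
Phoenix.PubSub.broadcast(:shared_pubsub, "topic", {:hello, :from_a})

# Back in iex on b@MacBook-Pro: print whatever arrived in the shell's mailbox.
flush()
```

Each node subscribes against its own local PubSub server; the PG2 adapter is what forwards a broadcast to the servers with the same name on the other connected nodes.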


I figured it out without Phoenix.PubSub. I think this is all I need.

Create a pg2 group in A

defmodule A.Application do
  use Application

  def start(_type, _args) do
    # Create the process group that workers on other nodes will join
    :ok = :pg2.create(:some_group)
    # ...
  end
end

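Join the group from B

defmodule B.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...
      {B.Worker, []}
    ]
    # ...
  end
end

defmodule B.Worker do
  use GenServer

  def start_link(opts) do
    {:ok, pid} = GenServer.start_link(__MODULE__, :ok, opts)

    # Connect to node A, then add this worker to the group created there
    true = Node.connect(:"a@MacBook-Pro")
    :ok = :pg2.join(:some_group, pid)

    {:ok, pid}
  end

  # Required GenServer callback; the worker keeps no state
  def init(:ok), do: {:ok, %{}}
end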

Send message in A

iex(a@MacBook-Pro)1> [pid] = :pg2.get_members :some_group
iex(a@MacBook-Pro)2> send(pid, :hello) 

Handle messages from A in B

defmodule B.Worker do
  def handle_info(msg, state) do
    IO.inspect msg

    {:noreply, state}
  end
end

In case anyone is interested, here is my final implementation of the subscriber module:

defmodule B.Worker do
  use GenServer

  @reconnect_interval 1_000

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def init(:ok) do
    Process.send_after(self(), :join_pg2, 0)

    {:ok, %{}}
  end

  def handle_info(:join_pg2, state) do
    node_name = :"a@MacBook-Pro"

    case Node.connect(node_name) do
      true ->
        if :group_name not in :pg2.which_groups() do
          :global.sync()
          :ok = :pg2.join(:group_name, self())
        end

        Node.monitor(node_name, true)

        {:noreply, state}

      false ->
        Process.send_after(self(), :join_pg2, @reconnect_interval)

        {:noreply, state}
    end
  end

  # from Node.monitor
  def handle_info({:nodedown, _node}, state) do
    Process.send_after(self(), :join_pg2, 0)

    {:noreply, state}
  end

  def handle_info(msg, state) do
    IO.inspect msg

    {:noreply, state}
  end
end


Instead of Process.send_after(self(), :join_pg2, 0), you can just do

send(self(), :join_pg2)
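To illustrate the pattern on its own, here is a small self-contained sketch of deferring work out of init/1 with send/2. The module DeferredInit, the :join message, and the joined?/1 helper are all hypothetical names; the real worker would do its Node.connect and pg2 join where this one only flips a flag.

```elixir
defmodule DeferredInit do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Hypothetical helper to observe whether the deferred step has run.
  def joined?(server), do: GenServer.call(server, :joined?)

  @impl true
  def init(:ok) do
    # Queue the slow work as a message so init/1 returns immediately.
    send(self(), :join)
    {:ok, %{joined: false}}
  end

  @impl true
  def handle_call(:joined?, _from, state), do: {:reply, state.joined, state}

  @impl true
  def handle_info(:join, state) do
    # Stand-in for the real connect-and-join logic.
    {:noreply, %{state | joined: true}}
  end
end

{:ok, pid} = DeferredInit.start_link()
IO.inspect(DeferredInit.joined?(pid))
```

Because the :join message is placed in the mailbox before start_link returns, it is handled ahead of any call made afterwards by the caller.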


I invite you to have a look at Distributed Phoenix Chat with PubSub PG2 adapter.