Handling crashes of a pubsub

I’ve currently got a pubsub app that I’m using as an event dispatcher between a couple of umbrella applications. It’s built using Phoenix.PubSub and so far it’s working great.

One thing I’m unsure of is how to handle the pubsub app crashing. It’s all supervised, so it should come back up, but all the subscribers held in its state would be gone at that point. I’m not sure if there’s an OTP way of handling this or if I’d need to use something like Redis to store the pubsub GenServer state.

2 Likes

Phoenix.PubSub.subscribe accepts a link: true option to link the caller to the pubsub server, which would take the caller down if the pubsub server crashes. So it sounds like this is the option you want to pass. Alternatively, you could choose to monitor the server and react to the DOWN event as you see fit.

3 Likes

Is this assuming the subscriber is supervised and the supervisor is responsible for registering the subscriber with the pubsub app? Or something along those lines.

Yes, the subscriber’s supervisor would be responsible for restarting the process. In the case of channel processes, which use link: true under the hood, the remote client is responsible for failure recovery by detecting the error and rejoining.

1 Like

I’m still struggling a bit. I’m probably doing something basic wrong but I’m confused.

Here’s my EventServer:

defmodule EventServer do
  @moduledoc """
  Event dispatcher built on `Phoenix.PubSub`.

  An event is a struct. It is broadcast on the topic named after its struct
  module (`to_string(event.__struct__)`). `subscribe/2` starts a GenServer,
  linked to the caller. That GenServer subscribes to the topic with
  `link: true` and forwards every message it receives to
  `subscriber.handle_event/1`.
  """

  use GenServer

  #
  # client
  #

  @doc "Broadcasts `event` on the topic named after its struct module."
  @spec broadcast(struct) :: :ok | { :error, term }
  def broadcast(event) do
    Phoenix.PubSub.broadcast(EventServer.PubSub, to_string(event.__struct__), event)
  end

  @doc """
  Starts a forwarding process, linked to the caller, that subscribes to `topic`.

  For every message it receives, the process calls
  `subscriber.handle_event(message)`. Returns `:ok` once the process has
  started. Returns `{:error, reason}` when `GenServer.start_link/2` reports a
  failure; previously that failure was silently dropped.
  """
  @spec subscribe(module, atom | binary) :: :ok | { :error, term }
  def subscribe(subscriber, topic) do
    case GenServer.start_link(__MODULE__, { subscriber, to_string(topic), :handle_event }) do
      { :ok, _pid } -> :ok
      { :error, _reason } = error -> error
    end
  end

  @doc """
  Unsubscribes the calling process from `topic`.

  NOTE(review): subscriptions made through `subscribe/2` belong to the
  spawned forwarding GenServer, not to the caller. Calling this from the
  caller therefore presumably does not remove them; confirm against the
  Phoenix.PubSub version in use.
  """
  @spec unsubscribe(atom | binary) :: :ok | { :error, term }
  def unsubscribe(topic) do
    Phoenix.PubSub.unsubscribe(EventServer.PubSub, to_string(topic))
  end

  #
  # callbacks
  #

  # Subscribes this process with `link: true`, so that a pubsub crash also
  # takes this forwarder down. A failed subscription stops the process
  # instead of leaving a forwarder that never receives anything.
  @impl true
  def init({ subscriber, topic, fun }) do
    case Phoenix.PubSub.subscribe(EventServer.PubSub, topic, link: true) do
      :ok -> { :ok, { subscriber, fun } }
      { :error, reason } -> { :stop, reason }
    end
  end

  # Forwards every incoming message to `subscriber.fun/1`.
  @impl true
  def handle_info(msg, { subscriber, fun } = state) do
    apply(subscriber, fun, [ msg ])

    { :noreply, state }
  end
end

My supervisor:

defmodule EventServer.Supervisor do
  @moduledoc false

  use Supervisor

  #
  # client
  #

  # Takes (and ignores) one argument so that the `child_spec/1` generated by
  # `use Supervisor` can start this module under another supervisor.
  # Calling `start_link()` with no argument still works.
  def start_link(_arg \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  #
  # callbacks
  #

  @impl true
  def init(:ok) do
    Supervisor.init(children(), strategy: :one_for_one)
  end

  #
  # private
  #

  # Explicit map child spec, equivalent to the deprecated
  # `Supervisor.Spec.supervisor(Phoenix.PubSub.PG2, [EventServer.PubSub, []])`.
  # It starts the pubsub via `Phoenix.PubSub.PG2.start_link(EventServer.PubSub, [])`.
  defp children() do
    [
      %{
        id: Phoenix.PubSub.PG2,
        start: {Phoenix.PubSub.PG2, :start_link, [EventServer.PubSub, []]},
        type: :supervisor,
        shutdown: :infinity
      }
    ]
  end
end

Here’s the code I’m using to manually test things:

defmodule Event do
  @moduledoc false

  # Sample event payload used by the manual test; the attribute defaults to nil.
  defstruct some_attr: nil
end

defmodule Listener do
  @moduledoc false
  use GenServer

  # Registered under the module name; the start argument is ignored.
  def start_link(_args), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)

  # Prints a marker so the manual test can see when the process (re)starts.
  @impl true
  def init(:ok) do
    IO.inspect("HERE")
    {:ok, nil}
  end

  # Called by the EventServer forwarder for each received %Event{}; prints a marker.
  def handle_event(%Event{}), do: IO.inspect("HELLOOOOO")
end

defmodule SupSup do
  @moduledoc false

  # `use Supervisor` declares the behaviour and generates `child_spec/1`,
  # which the original module lacked.
  use Supervisor

  # The optional, ignored argument lets the generated `child_spec/1` start
  # this module; `start_link()` keeps working.
  def start_link(_arg \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    Supervisor.init(children(), strategy: :one_for_one)
  end

  # The single supervised child: the Listener test GenServer.
  defp children() do
    [
      { Listener, [] }
    ]
  end
end

# Manual test: start Listener's tree, subscribe, broadcast, then bounce the
# pubsub supervisor and broadcast again.
# NOTE(review): EventServer.Supervisor is not started in this snippet and is
# presumably already running (e.g. started by its application) — confirm.
SupSup.start_link()

# Starts an EventServer forwarding GenServer linked to *this* (calling)
# process, not to Listener; it calls Listener.handle_event/1 per message.
EventServer.subscribe(Listener, Event)
EventServer.broadcast(%Event{})

# Supervisor.stop/1 exits with reason :normal, and a :normal exit does not
# propagate over links, so the linked forwarder is not taken down here.
Supervisor.stop(EventServer.Supervisor)
:timer.sleep(500)
EventServer.Supervisor.start_link()

# The restarted pubsub no longer holds the earlier subscription, so this
# broadcast is presumably not delivered to the old forwarder.
EventServer.broadcast(%Event{})

When I stop EventServer.Supervisor I would expect my Listener GenServer to also stop when using link: true but it doesn’t.

Supervisor.stop/1 causes an exit with reason :normal, which won’t trigger links. Try Supervisor.stop(EventServer.Supervisor, :boom).

2 Likes

@dom, thanks for pointing this out. Unfortunately, it made no difference.

I’ve modified what I had to, I think, be more in line with how Phoenix.PubSub works.

I’ve removed the GenServer bits from EventServer and moved them to the listener in order to link the listener’s GenServer process instead of EventServer's. It also cleaned up my API a bit which is a plus.

Unfortunately, I’m still having the same issue.

EventServer module

defmodule EventServer do
  @moduledoc """
  Event dispatcher built on `Phoenix.PubSub`.

  An event is a struct. It is published on the topic whose name is the string
  form of its struct module. Subscriptions use `link: true`, which links the
  calling process to the pubsub server.
  """

  @pubsub EventServer.PubSub

  @doc "Publishes `event` on the topic derived from its struct module."
  @spec broadcast(struct) :: :ok | { :error, term }
  def broadcast(event) do
    topic = to_string(event.__struct__)
    Phoenix.PubSub.broadcast(@pubsub, topic, event)
  end

  @doc "Subscribes the calling process (linked) to `topic`."
  @spec subscribe(atom | binary) :: :ok | { :error, term }
  def subscribe(topic) do
    name = to_string(topic)
    Phoenix.PubSub.subscribe(@pubsub, name, link: true)
  end

  @doc "Unsubscribes the calling process from `topic`."
  @spec unsubscribe(atom | binary) :: :ok | { :error, term }
  def unsubscribe(topic) do
    name = to_string(topic)
    Phoenix.PubSub.unsubscribe(@pubsub, name)
  end
end

Code for manual testing:

defmodule Event do
  @moduledoc false

  # Sample event payload used by the manual test; the attribute defaults to nil.
  defstruct some_attr: nil
end

defmodule Listener do
  @moduledoc false
  use GenServer

  def start_link(_args) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Subscribes synchronously during init/1 (EventServer.subscribe/1 uses
  # link: true). The return value of the subscribe call is not checked.
  @impl true
  def init(:ok) do
    EventServer.subscribe(Event)

    {:ok, nil}
  end

  # Every incoming message (each broadcast event) just prints a marker.
  @impl true
  def handle_info(_event, state) do
    IO.inspect("HERE")

    {:noreply, state}
  end
end

defmodule SupSup do
  @moduledoc false

  # `use Supervisor` declares the behaviour and generates `child_spec/1`,
  # which the original module lacked.
  use Supervisor

  # The optional, ignored argument lets the generated `child_spec/1` start
  # this module; `start_link()` keeps working.
  def start_link(_arg \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  @impl true
  def init(:ok) do
    Supervisor.init(children(), strategy: :one_for_one)
  end

  # The single supervised child: the Listener test GenServer.
  defp children() do
    [
      { Listener, [] },
    ]
  end
end

# Manual test: Listener subscribes (linked) inside its init/1.
SupSup.start_link()

EventServer.broadcast(%Event{}) # successfully broadcasts, prints "HERE" !!!

# Stop with a non-:normal reason so the exit propagates over the link created
# by `link: true` and takes the subscribed Listener down.
Supervisor.stop(EventServer.Supervisor, :boom)
:timer.sleep(500)
EventServer.Supervisor.start_link()

# NOTE(review): per the follow-up in this thread, Listener is restarted while
# the pubsub is still down, so its init/1 cannot subscribe. The repeated
# restarts presumably exhaust SupSup's default restart limit, which leaves no
# subscriber.
EventServer.broadcast(%Event{}) # does nothing

I was finally able to get it to work. The final problem turned out to be the restart strategy: the subscriber kept trying to start again while the pubsub was still down.

Here’s the final code:

EventServer

defmodule EventServer do
  @moduledoc """
  Event dispatcher built on `Phoenix.PubSub`.

  An event is a struct. It is published on the topic whose name is the string
  form of its struct module. `subscribe/1` refuses with `{:error, :down}`
  while no process is registered under `EventServer.PubSub`.
  """

  @pubsub EventServer.PubSub

  @doc "Publishes `event` on the topic derived from its struct module."
  @spec broadcast(struct) :: :ok | { :error, term }
  def broadcast(event) do
    topic = to_string(event.__struct__)
    Phoenix.PubSub.broadcast(@pubsub, topic, event)
  end

  @doc """
  Subscribes the calling process to `topic` with `link: true`.

  Returns `{:error, :down}`, without subscribing, when nothing is registered
  under `EventServer.PubSub`.
  """
  @spec subscribe(atom | binary) :: :ok | { :error, term }
  def subscribe(topic) do
    # NOTE(review): this is check-then-act. The pubsub can still go down
    # between the whereis/1 check and the subscribe call.
    case Process.whereis(@pubsub) do
      nil -> { :error, :down }
      _registered -> Phoenix.PubSub.subscribe(@pubsub, to_string(topic), link: true)
    end
  end

  @doc "Unsubscribes the calling process from `topic`."
  @spec unsubscribe(atom | binary) :: :ok | { :error, term }
  def unsubscribe(topic) do
    name = to_string(topic)
    Phoenix.PubSub.unsubscribe(@pubsub, name)
  end
end

Manual testing code:

defmodule Event do
  @moduledoc false

  # Sample event payload used by the manual test; the attribute defaults to nil.
  defstruct some_attr: nil
end

defmodule Listener do
  @moduledoc """
  Test subscriber for `Event` broadcasts.

  Subscription happens asynchronously after `init/1`. When the pubsub is not
  running, the subscription is retried every `@retry_interval` milliseconds
  until it succeeds, instead of crashing and burning the supervisor's restart
  budget.
  """
  use GenServer

  # Delay (ms) between subscription attempts while the pubsub is down.
  @retry_interval 5000

  def start_link(_args) do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # Defers subscribing to a :setup message so that init/1 cannot fail just
  # because the pubsub is temporarily down.
  @impl true
  def init(:ok) do
    send(self(), :setup)

    {:ok, nil}
  end

  # Schedules the retry with Process.send_after/3 rather than sleeping, so the
  # process is not blocked during the wait.
  @impl true
  def handle_info(:setup, state) do
    case EventServer.subscribe(Event) do
      {:error, :down} ->
        Process.send_after(self(), :setup, @retry_interval)

      _subscribed ->
        :ok
    end

    {:noreply, state}
  end

  # Every other message (each broadcast event) just prints a marker.
  def handle_info(_event, state) do
    IO.inspect("HERE")

    {:noreply, state}
  end
end

defmodule SupSup do
  @moduledoc false

  # `use Supervisor` declares the behaviour and generates `child_spec/1`,
  # which the original module lacked.
  use Supervisor

  # The optional, ignored argument lets the generated `child_spec/1` start
  # this module; `start_link()` keeps working.
  def start_link(_arg \\ []) do
    Supervisor.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  # max_restarts is raised to 100 (within the default :max_seconds window) so
  # that Listener restarting while the pubsub is down does not exhaust the
  # restart budget.
  @impl true
  def init(:ok) do
    Supervisor.init(children(), max_restarts: 100, strategy: :one_for_one)
  end

  # The single supervised child: the Listener test GenServer.
  defp children() do
    [
      { Listener, [] },
    ]
  end
end

# Manual test: Listener subscribes asynchronously and retries while the
# pubsub is down.
SupSup.start_link()

EventServer.broadcast(%Event{}) # successfully broadcasts !!!

# A non-:normal reason propagates over the `link: true` link and takes the
# subscribed Listener down; SupSup then restarts it.
Supervisor.stop(EventServer.Supervisor, :boom)
:timer.sleep(5000)
EventServer.Supervisor.start_link()

# NOTE(review): Listener retries its subscription only every 5000 ms, and this
# broadcast runs right after the restart. It may therefore deliver nothing
# just because Listener has not re-subscribed yet — timing, not failure.
EventServer.broadcast(%Event{}) # does nothing

A lot of the code in Listener will need to move into an EventServer.Listener-style module that the subscriber implementations will `use`.

If anybody has suggestions on how to make the above better, or if I’m doing something dumb, please feel free to comment. :blush:

1 Like