Keeping track of WebSocket disconnects

Hmmm, perhaps, but at first sight seemed overkill to me. I do not want to track presences globally, nor I want to notify anyone about diffs. I only want to log when a socket is closed.

It’s not specific to websocket, it is for GenServer…

https://hexdocs.pm/elixir/GenServer.html#c:terminate/2

You can see why it is not always called.

BTW, a simple monitor will catch those disconnections too.

Could you eleborate the idea based on a monitor?

On websocket join… You can add a Process.monitor to another process, in charge of catching DOWN events.

I’ve found something similar for channels from @chrismccord (hey!) here. I might try going that route, it’s more idiomatic than the ad-hoc channel.

That is exactly what I meant :slight_smile:

1 Like

I see a practical blocker, don’t know how to get the PID of the process you want to monitor from the socket module.

In theory, if I understand it correctly, that should be socket.tranport_pid, but that is still nil within connect/3.

Sorry, I monitor channel, not socket… You cannot use self() in socket connect.

See this older topic.

I have read there is an instrumentation event :phoenix_socket_connect, but I am not familiar with Phoenix instrumentation, searching right now in case that may help.

Ah, I see how it goes via :telemetry in the implementation of Phoenix.Logger.

Nah, in the event I do not have the socket, or the assigns, or the socket ID. You get these arguments:

[:phoenix, :socket_connected]
%{duration: 398900}
%{
  connect_info: %{},
  endpoint: DriverTelemetryWeb.Endpoint,
  log: false,
  params: %{"jwt" => "foo", "vsn" => "2.0.0"},
  result: :ok,
  serializer: Phoenix.Socket.V2.JSONSerializer,
  transport: :websocket,
  user_socket: DriverTelemetryWeb.DriverSocket,
  vsn: "2.0.0"
}
:ok

Running out of the ideas with the existing APIs.

I’ll sleep on it.

You can do it by defining your own Phoenix.Socket and implementing init with a track call. I didn’t want to do all of that, and was okay with a warning appearing, so I just defined the function before the use Phoenix.Socket call. https://github.com/pushex-project/pushex/blob/master/lib/push_ex_web/channels/push_socket.ex#L8

That isn’t best practice, but demonstrates how you can track a Socket instead of a Channel. You enter a world of “may not be supported” once you do this, but I think it’s pretty small here.

2 Likes

Awesome, so I have a preliminary solution for this. Let me share it.

First, I think the natural place to log the connection is connect/2:

defmodule MyAppWeb.DriverSocket do
  def connect(%{"jwt" => jwt}, socket) do
    case verify(jwt) do
      {:ok, driver_id} ->
        Logger.info("Driver #{driver_id} connected")
        {:ok, assign(socket, :driver_id, driver_id)}

      _ ->
        :error
    end
  end
end

We’ll log the disconnection in the monitor:

defmodule MyAppWeb.SocketMonitor do
  use GenServer
  require Logger

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  def monitor(socket_pid, driver_id) do
    GenServer.call(__MODULE__, {:monitor, {socket_pid, driver_id}})
  end

  @impl true
  def init(sockets) do
    {:ok, sockets}
  end

  @impl true
  def handle_call({:monitor, {socket_pid, driver_id}}, _from, sockets) do
    Process.monitor(socket_pid)
    {:reply, :ok, Map.put(sockets, socket_pid, driver_id)}
  end

  @impl true
  def handle_info({:DOWN, _ref, :process, socket_pid, _reason}, sockets) do
    {driver_id, sockets} = Map.pop(sockets, socket_pid)
    Logger.info("Driver #{driver_id} disconnected")
    {:noreply, sockets}
  end
end

You start the monitor when the application boots:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...
      MyAppWeb.SocketMonitor
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

And, finally, the custom definition of init/1 in the socket module (I’ve simplified the one above):

defmodule MyAppWeb.DriverSocket do
  def init(state) do
    res = {:ok, {_, socket}} = Phoenix.Socket.__init__(state)
    MyAppWeb.SocketMonitor.monitor(socket.transport_pid, socket.assigns.driver_id)
    res
  end

  # Must go below, so our init/1 matches first.
  use Phoenix.Socket
end

This is redefining an internal method, but the risk seems controlled to me. I guess the test suite of the application would be all broken if Phoenix changes anything related to that.

Of course, it would be really cool that Phoenix had public API for this use case.

It would be awesome to be able to get rid of the warning, “this clause cannot match because a previous clause…”. Let me /cc @OvermindDL1 since he knows all the tricks.

This alternative does not issue a warning and it is not coupled to the implementation of the original init/1, only to its return value:

defmodule MyAppWeb.DriverSocket do
  use Phoenix.Socket
  defoverridable init: 1

  def init(state) do
    res = {:ok, {_, socket}} = super(state)
    MyAppWeb.SocketMonitor.monitor(socket.transport_pid, socket.assigns.driver_id)
    res
  end
end
2 Likes

Oh nice! I didn’t really know you could do that. I like that it’s not dependent on the implementation.

Next step is to replace the socket monitor GenServer with a simple process spawn in init/1 that monitors and waits for the :DOWN message:

defmodule MyAppWeb.DriverSocket do
  use Phoenix.Socket
  defoverridable init: 1

  def init(state) do
    res = {:ok, {_, socket}} = super(state)
    monitor(socket)
    res
  end

  defp monitor(socket) do
    spawn fn ->
      Process.monitor(socket.transport_pid)
      receive do
        {:DOWN, _ref, :process, _monitored_pid, _reason} ->
          Logger.info("Driver #{socket.assigns.driver_id} disconnected")
      end
    end
  end
end

This is so simple in comparison! And for this particular use case it seems to suffice. Do you see any drawbacks?

The biggest downside of a monitor vs a link is if the spawned process dies, you’re going to lose your monitoring. However, that chance is fairly low and something you can probably live with if this is just for analysis.

If you’re doing business logic based on the socket being up, then you could end up in a messed up state. Phoenix tracker solves this by linking the channel process to a single process so that deaths are bidirectional (although the tracker process handles it via a down)

Yeah, in considering this I am taking into account that the spawned function is trivial, the template would be:

defp monitor(socket) do
  spawn fn ->
    Process.monitor(socket.transport_pid)
    receive do
      {:DOWN, _ref, :process, _monitored_pid, _reason} ->
        on_disconnect(socket)
    end
  end
end

defp on_disconnect(socket) do
  # Log disconnection.
  # Decrement gauge for users connected to this node in metrics.
end

I do not even match on the socket pid, because obviously that process cannot receive any other :DOWN message.

That function does not even use OTP, if the Process.monitor/1 call crashes, or the process leaks because the message is not received when the process managing the socket dies, there would be something really broken in the language implementation.

I’ve see that socket.transport_pid is equal to self() in the test suite. Therefore, you cannot test what happens when you disconnect by disconnecting!

My pragmatic compromise for this right now, subject to anyone suggesting a better idea, is to assume that the monitor code is exactly this simple:

defp monitor(socket) do
  spawn(fn ->
    Process.monitor(socket.transport_pid)
    receive do
      {:DOWN, _ref, :process, _monitored_pid, _reason} ->
        on_disconnect(socket)
    end
  end)
end

and testing on_disconnect/1.

I have been able to test socket disconnection by using a separate process. The idea is the following.

First, in the decorated init/1 you store the PID of the monitor in the socket assigns:

def init(state) do
  {:ok, {state, socket}} = super(state)
  monitor_pid = monitor(socket)
  {:ok, {state, assign(socket,  :monitor_pid, monitor_pid)}}
end

Then, in the test you execute the connect/2 call in a separate process, so killing it does not kill the actual test (because, as I explained above, the transport PID is the current process PID).

We need to block until the socket monitor has processed the :DOWN message and exited, to be able to test what should happen in a disconnection, so this needs some coordination:

test "we log driver disconnections" do
  parent = self()

  child = spawn(fn ->
    {:ok, socket} = connect(Subject, @params)
    send parent, {self(), socket, :connected}
    Process.sleep(:infinity)
  end)

  receive do
    {^child, socket, :connected} ->
      kill_socket_and_wait_for_its_monitor = fn ->
        ref = Process.monitor(socket.assigns.monitor_pid)
        Process.exit(socket.transport_pid, :kill)
        receive do
          {:DOWN, ^ref, :process, _monitored_pid, _reason} ->
            # Just block until the socket monitor has processed :DOWN and do
            # nothing.
            nil
        end
      end

      assert_log :info,
                 kill_socket_and_wait_for_its_monitor,
                 "Driver #{socket.assigns.driver_id} disconnected"
  end

There, assert_log/3 is a wrapper around capture_log/2, not relevant for the technique.

I find the test a bit ugly, but will do for 1:55 AM. Will probably polish it after some sleep.