Handling client disconnect during ServerSentEvents using Plug with Bandit

I have been trying to implement an SSE endpoint to send periodic updates to the browser client. Even when the client is closed (navigate away from site, close browser tab), I still see the Bandit.DelegatingHandler process created for the plug handler alive in the observer. I was hoping that the client leaving the connection would cause the handler to exit without my intervention. When using Plug.Cowboy with the same setup the cowboy_stream_h handler will exit after a short matter of time when the client leaves the connection. (after some further experimentation I see that the Cowboy handler exits and resets on a regular period even when the client connection is not severed… this must be a default general timeout)

How does Bandit/Thousand Island handle client disconnects? Are there any options or configurations I am missing to avoid the possible process leak?

Are there better patterns for implementation here?

@mtrudel I very much appreciate the tools you have built. Any tips or direction?

Let me know if there is any further context needed. Thank you all kindly!

defmodule Web.Router do
  use Plug.Router
  use Plug.ErrorHandler

  plug :match
  plug :dispatch

  get "/sse" do
    chunked_conn = prepare_sse(conn)

    CallbackRunner.run_on_event(fn x ->
      send_message(chunked_conn, %{data: [1, 2, 3]})
    end)
    
    Process.flag(:trap_exit, true)
    sleep_until_client_exit(chunked_conn, self())
  end

  defp prepare_sse(conn) do
    conn
    |> Plug.Conn.put_resp_header("connection", "keep-alive")
    |> Plug.Conn.put_resp_header("content-type", "text/event-stream")
    |> send_chunked(200)
  end

  defp make_message(params) do
    data = :jiffy.encode(params)
    "event: message\ndata: #{data}\nid: #{Web.generate_id()}\nretry: 6000\n\n"
  end

  defp send_message(conn, msg) do
    sse_msg = make_message(msg)
    conn |> chunk(sse_msg)
  end

  defp sleep_until_client_exit(conn, pid) do
    receive do
      {:EXIT, _from, _reason} ->
         IO.inspect("EXIT")
         conn
      _other ->
         sleep_until_client_exit(conn, pid)
    end
  end

  get "/" do
    html = """
      <!DOCTYPE html>
      <html lang='en'>
      <head>
        <meta charset='UTF-8'>
        <meta http-equiv='X-UA-Compatible' content='IE=edge'>
        <meta name='viewport' content='width=device-width, initial-scale=1.0'>
        <title></title>
      </head>
      <body>
        <script>
          const evtSource = new EventSource('/sse')
          evtSource.onopen = (event) => console.log(event)
          evtSource.onerror = (event) => console.error(event)
          evtSource.onmessage = (msg) => console.log(msg)
        </script>
      </body>
      </html>
    """
    
    send_resp(conn, 200, html)
  end
end
2 Likes

This is the same underlying issue as Stopping the request handler · Issue #202 · mtrudel/bandit · GitHub; the tl;dr here is that the single-function nature of the Plug API makes it impossible for a single-process-per-connection model such as Bandit to close connections eagerly (at least for HTTP/1; HTTP/2 connections are fundamentally multi-process & should eagerly shut down requests on client disconnection).

All is not lost, however. You should be able to detect this with your code! As of 1.0.0-pre.14 (just released a few minutes ago), your application code will see any errors encountered (including {:error, :closed}) in the return value of Plug.Conn.chunk/2 calls. You should be able to case on the output of this value in your send_message/2 function in order to properly shut things down as needed.

2 Likes

@mtrudel thank you for connecting all the dots! I’ll dive in and see what I can do.

Congrats on the recent release :wink: exciting times ahead.

E

2 Likes

Worked like a charm.

  defp send_message(conn, msg) do
    sse_msg = make_message(msg)
    conn |> chunk(sse_msg)
  end

  defp sleep_until_client_exit(conn, pid) do
    Process.sleep(5000)
    case send_message(conn, %{heartbeat: true}) do
      {:ok, ^conn} ->
        sleep_until_client_exit(conn, pid)
      {:error, reason} ->
        IO.inspect(reason)
        conn
      _ ->
        sleep_until_client_exit(conn, pid)
    end
  end
2 Likes