SSE chunks and compression - responses being sent but without gzip compression

Hi I am trying to use Server Sent Events.
I want to send chunked responses.

I am able to send them like so

# Minimal SSE setup. Chunked responses bypass the response-compression
# plug, so each chunk below is sent uncompressed over the wire.
conn
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      # NOTE(review): the trailing ";" in the content-type value looks
      # like a typo — "text/event-stream" alone is the correct MIME type.
      |> put_resp_header("content-type", "text/event-stream;")
      |> send_chunked(200)
      |> chunk("This is some respsonse")

The responses are being sent, but without gzip compression.
How do I get chunked responses to be sent with gzip compression?

I am using the latest phoenix with latest bandit.
Please advise.

PS: If I don’t send a chunked response, it gets compressed. For example:

# Same headers, but a normal (non-chunked) response via text/2 — this
# path goes through the usual response pipeline, so it DOES get gzipped.
conn
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      # NOTE(review): trailing ";" in the content-type is likely a typo.
      |> put_resp_header("content-type", "text/event-stream;")
      |> text("This would get compressed")

Is some magic happening in the text/2 method?

1 Like

Got it to work.
We need to set the content-encoding to gzip
and then manually compress the responses like so

# Working setup: declare "content-encoding: gzip" up front, then gzip
# each chunk by hand before sending (see send_chunk/2 below). The server
# will not compress chunked bodies for us, so the encoding header and
# the manual compression must agree.
conn
      |> put_resp_header("cache-control", "no-cache")
      |> put_resp_header("connection", "keep-alive")
      # NOTE(review): trailing ";" in the content-type is likely a typo.
      |> put_resp_header("content-type", "text/event-stream;")
      |> put_resp_header("content-encoding", "gzip")
      |> send_chunked(200)

And then compress the messages manually and send them:

# Gzips `message` and sends it as one chunk on the SSE connection.
# Each chunk is a complete standalone gzip stream (:zlib.gzip/1).
# Returns the updated conn; a failed chunk raises via the match.
defp send_chunk(conn, message) do
    prepared = prepare_sse(conn)
    {:ok, sent_conn} = chunk(prepared, :zlib.gzip(message))
    sent_conn
  end
6 Likes

thank you for reporting back, I am sure it’ll be useful to someone

1 Like

SSE compression in a long-lived connection for streaming updates:

# lib/aet_web/controllers/sse_controller.ex
defmodule AetWeb.SseController do
  @moduledoc """
  Streams gzip-compressed Server-Sent Events over a long-lived chunked
  connection.

  Chunked responses bypass the usual response-compression plugs, so this
  controller compresses each chunk by hand using a single long-lived
  zlib deflate context — the client then sees one continuous gzip stream
  rather than one gzip stream per chunk.
  """

  use AetWeb, :controller

  # zlib deflate parameters for the per-connection context.
  @compression_level 9
  # windowBits of 31 (15 + 16) selects the gzip wrapper format.
  @window_bits 31
  @mem_level 8

  @doc """
  Opens an SSE stream and forwards `"events"` PubSub messages to the
  client, compressing incrementally with a shared deflate context.

  The connection stays open until the client disconnects; the gzip
  stream is finalized and the zlib context closed in the `after` block
  regardless of how the loop exits.
  """
  def stream(conn, _params) do
    conn =
      conn
      |> put_resp_header("content-type", "text/event-stream")
      |> put_resp_header("content-encoding", "gzip")
      |> put_resp_header("cache-control", "no-cache")
      |> send_chunked(200)

    # One deflate context for the whole connection.
    z = :zlib.open()
    :ok = :zlib.deflateInit(z, @compression_level, :deflated, @window_bits, @mem_level, :default)

    # Assert the subscription succeeded instead of silently ignoring a
    # failure — without it the stream would just never receive events.
    :ok = Phoenix.PubSub.subscribe(Aet.PubSub, "events")

    try do
      stream_events(conn, z)
    after
      # Flush anything zlib still buffers and terminate the gzip stream.
      # The chunk may fail if the client already disconnected; that is
      # fine — only the zlib cleanup below must always run.
      final = :zlib.deflate(z, "", :finish)
      _ = Plug.Conn.chunk(conn, final)
      cleanup_zlib(z)
    end
  end

  # Receive loop: compress and forward events until the client goes away.
  defp stream_events(conn, z) do
    receive do
      {:event, data} ->
        # :sync flushes the compressor so the client receives this event
        # immediately instead of waiting for zlib's internal buffer.
        compressed = :zlib.deflate(z, data, :sync)

        case Plug.Conn.chunk(conn, compressed) do
          {:ok, conn} ->
            stream_events(conn, z)

          {:error, :closed} ->
            # Client disconnected; zlib cleanup runs in stream/2's
            # `after` clause.
            conn
        end

      _other ->
        # Drain unmatched messages (monitor/system/other broadcasts) so
        # the mailbox cannot grow without bound on a long-lived stream.
        stream_events(conn, z)
    after
      # Periodic wake-up keeps the loop from blocking forever when no
      # events arrive.
      1000 ->
        stream_events(conn, z)
    end
  end

  # Tears down the deflate context, tolerating errors from a context
  # that was already ended or never fully initialized. :zlib.close/1
  # is always attempted so the NIF resource is released.
  defp cleanup_zlib(z) do
    case :zlib.deflateEnd(z) do
      :ok ->
        :zlib.close(z)

      error ->
        IO.inspect(error, label: "DeflateEnd error")
        :zlib.close(z)
    end
  rescue
    e ->
      IO.inspect(e, label: "Cleanup error")
      :zlib.close(z)
  end
end
2 Likes