Hi, I am trying to use Server-Sent Events (SSE).
I want to send chunked responses.
I am able to send them like so:
# Set the SSE response headers, start a chunked 200 response, then send one chunk.
conn
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("content-type", "text/event-stream;")
# From here on the body is streamed chunk by chunk instead of being sent in one piece.
|> send_chunked(200)
|> chunk("This is some respsonse")
The responses are being sent, but without gzip compression.
How do I get the responses to be sent with gzip compression?
I am using the latest Phoenix with the latest Bandit.
Please advise.
PS: If I don’t send a chunked response, it gets compressed. For example:
# Same headers as the chunked variant, but the whole body is sent in one go via text/2.
conn
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("content-type", "text/event-stream;")
|> text("This would get compressed")
Is some magic happening in the text/2 function?
1 Like
Got it to work.
We need to set the content-encoding header to gzip
and then manually compress the responses, like so:
# Set the SSE headers and declare the body as gzip-encoded before starting the chunked response.
conn
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "keep-alive")
|> put_resp_header("content-type", "text/event-stream;")
# With this header set, every chunk sent afterwards must actually be gzip data.
|> put_resp_header("content-encoding", "gzip")
|> send_chunked(200)
Then compress each message manually and send it:
# Gzips `message` by itself and sends it as a single chunk, returning the updated conn.
# Anything other than `{:ok, conn}` from `chunk/2` fails the match and raises.
# NOTE(review): `prepare_sse/1` is not shown in this snippet — confirm what it does and
# that it is safe to call once per chunk.
defp send_chunk(conn, message) do
  compressed = :zlib.gzip(message)
  {:ok, sent_conn} = chunk(prepare_sse(conn), compressed)
  sent_conn
end
6 Likes
Thank you for reporting back; I am sure it’ll be useful to someone.
1 Like
SSE compression over a long-lived connection for streaming updates:
# lib/aet_web/controllers/sse_controller.ex
defmodule AetWeb.SseController do
  @moduledoc """
  Streams PubSub events to the client as a gzip-compressed Server-Sent Events response.

  One zlib deflate context (with the gzip wrapper) is kept for the whole response, so
  all chunks together form a single gzip stream. Every payload is sync-flushed so the
  client can decode it as soon as it arrives.
  """

  use AetWeb, :controller

  require Logger

  @pubsub Aet.PubSub
  @topic "events"

  # windowBits of 15 + 16 tells zlib to emit the gzip wrapper instead of raw zlib.
  @gzip_window_bits 31

  # How long to wait for an event before sending a heartbeat.
  @heartbeat_interval_ms 15_000

  # Lines starting with ":" are SSE comments and are ignored by clients.
  @heartbeat ": keep-alive\n\n"

  @doc """
  Starts a chunked `text/event-stream` response and forwards every `{:event, data}`
  message received on the `"events"` topic as a compressed chunk.

  `data` is written to the stream as-is, so it must already be SSE-formatted.
  The function blocks until a chunk can no longer be delivered and then returns the
  last `conn`. While no events arrive, a comment heartbeat is sent periodically so a
  client that went away is noticed instead of keeping this process alive forever.
  """
  @spec stream(Plug.Conn.t(), map()) :: Plug.Conn.t()
  def stream(conn, _params) do
    conn =
      conn
      |> put_resp_header("content-type", "text/event-stream")
      |> put_resp_header("content-encoding", "gzip")
      |> put_resp_header("cache-control", "no-cache")
      |> send_chunked(200)

    z = :zlib.open()
    :ok = :zlib.deflateInit(z, 9, :deflated, @gzip_window_bits, 8, :default)

    try do
      # Subscribing inside the `try` guarantees the zlib context is released even if
      # the subscription fails.
      :ok = Phoenix.PubSub.subscribe(@pubsub, @topic)
      stream_events(conn, z)
    after
      Phoenix.PubSub.unsubscribe(@pubsub, @topic)
      finish_stream(conn, z)
    end
  end

  # Waits for the next event, or sends a heartbeat when the stream has been idle.
  defp stream_events(conn, z) do
    receive do
      {:event, data} -> push(conn, z, data)
    after
      @heartbeat_interval_ms -> push(conn, z, @heartbeat)
    end
  end

  # Compresses `payload` with a sync flush (so it is decodable immediately) and sends it.
  # Keeps streaming on success; on any delivery error the loop ends and `conn` is returned.
  defp push(conn, z, payload) do
    compressed = :zlib.deflate(z, payload, :sync)

    case Plug.Conn.chunk(conn, compressed) do
      {:ok, conn} ->
        stream_events(conn, z)

      {:error, reason} ->
        Logger.debug("SSE stream stopped: #{inspect(reason)}")
        conn
    end
  end

  # Writes the gzip trailer (best effort: the client may already be gone) and always
  # releases the zlib context, even if finishing the stream raises.
  defp finish_stream(conn, z) do
    trailer = :zlib.deflate(z, "", :finish)
    Plug.Conn.chunk(conn, trailer)
  after
    cleanup_zlib(z)
  end

  # Ends the deflate stream and closes the zlib handle. A failing `deflateEnd` is only
  # logged, because the handle must be closed either way.
  defp cleanup_zlib(z) do
    :zlib.deflateEnd(z)
  rescue
    error -> Logger.warning("zlib deflateEnd failed: #{inspect(error)}")
  after
    :zlib.close(z)
  end
end
2 Likes