I’m facing a peculiar challenge with a socket server I’ve built using Elixir and Cowboy WebSocket. The server has been in production for a while, handling substantial traffic: it consumes messages from RabbitMQ, processes them, and publishes them to clients based on their subscribed channels.
The issue is data-out costs. To tackle this, I enabled Cowboy’s built-in compression. The problem is that messages are compressed separately for each client: if a message needs to be sent to 1000 clients, it gets compressed 1000 times, once in each client process. This causes high CPU overhead and spiking latencies, especially during message bursts.
To address this, I’m considering an alternative:
Pre-compressing messages when they’re consumed from RabbitMQ and sending the pre-compressed messages directly to clients that support compression. For clients that don’t support compression, the original uncompressed message would be sent instead. The plan is to add relevant headers so that clients (mostly web browsers) can automatically decompress messages without requiring any changes on the frontend.
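A sketch of what I mean (`subscribers/1`, the `:compression?` flag, and the message shape are all placeholders, not my actual code):

```elixir
# Compress once per RabbitMQ message, then pick the variant per client
# instead of compressing inside every client process.
def handle_rabbit_message(channel, payload) do
  compressed = :zlib.gzip(payload)

  for {pid, %{compression?: supported?}} <- subscribers(channel) do
    frame = if supported?, do: {:binary, compressed}, else: {:text, payload}
    send(pid, {:push, frame})
  end
end
```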
However, I’m unclear about how this approach interacts with WebSocket compression features like server_context_takeover, server_max_window_bits, etc. Since Cowboy optimizes compression by managing compression contexts across frames, how would this work when the messages are already pre-compressed?
Has anyone encountered a similar problem or implemented a solution for this? I assume this is a common challenge for socket servers serving public data.
Any insights, best practices, or ideas to optimize CPU and latency in this scenario would be greatly appreciated!
Edit: Go’s Gorilla WebSocket library has a feature called PreparedMessage that would solve my issue, but plugging that functionality into the Cowboy library is way beyond my skill. I may try to implement it when I have some free time.
This does not answer the question of how to serve shared compressed values, but have you tried tuning the deflate options to cheaper compression settings?
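Something along these lines in a raw Cowboy handler; this is a sketch assuming Cowboy 2.7+, so check the `cowboy_websocket` docs for the exact `deflate_opts` keys your version supports:

```elixir
defmodule MyApp.SocketHandler do
  @behaviour :cowboy_websocket

  def init(req, state) do
    opts = %{
      compress: true,
      deflate_opts: %{
        # Cheapest zlib level: trades compression ratio for CPU.
        level: 1,
        # Smaller internal state per compression context.
        mem_level: 4,
        # Smaller LZ77 window: less memory and CPU per connection.
        server_max_window_bits: 11,
        # Reset the context between messages instead of carrying it over.
        server_context_takeover: :no_takeover
      }
    }

    {:cowboy_websocket, req, state, opts}
  end

  def websocket_handle(_frame, state), do: {:ok, state}
  def websocket_info(_msg, state), do: {:ok, state}
end
```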
A more experimental option would be to create your own zlib context and emulate what Cowboy does, but return the raw frame ({binary, iodata()}) from the websocket handler directly, with the payload pulled by sequence number from a shared ETS cache.
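A rough sketch of the shared-cache half of that idea (the module, table name, and sequence numbering are made up; emulating the actual permessage-deflate framing, i.e. raw deflate with the trailer stripped and RSV1 set, would be extra work on top of this):

```elixir
defmodule MyApp.FrameCache do
  @table :compressed_frames

  def create_table do
    :ets.new(@table, [:named_table, :set, :public, read_concurrency: true])
  end

  # Called once per RabbitMQ message by the consumer: one compression pass
  # shared by every client process.
  def put(seq, payload) do
    z = :zlib.open()
    :ok = :zlib.deflateInit(z, 1)
    compressed = :zlib.deflate(z, payload, :finish)
    :zlib.deflateEnd(z)
    :zlib.close(z)
    :ets.insert(@table, {seq, IO.iodata_to_binary(compressed)})
  end

  # Called from each websocket handler; the cached binary goes out as-is.
  def fetch(seq) do
    case :ets.lookup(@table, seq) do
      [{^seq, bin}] -> {:ok, {:binary, bin}}
      [] -> :error
    end
  end
end
```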
Given your problem, I think you’re on the right track in pre-compressing the payload you want to send. I would not use the deflate options of the websocket itself, as that is always a 1:1 relation. I assume there’s a single point in your app where you broadcast a single RabbitMQ message to multiple (websocket) receivers; that is the place in your code to do the one-time compression and send the result out to all receivers as a binary frame.
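For example, assuming subscribers are tracked in a Registry keyed by channel (adjust to however you actually track them):

```elixir
# Compress once, then fan the same binary out to every subscriber.
def broadcast(channel, payload) do
  frame = {:binary, :zlib.gzip(payload)}

  MyApp.Subscribers
  |> Registry.lookup(channel)
  |> Enum.each(fn {pid, _value} -> send(pid, {:push, frame}) end)
end
```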
I’ve built a lot of high-traffic websocket servers, and while many of them compressed their payloads, none used the built-in compression extension of websockets (RFC 7692); they just sent the data as binary frames and specified in their documentation which form of compression was used. Another reason I would not opt for the built-in compression is that a client can always ignore it, or declare during the standard negotiation flow that it doesn’t support compression and opt for the uncompressed stream, which would undo what you want.
Another thing that can help a lot with burst traffic is to implement some form of buffering and send data out in “ticks”. In my implementations I sometimes had to handle thousands of messages per second, so I would buffer everything within a 250ms to 1s window, then compress and send it out once per interval. I was not able to apply back pressure to my (external) upstream, so this helped a lot with traffic spikes and kept the mailboxes of the involved actors/processes from filling up.
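A minimal sketch of that tick loop (the module and the `fan_out/1` call are hypothetical):

```elixir
defmodule MyApp.TickBuffer do
  use GenServer

  @tick_ms 250

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Producers (e.g. the RabbitMQ consumer) push into the buffer.
  def push(msg), do: GenServer.cast(__MODULE__, {:push, msg})

  @impl true
  def init(_opts) do
    Process.send_after(self(), :tick, @tick_ms)
    {:ok, []}
  end

  @impl true
  def handle_cast({:push, msg}, buffer), do: {:noreply, [msg | buffer]}

  @impl true
  def handle_info(:tick, buffer) do
    # Compress the whole window once and fan it out as one binary frame.
    if buffer != [] do
      payload = buffer |> Enum.reverse() |> Enum.join("\n") |> :zlib.gzip()
      MyApp.Broadcaster.fan_out({:binary, payload})
    end

    Process.send_after(self(), :tick, @tick_ms)
    {:noreply, []}
  end
end
```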
On a related note, here is a Phoenix transport serializer that packs outgoing messages with MessagePack and gzips payloads above 1 KB, so the compression happens once per broadcast rather than per connection:

```elixir
defmodule BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer do
  @moduledoc false

  @behaviour Phoenix.Transports.Serializer

  alias Phoenix.Socket.Reply
  alias Phoenix.Socket.Message
  alias Phoenix.Socket.Broadcast

  # Only gzip data above 1 KB.
  @gzip_threshold 1024

  def fastlane!(%Broadcast{} = msg) do
    {:socket_push, :binary,
     pack_data(%{topic: msg.topic, event: msg.event, payload: msg.payload})}
  end

  def encode!(%Reply{} = reply) do
    packed =
      pack_data(%{
        topic: reply.topic,
        event: "phx_reply",
        ref: reply.ref,
        payload: %{status: reply.status, response: reply.payload}
      })

    {:socket_push, :binary, packed}
  end

  def encode!(%Message{} = msg) do
    # We need to convert the Message struct into a plain map for MessagePack
    # to work properly. Alternatively we could have implemented the
    # Enumerable protocol. Pick your poison :)
    {:socket_push, :binary, pack_data(Map.from_struct(msg))}
  end

  # Messages received from clients are still in JSON format; for our use
  # case clients are mostly passive listeners, so it made no sense to
  # optimize incoming traffic.
  def decode!(message, _opts) do
    message
    |> Poison.decode!()
    |> Phoenix.Socket.Message.from_map!()
  end

  defp pack_data(data) do
    msgpacked = MessagePack.pack!(data, enable_string: true)
    gzip_data(msgpacked, byte_size(msgpacked))
  end

  defp gzip_data(data, size) when size < @gzip_threshold, do: data
  defp gzip_data(data, _size), do: :zlib.gzip(data)
end
```
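For completeness, with the older Phoenix transport API (the same era as the Phoenix.Transports.Serializer behaviour above) the serializer would be wired into the user socket roughly like this; later Phoenix versions configure serializers differently, so treat this as a sketch:

```elixir
defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  # Point the websocket transport at the MessagePack serializer.
  transport :websocket, Phoenix.Transports.WebSocket,
    serializer: BinaryDataOverPhoenixSockets.Transports.MessagePackSerializer
end
```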
Maybe this is a completely different direction, but it gives you another option to consider.