(Sorry, this post may be a bit more stream of consciousness than I thought, but if nothing else it’s helping me rubber duck a bit. Hopefully it makes as much sense outside of my head as it does inside.)
I am using ranch, but I may be missing ways it could make my life easier beyond socket pooling and transport wrapping.
The code I have in my protocol handler right now looks like:
```elixir
defmodule RtmpServer.Handler do
  defmodule State do
    defstruct ip: nil,
              previous_headers: %{},
              connection_details: %RtmpCommon.ConnectionDetails{}
  end

  require Logger

  @moduledoc "Handles the rtmp socket connection"

  @doc "Starts the handler for an accepted socket"
  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts) do
    :ok = :ranch.accept_ack(ref)

    {:ok, {ip, _port}} = :inet.peername(socket)
    client_ip_string = ip |> Tuple.to_list() |> Enum.join(".")

    Logger.info "#{client_ip_string}: client connected"

    case RtmpServer.Handshake.process(socket, transport) do
      {:ok, client_epoch} ->
        Logger.debug "#{client_ip_string}: handshake successful"

        state = %State{
          ip: ip,
          connection_details: %RtmpCommon.ConnectionDetails{peer_epoch: client_epoch}
        }

        {:error, reason} = read_next_chunk(socket, transport, state)
        Logger.debug "#{client_ip_string}: connection error: #{reason}"

      {:error, reason} -> Logger.info "#{client_ip_string}: handshake failed (#{reason})"
    end
  end

  def read_next_chunk(socket, transport, state = %State{}) do
    case RtmpCommon.Chunking.read_next_chunk(socket, transport, state.previous_headers) do
      {:ok, {updated_headers, header, data}} ->
        process_chunk(socket, transport, %{state | previous_headers: updated_headers}, header, data)

      {:error, reason} -> {:error, reason}
    end
  end

  defp process_chunk(socket, transport, state, chunk_header, chunk_data) do
    client_ip = state.ip |> Tuple.to_list() |> Enum.join(".")

    result = with {:ok, received_message} <- RtmpCommon.Messages.Parser.parse(chunk_header.message_type_id, chunk_data),
                  :ok <- log_received_message(client_ip, received_message),
                  do: RtmpCommon.MessageHandler.handle(received_message, state.connection_details)

    case result do
      {:ok, {new_connection_details, _response}} ->
        __MODULE__.read_next_chunk(socket, transport, %{state | connection_details: new_connection_details})

      {:error, {:no_handler_for_message, message_type}} ->
        Logger.debug "#{client_ip}: no handler for message: #{inspect(message_type)}"
        __MODULE__.read_next_chunk(socket, transport, state)
    end
  end

  defp log_received_message(client_ip, message) do
    Logger.debug "#{client_ip}: Message received: #{inspect(message)}"
  end
end
```
That code loops, reading RTMP chunks over and over again: it parses each chunk to get the header and data, parses an RtmpMessage out of the data, then handles that specific message.
Where I am stuck, and probably over-thinking it, is that over the past 2-3 months of experimenting and playing around with random small projects in Erlang and Elixir, the power of reacting to messages has been ingrained in me.
The problem is that RtmpCommon.Chunking.read_next_chunk() is forced to make many manual calls to transport.recv() due to the RTMP protocol (I have to read the first byte to know how long the next piece of data is, then parse that piece and compare it to previously received data to know how long the following piece is, etc.).
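To make the "first byte decides the next read" point concrete, here is a minimal sketch of what that byte determines in the RTMP chunk format. The module and function names are made up for illustration and are not part of RtmpCommon; the extended timestamp and the payload length, which depend on the header contents and on previously seen headers, are left out.

```elixir
defmodule ChunkHeaderSketch do
  @moduledoc "Hypothetical helper: what the first byte of an RTMP chunk determines."

  # The top 2 bits are the chunk format (fmt), the low 6 bits the chunk stream id.
  # The result says how many more bytes have to be read for the rest of the header.
  def parse_first_byte(<<fmt::2, csid::6>>) do
    %{
      fmt: fmt,
      # csid 0 and 1 are escape values: the real id follows in 1 or 2 extra bytes
      extra_csid_bytes: extra_csid_bytes(csid),
      # the message header that follows is 11, 7, 3 or 0 bytes long
      message_header_bytes: message_header_bytes(fmt)
    }
  end

  defp extra_csid_bytes(0), do: 1
  defp extra_csid_bytes(1), do: 2
  defp extra_csid_bytes(_), do: 0

  defp message_header_bytes(0), do: 11
  defp message_header_bytes(1), do: 7
  defp message_header_bytes(2), do: 3
  defp message_header_bytes(3), do: 0
end
```

With blocking reads, each of those sizes turns into another transport.recv() call, which is where the chain of manual reads comes from.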
That seems to be preventing me from having a message-based process (or even using a gen_server/gen_fsm), which would help simplify the architecture. Being able to react to messages in a timely manner will be extremely useful for RTMP clients (watchers of video), pings, external notifications from other processes, etc.
One of the workarounds I have thought of so far is to add a receive at the beginning of the read_next_chunk() function, but that means that if the TCP client does not send data, I will end up delaying receiving and responding to even a single BEAM message (and constant TCP traffic coming into the server is definitely not guaranteed).
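A rough sketch of that workaround, assuming a hypothetical MailboxSketch module: a receive with `after 0` picks up whatever is already in the mailbox and returns immediately when it is empty.

```elixir
defmodule MailboxSketch do
  # Collects every message already waiting in the mailbox, oldest first.
  # `after 0` makes `receive` return at once when nothing is waiting.
  def drain(acc \\ []) do
    receive do
      message -> drain([message | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

Calling something like this before each chunk read only helps with messages that have already arrived. Anything sent while the process sits in a blocking recv still waits until the socket produces data. Passing a finite timeout as the third argument to transport.recv() would bound that wait, at the cost of turning the loop into polling.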
Another idea I had is to make this handler module do nothing but receive/parse chunks and then send them to a gen_server or gen_fsm to actually process the RTMP chunk. That would allow the second process to react to BEAM messages (including timer messages it sends itself) and have the benefit of being isolated from the chunk-reading process. The downside to this is that either the second process has to have the socket and transport so it can format and send responses itself, or it needs to be a synchronous gen_server so it can return all of the responses the handler should send back to the client (and use some type of queue to queue up pings and other messages that are not responses).
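A minimal sketch of the first variant (the second process sends responses itself), under some loud assumptions: ProcessorSketch, message_received/2 and the `send_fun` callback are invented for illustration, and `send_fun` stands in for whatever would wrap transport.send(socket, data). The responses are placeholders, not real RTMP messages.

```elixir
defmodule ProcessorSketch do
  use GenServer

  # `send_fun` is a hypothetical stand-in for `transport.send(socket, data)`.
  def start_link(send_fun, ping_interval_ms \\ 5_000) do
    GenServer.start_link(__MODULE__, {send_fun, ping_interval_ms})
  end

  # Called by the chunk-reading process for each parsed message. It is a cast,
  # so the reader goes straight back to its blocking recv loop.
  def message_received(pid, message), do: GenServer.cast(pid, {:rtmp_message, message})

  @impl true
  def init({send_fun, ping_interval_ms}) do
    Process.send_after(self(), :ping, ping_interval_ms)
    {:ok, %{send_fun: send_fun, ping_interval_ms: ping_interval_ms}}
  end

  @impl true
  def handle_cast({:rtmp_message, message}, state) do
    # Placeholder response; real code would build an RTMP reply here.
    state.send_fun.({:response_to, message})
    {:noreply, state}
  end

  @impl true
  def handle_info(:ping, state) do
    # Timer messages are handled even while the reader is blocked on the socket.
    state.send_fun.(:ping)
    Process.send_after(self(), :ping, state.ping_interval_ms)
    {:noreply, state}
  end
end
```

The reader process would call ProcessorSketch.message_received(pid, message) after each successful parse. The synchronous variant would swap the cast for a call that returns the list of responses.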
The last idea is to just use {active, once}, {packet, raw} so that TCP data is delivered as messages containing raw packets, and write the protocol handler module so it can keep incomplete RTMP packets in state to be processed when the next TCP data message arrives. At that point the whole protocol handler can live inside a single gen_server or gen_fsm and be fully reactive.
After typing that out, I guess the last idea is probably going to be the best way to go.
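For reference, a rough sketch of the shape that last idea could take. Several things here are assumptions rather than anything from my code: the ActiveOnceSketch module, the socket_handed_over/1 helper, a plain :gen_tcp socket in binary mode, and above all the framing, which is a stand-in (one length byte followed by that many payload bytes) and NOT the RTMP chunk format. Under ranch the equivalent calls would go through transport.setopts/2, and the message tags may differ per transport.

```elixir
defmodule ActiveOnceSketch do
  use GenServer
  require Logger

  def start_link(socket), do: GenServer.start_link(__MODULE__, socket)

  # Call once :gen_tcp.controlling_process(socket, pid) has made `pid` the owner.
  def socket_handed_over(pid), do: GenServer.cast(pid, :arm)

  @impl true
  def init(socket), do: {:ok, %{socket: socket, buffer: <<>>}}

  @impl true
  def handle_cast(:arm, state) do
    # Ask for exactly one TCP data message; the socket then goes passive again.
    :ok = :inet.setopts(state.socket, active: :once, packet: :raw)
    {:noreply, state}
  end

  @impl true
  def handle_info({:tcp, socket, data}, %{socket: socket} = state) do
    # Prepend the bytes left over from the previous TCP message, then peel off
    # as many complete frames as are available.
    {frames, rest} = split_frames(state.buffer <> data, [])
    Enum.each(frames, fn frame -> Logger.debug("frame: #{inspect(frame)}") end)
    # Re-arm the socket for the next TCP data message.
    :ok = :inet.setopts(socket, active: :once)
    {:noreply, %{state | buffer: rest}}
  end

  def handle_info({:tcp_closed, socket}, %{socket: socket} = state), do: {:stop, :normal, state}

  # Timers, pings and notifications from other processes would get clauses here.
  def handle_info(_other, state), do: {:noreply, state}

  # Stand-in framing (NOT RTMP): one length byte, then that many payload bytes.
  def split_frames(<<len::8, payload::binary-size(len), rest::binary>>, acc),
    do: split_frames(rest, [payload | acc])

  # Not enough bytes for a whole frame yet: keep them for the next TCP message.
  def split_frames(incomplete, acc), do: {Enum.reverse(acc), incomplete}
end
```

The part that matters is split_frames/2 returning the unconsumed tail, which is stored in state and prepended to the next data message. A real version would replace it with an RTMP chunk parser that returns either a parsed chunk plus the remaining bytes or an indication that more data is needed.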